Update get_data to us get_some_data sub functions. (#27)

This commit is contained in:
UpstreamData
2023-01-26 22:18:03 -07:00
committed by GitHub
parent 67b3e2f312
commit 2d4c063dfa
147 changed files with 3639 additions and 2658 deletions

View File

@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Union
from typing import Union, Optional
import httpx
@@ -21,90 +21,90 @@ from pyasic.settings import PyasicSettings
class BMMinerX17(BMMiner):
def __init__(self, ip: str, api_ver: str = "1.0.0") -> None:
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver=api_ver)
self.ip = ip
self.uname = "root"
self.pwd = PyasicSettings().global_x17_password
async def get_hostname(self) -> Union[str, None]:
hostname = None
url = f"http://{self.ip}/cgi-bin/get_system_info.cgi"
async def send_web_command(
self, command: str, params: dict = None
) -> Optional[dict]:
url = f"http://{self.ip}/cgi-bin/{command}.cgi"
auth = httpx.DigestAuth(self.uname, self.pwd)
async with httpx.AsyncClient() as client:
data = await client.get(url, auth=auth)
if data.status_code == 200:
data = data.json()
if len(data.keys()) > 0:
if "hostname" in data.keys():
hostname = data["hostname"]
return hostname
try:
async with httpx.AsyncClient() as client:
if params:
data = await client.post(url, data=params, auth=auth)
else:
data = await client.get(url, auth=auth)
except httpx.HTTPError:
pass
else:
if data.status_code == 200:
try:
return data.json()
except json.decoder.JSONDecodeError:
pass
async def get_mac(self) -> Union[str, None]:
mac = None
url = f"http://{self.ip}/cgi-bin/get_system_info.cgi"
auth = httpx.DigestAuth(self.uname, self.pwd)
async with httpx.AsyncClient() as client:
data = await client.get(url, auth=auth)
if data.status_code == 200:
data = data.json()
if len(data.keys()) > 0:
if "macaddr" in data.keys():
mac = data["macaddr"]
return mac
try:
data = await self.send_web_command("get_system_info")
if data:
return data["macaddr"]
except KeyError:
pass
async def fault_light_on(self) -> bool:
url = f"http://{self.ip}/cgi-bin/blink.cgi"
auth = httpx.DigestAuth(self.uname, self.pwd)
async with httpx.AsyncClient() as client:
try:
await client.post(url, data={"action": "startBlink"}, auth=auth)
except httpx.ReadTimeout:
# Expected behaviour
pass
data = await client.post(url, data={"action": "onPageLoaded"}, auth=auth)
if data.status_code == 200:
data = data.json()
if data["isBlinking"]:
self.light = True
return True
return False
# this should time out, after it does do a check
await self.send_web_command("blink", params={"action": "startBlink"})
try:
data = await self.send_web_command(
"blink", params={"action": "onPageLoaded"}
)
if data:
if data["isBlinking"]:
self.light = True
except KeyError:
pass
return self.light
async def fault_light_off(self) -> bool:
url = f"http://{self.ip}/cgi-bin/blink.cgi"
auth = httpx.DigestAuth(self.uname, self.pwd)
async with httpx.AsyncClient() as client:
await client.post(url, data={"action": "stopBlink"}, auth=auth)
data = await client.post(url, data={"action": "onPageLoaded"}, auth=auth)
if data.status_code == 200:
data = data.json()
if not data["isBlinking"]:
self.light = False
return True
return False
async def check_light(self) -> Union[bool, None]:
if self.light:
return self.light
url = f"http://{self.ip}/cgi-bin/blink.cgi"
auth = httpx.DigestAuth(self.uname, self.pwd)
async with httpx.AsyncClient() as client:
data = await client.post(url, data={"action": "onPageLoaded"}, auth=auth)
if data.status_code == 200:
data = data.json()
if data["isBlinking"]:
self.light = True
return True
else:
self.light = False
return False
return None
await self.send_web_command("blink", params={"action": "stopBlink"})
try:
data = await self.send_web_command(
"blink", params={"action": "onPageLoaded"}
)
if data:
if not data["isBlinking"]:
self.light = False
except KeyError:
pass
return self.light
async def reboot(self) -> bool:
url = f"http://{self.ip}/cgi-bin/reboot.cgi"
auth = httpx.DigestAuth(self.uname, self.pwd)
async with httpx.AsyncClient() as client:
data = await client.get(url, auth=auth)
if data.status_code == 200:
data = await self.send_web_command("reboot")
if data:
return True
return False
async def get_fault_light(self) -> bool:
if self.light:
return self.light
try:
data = await self.send_web_command(
"blink", params={"action": "onPageLoaded"}
)
if data:
self.light = data["isBlinking"]
except KeyError:
pass
return self.light
async def get_hostname(self) -> Union[str, None]:
try:
data = await self.send_web_command("get_system_info")
if data:
return data["hostname"]
except KeyError:
pass