Merge branch 'dev'

This commit is contained in:
Upstream Data
2023-03-01 19:56:27 -07:00
34 changed files with 720 additions and 424 deletions

View File

@@ -92,7 +92,9 @@ class BaseMinerAPI:
async def send_privileged_command(self, *args, **kwargs) -> dict:
return await self.send_command(*args, **kwargs)
async def multicommand(self, *commands: str, allow_warning: bool = True) -> dict:
async def multicommand(
self, *commands: str, ignore_errors: bool = False, allow_warning: bool = True
) -> dict:
"""Creates and sends multiple commands as one command to the miner.
Parameters:
@@ -107,7 +109,9 @@ class BaseMinerAPI:
# standard format doesn't work for X19
command = "+".join(commands)
try:
data = await self.send_command(command, allow_warning=allow_warning)
data = await self.send_command(
command, allow_warning=allow_warning, ignore_errors=ignore_errors
)
except APIError:
return {command: [{}] for command in commands}
logging.debug(f"{self} - (Multicommand) - Received data")

View File

@@ -246,7 +246,7 @@ class BTMinerAPI(BaseMinerAPI):
logging.debug(f"{self} - (Send Privileged Command) - Sending")
try:
data = await self._send_bytes(enc_command, timeout)
except (asyncio.CancelledError, asyncio.TimeoutError) as e:
except (asyncio.CancelledError, asyncio.TimeoutError):
if ignore_errors:
return {}
raise APIError("No data was returned from the API.")

View File

@@ -62,7 +62,7 @@ class CGMinerAPI(BaseMinerAPI):
for cmd in commands:
data[cmd] = []
data[cmd].append(await self.send_command(cmd, allow_warning=True))
except APIError as e:
except APIError:
pass
except Exception as e:
logging.warning(

View File

@@ -14,14 +14,13 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
import json
import logging
import random
import string
import time
from dataclasses import asdict, dataclass, fields
from enum import IntEnum
from typing import Dict, List, Literal
from typing import List, Literal
import toml
import yaml
@@ -202,17 +201,23 @@ class _PoolGroup:
pools[f"{key}{idx+1}"] = pool_data[key]
return pools
def as_wm(self, user_suffix: str = None) -> List[dict]:
def as_wm(self, user_suffix: str = None) -> dict:
"""Convert the data in this class to a list usable by a Whatsminer device.
Parameters:
user_suffix: The suffix to append to username.
"""
pools = []
for pool in self.pools[:3]:
pools.append(pool.as_wm(user_suffix=user_suffix))
while len(pools) < 3:
pools.append({"url": None, "user": None, "pass": None})
pools = {}
for i in range(1, 4):
if i <= len(self.pools):
pool_wm = self.pools[i - 1].as_wm(user_suffix)
pools[f"pool_{i}"] = pool_wm["url"]
pools[f"worker_{i}"] = pool_wm["user"]
pools[f"passwd_{i}"] = pool_wm["pass"]
else:
pools[f"pool_{i}"] = ""
pools[f"worker_{i}"] = ""
pools[f"passwd_{i}"] = ""
return pools
def as_avalon(self, user_suffix: str = None) -> str:
@@ -434,7 +439,7 @@ class MinerConfig:
logging.debug(f"MinerConfig - (From YAML) - Loading YAML config")
return self.from_dict(yaml.load(data, Loader=yaml.SafeLoader))
def as_wm(self, user_suffix: str = None) -> Dict[str, int]:
def as_wm(self, user_suffix: str = None) -> dict:
"""Convert the data in this class to a config usable by a Whatsminer device.
Parameters:
@@ -455,7 +460,7 @@ class MinerConfig:
logging.debug(f"MinerConfig - (As Inno) - Generating Innosilicon config")
return self.pool_groups[0].as_inno(user_suffix=user_suffix)
def as_x19(self, user_suffix: str = None) -> str:
def as_x19(self, user_suffix: str = None) -> dict:
"""Convert the data in this class to a config usable by an X19 device.
Parameters:
@@ -475,7 +480,7 @@ class MinerConfig:
if self.fan_speed:
cfg["bitmain-fan-ctrl"] = str(self.fan_speed)
return json.dumps(cfg)
return cfg
def as_avalon(self, user_suffix: str = None) -> str:
"""Convert the data in this class to a config usable by an Avalonminer device.

View File

@@ -15,17 +15,25 @@
# ------------------------------------------------------------------------------
import asyncio
import logging
from typing import List, Union
# from pyasic.errors import PhaseBalancingError
from pyasic.errors import APIError
from pyasic.miners import AnyMiner
from pyasic.miners._backends import X19, BOSMiner, BTMiner
from pyasic.miners._types import S9, S17, T17, S17e, S17Plus, S17Pro, T17e, T17Plus
# from pprint import pprint as print
from pyasic.miners._backends import ( # noqa - Ignore access to _module
X19,
BOSMiner,
BTMiner,
)
from pyasic.miners._types import ( # noqa - Ignore access to _module
S9,
S17,
T17,
S17e,
S17Plus,
S17Pro,
T17e,
T17Plus,
)
FAN_USAGE = 50 # 50 W per fan

View File

@@ -15,47 +15,23 @@
# ------------------------------------------------------------------------------
import asyncio
import json
from typing import List, Optional, Union
import httpx
from pyasic.API import APIError
from pyasic.config import MinerConfig, X19PowerMode
from pyasic.data.error_codes import MinerErrorData, X19Error
from pyasic.miners._backends import BMMiner # noqa - Ignore access to _module
from pyasic.settings import PyasicSettings
from pyasic.web.X19 import X19WebAPI
class X19(BMMiner):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver=api_ver)
self.ip = ip
self.uname = "root"
self.pwd = PyasicSettings().global_x19_password
async def send_web_command(
self, command: str, params: dict = None
) -> Optional[dict]:
url = f"http://{self.ip}/cgi-bin/{command}.cgi"
auth = httpx.DigestAuth(self.uname, self.pwd)
try:
async with httpx.AsyncClient() as client:
if params:
data = await client.post(url, data=params, auth=auth)
else:
data = await client.get(url, auth=auth)
except httpx.HTTPError:
pass
else:
if data.status_code == 200:
try:
return data.json()
except json.decoder.JSONDecodeError:
pass
self.web = X19WebAPI(ip)
async def get_config(self) -> MinerConfig:
data = await self.send_web_command("get_miner_conf")
data = await self.web.get_miner_conf()
if data:
self.config = MinerConfig().from_raw(data)
return self.config
@@ -63,9 +39,7 @@ class X19(BMMiner):
async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
self.config = config
conf = config.as_x19(user_suffix=user_suffix)
await self.send_web_command(
"set_miner_conf", params=conf # noqa: ignore conf being a str
)
await self.web.set_miner_conf(conf)
for i in range(7):
data = await self.get_config()
@@ -74,27 +48,21 @@ class X19(BMMiner):
await asyncio.sleep(1)
async def fault_light_on(self) -> bool:
data = await self.send_web_command(
"blink",
params=json.dumps({"blink": "true"}), # noqa - ignore params being a str
)
data = await self.web.blink(blink=True)
if data:
if data.get("code") == "B000":
self.light = True
return self.light
async def fault_light_off(self) -> bool:
data = await self.send_web_command(
"blink",
params=json.dumps({"blink": "false"}), # noqa - ignore params being a str
)
data = await self.web.blink(blink=False)
if data:
if data.get("code") == "B100":
self.light = True
return self.light
async def reboot(self) -> bool:
data = await self.send_web_command("reboot")
data = await self.web.reboot()
if data:
return True
return False
@@ -113,7 +81,7 @@ class X19(BMMiner):
async def get_hostname(self) -> Union[str, None]:
try:
data = await self.send_web_command("get_system_info")
data = await self.web.get_system_info()
if data:
return data["hostname"]
except KeyError:
@@ -121,14 +89,14 @@ class X19(BMMiner):
async def get_mac(self) -> Union[str, None]:
try:
data = await self.send_web_command("get_system_info")
data = await self.web.get_system_info()
if data:
return data["macaddr"]
except KeyError:
pass
try:
data = await self.send_web_command("get_network_info")
data = await self.web.get_network_info()
if data:
return data["macaddr"]
except KeyError:
@@ -136,7 +104,7 @@ class X19(BMMiner):
async def get_errors(self) -> List[MinerErrorData]:
errors = []
data = await self.send_web_command("summary")
data = await self.web.summary()
if data:
try:
for item in data["SUMMARY"][0]["status"]:
@@ -153,7 +121,7 @@ class X19(BMMiner):
if self.light:
return self.light
try:
data = await self.send_web_command("get_blink_status")
data = await self.web.get_blink_status()
if data:
self.light = data["blink"]
except KeyError:
@@ -193,42 +161,34 @@ class X19(BMMiner):
):
if not hostname:
hostname = await self.get_hostname()
payload = {
"ipAddress": ip,
"ipDns": dns,
"ipGateway": gateway,
"ipHost": hostname,
"ipPro": 2, # static
"ipSub": subnet_mask,
}
await self.send_web_command("set_network_conf", params=payload)
await self.web.set_network_conf(
ip=ip,
dns=dns,
gateway=gateway,
subnet_mask=subnet_mask,
hostname=hostname,
protocol=2,
)
async def set_dhcp(self, hostname: str = None):
if not hostname:
hostname = await self.get_hostname()
payload = {
"ipAddress": "",
"ipDns": "",
"ipGateway": "",
"ipHost": hostname,
"ipPro": 1, # DHCP
"ipSub": "",
}
await self.send_web_command("set_network_conf", params=payload)
await self.web.set_network_conf(
ip=ip, dns="", gateway="", subnet_mask="", hostname=hostname, protocol=1
)
async def set_hostname(self, hostname: str):
cfg = await self.send_web_command("get_network_info")
cfg = await self.web.get_network_info()
dns = cfg["conf_dnsservers"]
gateway = cfg["conf_gateway"]
ip = cfg["conf_ipaddress"]
subnet_mask = cfg["conf_netmask"]
protocol = 1 if cfg["conf_nettype"] == "DHCP" else 2
payload = {
"ipAddress": ip,
"ipDns": dns,
"ipGateway": gateway,
"ipHost": hostname,
"ipPro": protocol,
"ipSub": subnet_mask,
}
await self.send_web_command("set_network_conf", params=payload)
await self.web.set_network_conf(
ip=ip,
dns=dns,
gateway=gateway,
subnet_mask=subnet_mask,
hostname=hostname,
protocol=protocol,
)

View File

@@ -17,17 +17,16 @@
import ipaddress
import logging
from collections import namedtuple
from typing import List, Optional, Tuple, Union
from typing import List, Optional, Tuple
import asyncssh
from pyasic.API.bmminer import BMMinerAPI
from pyasic.config import MinerConfig
from pyasic.data import Fan, HashBoard, MinerData
from pyasic.data import Fan, HashBoard
from pyasic.data.error_codes import MinerErrorData
from pyasic.errors import APIError
from pyasic.miners.base import BaseMiner
from pyasic.settings import PyasicSettings
class BMMiner(BaseMiner):

View File

@@ -26,11 +26,10 @@ import toml
from pyasic.API.bosminer import BOSMinerAPI
from pyasic.config import MinerConfig
from pyasic.data import Fan, HashBoard, MinerData
from pyasic.data import Fan, HashBoard
from pyasic.data.error_codes import BraiinsOSError, MinerErrorData
from pyasic.errors import APIError
from pyasic.miners.base import BaseMiner
from pyasic.settings import PyasicSettings
class BOSMiner(BaseMiner):
@@ -840,7 +839,6 @@ class BOSMiner(BaseMiner):
api_devs = await self.api.devs()
except APIError:
pass
nom_hr = 0
if api_devs:
try:

View File

@@ -16,8 +16,7 @@
import ipaddress
import logging
from collections import namedtuple
from typing import List, Optional, Tuple, Union
from typing import List, Optional, Tuple
import asyncssh
@@ -180,5 +179,5 @@ class BOSMinerOld(BaseMiner):
async def get_nominal_hashrate(self) -> Optional[float]:
return None
async def get_data(self, allow_warning: bool = False) -> MinerData:
async def get_data(self, allow_warning: bool = False, **kwargs) -> MinerData:
return MinerData(ip=str(self.ip))

View File

@@ -18,15 +18,14 @@ import ipaddress
import logging
import warnings
from collections import namedtuple
from typing import List, Optional, Tuple, Union
from typing import List, Optional, Tuple
from pyasic.API.btminer import BTMinerAPI
from pyasic.config import MinerConfig
from pyasic.data import Fan, HashBoard, MinerData
from pyasic.data import Fan, HashBoard
from pyasic.data.error_codes import MinerErrorData, WhatsminerError
from pyasic.errors import APIError
from pyasic.miners.base import BaseMiner
from pyasic.settings import PyasicSettings
class BTMiner(BaseMiner):
@@ -122,17 +121,7 @@ class BTMiner(BaseMiner):
pools_conf = conf["pools"]
try:
await self.api.update_pools(
pools_conf[0]["url"],
pools_conf[0]["user"],
pools_conf[0]["pass"],
pools_conf[1]["url"],
pools_conf[1]["user"],
pools_conf[1]["pass"],
pools_conf[2]["url"],
pools_conf[2]["user"],
pools_conf[2]["pass"],
)
await self.api.update_pools(**pools_conf)
except APIError:
pass
try:
@@ -546,8 +535,6 @@ class BTMiner(BaseMiner):
pass
async def get_fault_light(self, api_get_miner_info: dict = None) -> bool:
data = None
if not api_get_miner_info:
try:
api_get_miner_info = await self.api.get_miner_info()

View File

@@ -17,17 +17,16 @@
import ipaddress
import logging
from collections import namedtuple
from typing import List, Optional, Tuple, Union
from typing import List, Optional, Tuple
import asyncssh
from pyasic.API.cgminer import CGMinerAPI
from pyasic.config import MinerConfig
from pyasic.data import Fan, HashBoard, MinerData
from pyasic.data import Fan, HashBoard
from pyasic.data.error_codes import MinerErrorData
from pyasic.errors import APIError
from pyasic.miners.base import BaseMiner
from pyasic.settings import PyasicSettings
class CGMiner(BaseMiner):

View File

@@ -14,20 +14,15 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
import ipaddress
import logging
import re
from collections import namedtuple
from typing import List, Optional, Tuple, Union
from typing import List, Optional
from pyasic.API.cgminer import CGMinerAPI
from pyasic.config import MinerConfig
from pyasic.data import Fan, HashBoard, MinerData
from pyasic.data import Fan, HashBoard
from pyasic.data.error_codes import MinerErrorData
from pyasic.errors import APIError
from pyasic.miners._backends import CGMiner
from pyasic.miners.base import BaseMiner
from pyasic.settings import PyasicSettings
class CGMinerAvalon(CGMiner):
@@ -79,7 +74,7 @@ class CGMinerAvalon(CGMiner):
logging.debug(f"{self}: Sending config.") # noqa - This doesnt work...
conf = config.as_avalon(user_suffix=user_suffix)
try:
data = await self.api.ascset(
data = await self.api.ascset( # noqa
0, "setpool", f"root,root,{conf}"
) # this should work but doesn't
except APIError:

View File

@@ -14,92 +14,19 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
import json
import logging
import warnings
from typing import Optional, Union
import httpx
from typing import Optional
from pyasic.errors import APIError
from pyasic.miners._backends.bmminer import BMMiner
from pyasic.settings import PyasicSettings
from pyasic.web.vnish import VNishWebAPI
class VNish(BMMiner):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver)
self.api_type = "VNish"
self.uname = "root"
self.pwd = PyasicSettings().global_vnish_password
self.jwt = None
async def auth(self):
async with httpx.AsyncClient() as client:
try:
auth = await client.post(
f"http://{self.ip}/api/v1/unlock",
json={"pw": self.pwd},
)
except httpx.HTTPError:
warnings.warn(f"Could not authenticate web token with miner: {self}")
else:
if not auth.status_code == 200:
warnings.warn(
f"Could not authenticate web token with miner: {self}"
)
return None
json_auth = auth.json()
self.jwt = json_auth["token"]
return self.jwt
async def send_web_command(
self, command: str, data: Union[dict, None] = None, method: str = "GET"
):
if not self.jwt:
await self.auth()
if not data:
data = {}
async with httpx.AsyncClient() as client:
for i in range(PyasicSettings().miner_get_data_retries):
try:
auth = self.jwt
if command.startswith("system"):
auth = "Bearer " + self.jwt
if method == "GET":
response = await client.get(
f"http://{self.ip}/api/v1/{command}",
headers={"Authorization": auth},
timeout=5,
)
elif method == "POST":
if data:
response = await client.post(
f"http://{self.ip}/api/v1/{command}",
headers={"Authorization": auth},
timeout=5,
json=data,
)
else:
response = await client.post(
f"http://{self.ip}/api/v1/{command}",
headers={"Authorization": auth},
timeout=5,
)
else:
raise APIError("Bad method type.")
if not response.status_code == 200:
# refresh the token, retry
await self.auth()
continue
json_data = response.json()
if json_data:
return json_data
return True
except httpx.HTTPError:
pass
except json.JSONDecodeError:
pass
self.web = VNishWebAPI(ip)
async def get_model(self, api_stats: dict = None) -> Optional[str]:
# check if model is cached
@@ -122,16 +49,26 @@ class VNish(BMMiner):
pass
async def restart_backend(self) -> bool:
data = await self.send_web_command("mining/restart", method="POST")
return data
data = await self.web.restart_vnish()
if data:
try:
return data["success"]
except KeyError:
pass
return False
async def reboot(self) -> bool:
data = await self.send_web_command("system/reboot", method="POST")
return data
data = await self.web.reboot()
if data:
try:
return data["success"]
except KeyError:
pass
return False
async def get_mac(self, web_summary: dict = None) -> str:
if not web_summary:
web_info = await self.send_web_command("info")
web_info = await self.web.info()
if web_info:
try:
@@ -149,7 +86,7 @@ class VNish(BMMiner):
async def get_hostname(self, web_summary: dict = None) -> str:
if not web_summary:
web_info = await self.send_web_command("info")
web_info = await self.web.info()
if web_info:
try:
@@ -167,7 +104,7 @@ class VNish(BMMiner):
async def get_wattage(self, web_summary: dict = None) -> Optional[int]:
if not web_summary:
web_summary = await self.send_web_command("summary")
web_summary = await self.web.summary()
if web_summary:
try:
@@ -196,7 +133,7 @@ class VNish(BMMiner):
async def get_wattage_limit(self, web_settings: dict = None) -> Optional[int]:
if not web_settings:
web_settings = await self.send_web_command("summary")
web_settings = await self.web.summary()
if web_settings:
try:
@@ -207,7 +144,7 @@ class VNish(BMMiner):
async def get_fw_ver(self, web_summary: dict = None) -> Optional[str]:
if not web_summary:
web_summary = await self.send_web_command("summary")
web_summary = await self.web.summary()
if web_summary:
try:

View File

@@ -14,45 +14,21 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
import json
from typing import Optional, Union
import httpx
from typing import Union
from pyasic.miners._backends import BMMiner # noqa - Ignore access to _module
from pyasic.settings import PyasicSettings
from pyasic.web.X17 import X17WebAPI
class BMMinerX17(BMMiner):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver=api_ver)
self.ip = ip
self.uname = "root"
self.pwd = PyasicSettings().global_x17_password
async def send_web_command(
self, command: str, params: dict = None
) -> Optional[dict]:
url = f"http://{self.ip}/cgi-bin/{command}.cgi"
auth = httpx.DigestAuth(self.uname, self.pwd)
try:
async with httpx.AsyncClient() as client:
if params:
data = await client.post(url, data=params, auth=auth)
else:
data = await client.get(url, auth=auth)
except httpx.HTTPError:
pass
else:
if data.status_code == 200:
try:
return data.json()
except json.decoder.JSONDecodeError:
pass
self.web = X17WebAPI(ip)
async def get_mac(self) -> Union[str, None]:
try:
data = await self.send_web_command("get_system_info")
data = await self.web.get_system_info()
if data:
return data["macaddr"]
except KeyError:
@@ -60,11 +36,9 @@ class BMMinerX17(BMMiner):
async def fault_light_on(self) -> bool:
# this should time out, after it does do a check
await self.send_web_command("blink", params={"action": "startBlink"})
await self.web.blink(blink=True)
try:
data = await self.send_web_command(
"blink", params={"action": "onPageLoaded"}
)
data = await self.web.get_blink_status()
if data:
if data["isBlinking"]:
self.light = True
@@ -73,11 +47,9 @@ class BMMinerX17(BMMiner):
return self.light
async def fault_light_off(self) -> bool:
await self.send_web_command("blink", params={"action": "stopBlink"})
await self.web.blink(blink=False)
try:
data = await self.send_web_command(
"blink", params={"action": "onPageLoaded"}
)
data = await self.web.get_blink_status()
if data:
if not data["isBlinking"]:
self.light = False
@@ -86,7 +58,7 @@ class BMMinerX17(BMMiner):
return self.light
async def reboot(self) -> bool:
data = await self.send_web_command("reboot")
data = await self.web.reboot()
if data:
return True
return False
@@ -95,9 +67,7 @@ class BMMinerX17(BMMiner):
if self.light:
return self.light
try:
data = await self.send_web_command(
"blink", params={"action": "onPageLoaded"}
)
data = await self.web.get_blink_status()
if data:
self.light = data["isBlinking"]
except KeyError:
@@ -106,7 +76,7 @@ class BMMinerX17(BMMiner):
async def get_hostname(self) -> Union[str, None]:
try:
data = await self.send_web_command("get_system_info")
data = await self.web.get_system_info()
if data:
return data["hostname"]
except KeyError:

View File

@@ -14,7 +14,7 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.miners._backends import X19
from pyasic.miners._backends import X19 # noqa - Ignore access to _module
from pyasic.miners._types import S19 # noqa - Ignore access to _module
# noqa - Ignore access to _module

View File

@@ -14,7 +14,7 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.miners._backends import X19
from pyasic.miners._backends import X19 # noqa - Ignore access to _module
from pyasic.miners._types import S19Pro # noqa - Ignore access to _module
# noqa - Ignore access to _module

View File

@@ -14,7 +14,7 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.miners._backends import X19
from pyasic.miners._backends import X19 # noqa - Ignore access to _module
from pyasic.miners._types import S19XP # noqa - Ignore access to _module
# noqa - Ignore access to _module

View File

@@ -14,7 +14,7 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.miners._backends import X19
from pyasic.miners._backends import X19 # noqa - Ignore access to _module
from pyasic.miners._types import S19a # noqa - Ignore access to _module
# noqa - Ignore access to _module

View File

@@ -14,7 +14,7 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.miners._backends import X19
from pyasic.miners._backends import X19 # noqa - Ignore access to _module
from pyasic.miners._types import S19aPro # noqa - Ignore access to _module
# noqa - Ignore access to _module

View File

@@ -14,7 +14,7 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.miners._backends import X19
from pyasic.miners._backends import X19 # noqa - Ignore access to _module
from pyasic.miners._types import S19j # noqa - Ignore access to _module
# noqa - Ignore access to _module

View File

@@ -14,7 +14,7 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.miners._backends import X19
from pyasic.miners._backends import X19 # noqa - Ignore access to _module
from pyasic.miners._types import S19jPro # noqa - Ignore access to _module
# noqa - Ignore access to _module

View File

@@ -14,7 +14,7 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.miners._backends import X19
from pyasic.miners._backends import X19 # noqa - Ignore access to _module
from pyasic.miners._types import T19 # noqa - Ignore access to _module
# noqa - Ignore access to _module

View File

@@ -14,53 +14,29 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
import json
from typing import Optional, Union
import httpx
from typing import Union
from pyasic.miners._backends import BMMiner # noqa - Ignore access to _module
from pyasic.miners._types import S9 # noqa - Ignore access to _module
from pyasic.settings import PyasicSettings
from pyasic.web.S9 import S9WebAPI
class BMMinerS9(BMMiner, S9):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver=api_ver)
self.ip = ip
self.uname = "root"
self.pwd = PyasicSettings().global_x19_password
async def send_web_command(
self, command: str, params: dict = None
) -> Optional[dict]:
url = f"http://{self.ip}/cgi-bin/{command}.cgi"
auth = httpx.DigestAuth(self.uname, self.pwd)
try:
async with httpx.AsyncClient() as client:
if params:
data = await client.post(url, data=params, auth=auth)
else:
data = await client.get(url, auth=auth)
except httpx.HTTPError:
pass
else:
if data.status_code == 200:
try:
return data.json()
except json.decoder.JSONDecodeError:
pass
self.web = S9WebAPI(ip)
async def get_mac(self) -> Union[str, None]:
try:
data = await self.send_web_command("get_system_info")
data = await self.web.get_system_info()
if data:
return data["macaddr"]
except KeyError:
pass
try:
data = await self.send_web_command("get_network_info")
data = await self.web.get_network_info()
if data:
return data["macaddr"]
except KeyError:

View File

@@ -18,11 +18,10 @@ from typing import List, Optional
import asyncssh
from pyasic.data import HashBoard, MinerData
from pyasic.data import HashBoard
from pyasic.errors import APIError
from pyasic.miners._backends import Hiveon # noqa - Ignore access to _module
from pyasic.miners._types import T9 # noqa - Ignore access to _module
from pyasic.settings import PyasicSettings
class HiveonT9(Hiveon, T9):

View File

@@ -31,9 +31,10 @@ from pyasic.errors import APIError
class BaseMiner(ABC):
def __init__(self, *args, **kwargs) -> None:
self.ip = None
self.uname = "root"
self.pwd = "admin"
self.api = None
self.web = None
self.uname = None
self.pwd = None
self.api_type = None
self.api_ver = None
self.fw_ver = None
@@ -396,8 +397,13 @@ class BaseMiner(ABC):
web_data = {}
for command in web_params:
data = await self.send_web_command(command) # noqa: web only anyway
web_data[command] = data
try:
cmd_func = getattr(self.web, command)
data = await cmd_func() # noqa: web only anyway
except (LookupError, APIError):
pass
else:
web_data[command] = data
for data_name in data_to_get:
function = getattr(self, "get_" + data_name)
sig = inspect.signature(function)

View File

@@ -13,13 +13,8 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import json
import logging
import warnings
from collections import namedtuple
from typing import List, Optional, Tuple, Union
import httpx
from typing import List, Optional
from pyasic.config import MinerConfig
from pyasic.data import Fan, HashBoard
@@ -27,65 +22,14 @@ from pyasic.data.error_codes import InnosiliconError, MinerErrorData
from pyasic.errors import APIError
from pyasic.miners._backends import CGMiner # noqa - Ignore access to _module
from pyasic.miners._types import InnosiliconT3HPlus # noqa - Ignore access to _module
from pyasic.settings import PyasicSettings
from pyasic.web.Inno import InnosiliconWebAPI
class CGMinerInnosiliconT3HPlus(CGMiner, InnosiliconT3HPlus):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver=api_ver)
self.ip = ip
self.uname = "admin"
self.pwd = PyasicSettings().global_innosilicon_password
self.jwt = None
async def auth(self):
async with httpx.AsyncClient() as client:
try:
auth = await client.post(
f"http://{self.ip}/api/auth",
data={"username": self.uname, "password": self.pwd},
)
except httpx.HTTPError:
warnings.warn(f"Could not authenticate web token with miner: {self}")
else:
json_auth = auth.json()
self.jwt = json_auth.get("jwt")
return self.jwt
async def send_web_command(self, command: str, data: Union[dict, None] = None):
if not self.jwt:
await self.auth()
if not data:
data = {}
async with httpx.AsyncClient() as client:
for i in range(PyasicSettings().miner_get_data_retries):
try:
response = await client.post(
f"http://{self.ip}/api/{command}",
headers={"Authorization": "Bearer " + self.jwt},
timeout=5,
data=data,
)
json_data = response.json()
if (
not json_data.get("success")
and "token" in json_data
and json_data.get("token") == "expired"
):
# refresh the token, retry
await self.auth()
continue
if not json_data.get("success"):
if json_data.get("msg"):
raise APIError(json_data["msg"])
elif json_data.get("message"):
raise APIError(json_data["message"])
raise APIError("Innosilicon web api command failed.")
return json_data
except httpx.HTTPError:
pass
except json.JSONDecodeError:
pass
self.web = InnosiliconWebAPI(ip)
async def fault_light_on(self) -> bool:
return False
@@ -108,7 +52,7 @@ class CGMinerInnosiliconT3HPlus(CGMiner, InnosiliconT3HPlus):
async def reboot(self) -> bool:
try:
data = await self.send_web_command("reboot")
data = await self.web.reboot()
except APIError:
pass
else:
@@ -116,7 +60,7 @@ class CGMinerInnosiliconT3HPlus(CGMiner, InnosiliconT3HPlus):
async def restart_cgminer(self) -> bool:
try:
data = await self.send_web_command("restartCgMiner")
data = await self.web.restart_cgminer()
except APIError:
pass
else:
@@ -127,29 +71,24 @@ class CGMinerInnosiliconT3HPlus(CGMiner, InnosiliconT3HPlus):
async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
self.config = config
await self.send_web_command(
"updatePools", data=config.as_inno(user_suffix=user_suffix)
)
await self.web.update_pools(config.as_inno(user_suffix=user_suffix))
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def get_mac(
self,
web_getAll: dict = None, # noqa
web_overview: dict = None, # noqa: named this way for automatic functionality
self, web_get_all: dict = None, web_overview: dict = None
) -> Optional[str]:
web_all_data = web_getAll.get("all")
if not web_all_data and not web_overview:
if not web_get_all and not web_overview:
try:
web_overview = await self.send_web_command("overview")
web_overview = await self.web.overview()
except APIError:
pass
if web_all_data:
if web_get_all:
try:
mac = web_all_data["mac"]
mac = web_get_all["mac"]
return mac.upper()
except KeyError:
pass
@@ -168,7 +107,7 @@ class CGMinerInnosiliconT3HPlus(CGMiner, InnosiliconT3HPlus):
if not web_type:
try:
web_type = await self.send_web_command("type")
web_type = await self.web.type()
except APIError:
pass
@@ -180,21 +119,18 @@ class CGMinerInnosiliconT3HPlus(CGMiner, InnosiliconT3HPlus):
pass
async def get_hashrate(
self,
api_summary: dict = None,
web_getAll: dict = None, # noqa: named this way for automatic functionality
self, api_summary: dict = None, web_get_all: dict = None
) -> Optional[float]:
web_all_data = web_getAll.get("all")
if not api_summary and not web_all_data:
if not api_summary and not web_get_all:
try:
api_summary = await self.api.summary()
except APIError:
pass
if web_all_data:
if web_get_all:
try:
return round(
float(web_all_data["total_hash"]["Hash Rate H"] / 1000000000000), 2
float(web_get_all["total_hash"]["Hash Rate H"] / 1000000000000), 2
)
except KeyError:
pass
@@ -206,11 +142,8 @@ class CGMinerInnosiliconT3HPlus(CGMiner, InnosiliconT3HPlus):
pass
async def get_hashboards(
self,
api_stats: dict = None,
web_getAll: dict = None, # noqa: named this way for automatic functionality
self, api_stats: dict = None, web_get_all: dict = None
) -> List[HashBoard]:
web_all_data = web_getAll.get("all")
hashboards = [
HashBoard(slot=i, expected_chips=self.nominal_chips)
for i in range(self.ideal_hashboards)
@@ -222,13 +155,13 @@ class CGMinerInnosiliconT3HPlus(CGMiner, InnosiliconT3HPlus):
except APIError:
pass
if not web_all_data:
if not web_get_all:
try:
web_all_data = await self.send_web_command("getAll")
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_all_data = web_all_data["all"]
web_get_all = web_get_all["all"]
if api_stats:
if api_stats.get("STATS"):
@@ -242,9 +175,9 @@ class CGMinerInnosiliconT3HPlus(CGMiner, InnosiliconT3HPlus):
hashboards[idx].chips = chips
hashboards[idx].missing = False
if web_all_data:
if web_all_data.get("chain"):
for board in web_all_data["chain"]:
if web_get_all:
if web_get_all.get("chain"):
for board in web_get_all["chain"]:
idx = board.get("ASC")
if idx is not None:
temp = board.get("Temp min")
@@ -264,22 +197,19 @@ class CGMinerInnosiliconT3HPlus(CGMiner, InnosiliconT3HPlus):
return hashboards
async def get_wattage(
self,
web_getAll: dict = None,
api_stats: dict = None, # noqa: named this way for automatic functionality
self, web_get_all: dict = None, api_stats: dict = None
) -> Optional[int]:
web_all_data = web_getAll.get("all")
if not web_all_data:
if not web_get_all:
try:
web_all_data = await self.send_web_command("getAll")
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_all_data = web_all_data["all"]
web_get_all = web_get_all["all"]
if web_all_data:
if web_get_all:
try:
return web_all_data["power"]
return web_get_all["power"]
except KeyError:
pass
@@ -300,23 +230,19 @@ class CGMinerInnosiliconT3HPlus(CGMiner, InnosiliconT3HPlus):
wattage = int(wattage)
return wattage
async def get_fans(
self,
web_getAll: dict = None, # noqa: named this way for automatic functionality
) -> List[Fan]:
web_all_data = web_getAll.get("all")
if not web_all_data:
async def get_fans(self, web_get_all: dict = None) -> List[Fan]:
if not web_get_all:
try:
web_all_data = await self.send_web_command("getAll")
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_all_data = web_all_data["all"]
web_get_all = web_get_all["all"]
fan_data = [Fan(), Fan(), Fan(), Fan()]
if web_all_data:
if web_get_all:
try:
spd = web_all_data["fansSpeed"]
spd = web_get_all["fansSpeed"]
except KeyError:
pass
else:
@@ -353,21 +279,20 @@ class CGMinerInnosiliconT3HPlus(CGMiner, InnosiliconT3HPlus):
return groups
async def get_errors(
self, web_getErrorDetail: dict = None
self, web_get_error_detail: dict = None
) -> List[MinerErrorData]: # noqa: named this way for automatic functionality
web_error_details = web_getErrorDetail
errors = []
if not web_error_details:
if not web_get_error_detail:
try:
web_error_details = await self.send_web_command("getErrorDetail")
web_get_error_detail = await self.web.get_error_detail()
except APIError:
pass
if web_error_details:
if web_get_error_detail:
try:
# only 1 error?
# TODO: check if this should be a loop, can't remember.
err = web_error_details["code"]
err = web_get_error_detail["code"]
except KeyError:
pass
else:
@@ -376,19 +301,18 @@ class CGMinerInnosiliconT3HPlus(CGMiner, InnosiliconT3HPlus):
errors.append(InnosiliconError(error_code=err))
return errors
async def get_wattage_limit(self, web_getAll: dict = None) -> Optional[int]:
web_all_data = web_getAll.get("all")
if not web_all_data:
async def get_wattage_limit(self, web_get_all: dict = None) -> Optional[int]:
if not web_get_all:
try:
web_all_data = await self.send_web_command("getAll")
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_all_data = web_all_data["all"]
web_get_all = web_get_all["all"]
if web_all_data:
if web_get_all:
try:
level = web_all_data["running_mode"]["level"]
level = web_get_all["running_mode"]["level"]
except KeyError:
pass
else:

View File

@@ -27,7 +27,8 @@ class _MinerListener:
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, _addr):
@staticmethod
def datagram_received(data, _addr):
m = data.decode()
if "," in m:
ip, mac = m.split(",")

View File

@@ -14,7 +14,6 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from collections import namedtuple
from typing import List, Optional, Tuple
from pyasic.API.unknown import UnknownAPI
@@ -26,7 +25,9 @@ from pyasic.miners.base import BaseMiner
class UnknownMiner(BaseMiner):
def __init__(self, ip: str, *args, **kwargs) -> None:
def __init__(
self, ip: str, *args, **kwargs
) -> None: # noqa - ignore *args and **kwargs for signature consistency
super().__init__()
self.ip = ip
self.api = UnknownAPI(ip)

106
pyasic/web/Inno.py Normal file
View File

@@ -0,0 +1,106 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import json
import warnings
from typing import Union
import httpx
from pyasic.errors import APIError
from pyasic.settings import PyasicSettings
from pyasic.web import BaseWebAPI
class InnosiliconWebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
super().__init__(ip)
self.username = "admin"
self.pwd = PyasicSettings().global_innosilicon_password
self.jwt = None
async def auth(self):
async with httpx.AsyncClient() as client:
try:
auth = await client.post(
f"http://{self.ip}/api/auth",
data={"username": self.username, "password": self.pwd},
)
except httpx.HTTPError:
warnings.warn(f"Could not authenticate web token with miner: {self}")
else:
json_auth = auth.json()
self.jwt = json_auth.get("jwt")
return self.jwt
async def send_command(
self,
command: Union[str, bytes],
ignore_errors: bool = False,
allow_warning: bool = True,
**parameters: Union[str, int, bool],
) -> dict:
if not self.jwt:
await self.auth()
async with httpx.AsyncClient() as client:
for i in range(PyasicSettings().miner_get_data_retries):
try:
response = await client.post(
f"http://{self.ip}/api/{command}",
headers={"Authorization": "Bearer " + self.jwt},
timeout=5,
data=parameters,
)
json_data = response.json()
if (
not json_data.get("success")
and "token" in json_data
and json_data.get("token") == "expired"
):
# refresh the token, retry
await self.auth()
continue
if not json_data.get("success"):
if json_data.get("msg"):
raise APIError(json_data["msg"])
elif json_data.get("message"):
raise APIError(json_data["message"])
raise APIError("Innosilicon web api command failed.")
return json_data
except httpx.HTTPError:
pass
except json.JSONDecodeError:
pass
async def reboot(self) -> dict:
return await self.send_command("reboot")
async def restart_cgminer(self) -> dict:
return await self.send_command("restartCgMiner")
async def update_pools(self, conf: dict) -> dict:
return await self.send_command("updatePools", **conf)
async def overview(self) -> dict:
return await self.send_command("overview")
async def type(self) -> dict:
return await self.send_command("type")
async def get_all(self):
return await self.send_command("getAll")
async def get_error_detail(self):
return await self.send_command("getErrorDetail")

58
pyasic/web/S9.py Normal file
View File

@@ -0,0 +1,58 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import json
from typing import Union
import httpx
from pyasic.settings import PyasicSettings
from pyasic.web import BaseWebAPI
class S9WebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
super().__init__(ip)
self.pwd = PyasicSettings().global_x17_password
async def send_command(
self,
command: Union[str, bytes],
ignore_errors: bool = False,
allow_warning: bool = True,
**parameters: Union[str, int, bool],
) -> dict:
url = f"http://{self.ip}/cgi-bin/{command}.cgi"
auth = httpx.DigestAuth(self.username, self.pwd)
try:
async with httpx.AsyncClient() as client:
if parameters:
data = await client.post(url, data=parameters, auth=auth)
else:
data = await client.get(url, auth=auth)
except httpx.HTTPError:
pass
else:
if data.status_code == 200:
try:
return data.json()
except json.decoder.JSONDecodeError:
pass
async def get_system_info(self) -> dict:
return await self.send_command("get_system_info")
async def get_network_info(self) -> dict:
return await self.send_command("get_network_info")

66
pyasic/web/X17.py Normal file
View File

@@ -0,0 +1,66 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import json
from typing import Union
import httpx
from pyasic.settings import PyasicSettings
from pyasic.web import BaseWebAPI
class X17WebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
super().__init__(ip)
self.pwd = PyasicSettings().global_x17_password
async def send_command(
self,
command: Union[str, bytes],
ignore_errors: bool = False,
allow_warning: bool = True,
**parameters: Union[str, int, bool],
) -> dict:
url = f"http://{self.ip}/cgi-bin/{command}.cgi"
auth = httpx.DigestAuth(self.username, self.pwd)
try:
async with httpx.AsyncClient() as client:
if parameters:
data = await client.post(url, data=parameters, auth=auth)
else:
data = await client.get(url, auth=auth)
except httpx.HTTPError:
pass
else:
if data.status_code == 200:
try:
return data.json()
except json.decoder.JSONDecodeError:
pass
async def get_system_info(self) -> dict:
return await self.send_command("get_system_info")
async def blink(self, blink: bool) -> dict:
if blink:
return await self.send_command("blink", action="startBlink")
return await self.send_command("blink", action="stopBlink")
async def reboot(self) -> dict:
return await self.send_command("reboot")
async def get_blink_status(self) -> dict:
return await self.send_command("blink", action="onPageLoaded")

99
pyasic/web/X19.py Normal file
View File

@@ -0,0 +1,99 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import json
from typing import Union
import httpx
from pyasic.settings import PyasicSettings
from pyasic.web import BaseWebAPI
class X19WebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
super().__init__(ip)
self.pwd = PyasicSettings().global_x19_password
async def send_command(
self,
command: Union[str, bytes],
ignore_errors: bool = False,
allow_warning: bool = True,
**parameters: Union[str, int, bool],
) -> dict:
url = f"http://{self.ip}/cgi-bin/{command}.cgi"
auth = httpx.DigestAuth(self.username, self.pwd)
try:
async with httpx.AsyncClient() as client:
if parameters:
data = await client.post(
url, data=json.dumps(parameters), auth=auth # noqa
)
else:
data = await client.get(url, auth=auth)
except httpx.HTTPError:
pass
else:
if data.status_code == 200:
try:
return data.json()
except json.decoder.JSONDecodeError:
pass
async def get_miner_conf(self) -> dict:
return await self.send_command("get_miner_conf")
async def set_miner_conf(self, conf: dict) -> dict:
return await self.send_command("set_miner_conf", **conf)
async def blink(self, blink: bool) -> dict:
if blink:
return await self.send_command("blink", blink="true")
return await self.send_command("blink", blink="false")
async def reboot(self) -> dict:
return await self.send_command("reboot")
async def get_system_info(self) -> dict:
return await self.send_command("get_system_info")
async def get_network_info(self) -> dict:
return await self.send_command("get_network_info")
async def summary(self) -> dict:
return await self.send_command("summary")
async def get_blink_status(self) -> dict:
return await self.send_command("get_blink_status")
async def set_network_conf(
self,
ip: str,
dns: str,
gateway: str,
subnet_mask: str,
hostname: str,
protocol: int,
) -> dict:
return await self.send_command(
"set_network_conf",
ipAddress=ip,
ipDns=dns,
ipGateway=gateway,
ipHost=hostname,
ipPro=protocol,
ipSub=subnet_mask,
)

87
pyasic/web/__init__.py Normal file
View File

@@ -0,0 +1,87 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import ipaddress
import warnings
from abc import ABC, abstractmethod
from typing import Union
from pyasic.errors import APIWarning
class BaseWebAPI(ABC):
def __init__(self, ip: str) -> None:
# ip address of the miner
self.ip = ipaddress.ip_address(ip)
self.username = "root"
self.pwd = "root"
def __new__(cls, *args, **kwargs):
if cls is BaseWebAPI:
raise TypeError(f"Only children of '{cls.__name__}' may be instantiated")
return object.__new__(cls)
def __repr__(self):
return f"{self.__class__.__name__}: {str(self.ip)}"
@abstractmethod
async def send_command(
self,
command: Union[str, bytes],
ignore_errors: bool = False,
allow_warning: bool = True,
**parameters: Union[str, int, bool],
) -> dict:
pass
def _check_commands(self, *commands):
allowed_commands = self.get_commands()
return_commands = []
for command in [*commands]:
if command in allowed_commands:
return_commands.append(command)
else:
warnings.warn(
f"""Removing incorrect command: {command}
If you are sure you want to use this command please use WebAPI.send_command("{command}", ignore_errors=True) instead.""",
APIWarning,
)
return return_commands
@property
def commands(self) -> list:
return self.get_commands()
def get_commands(self) -> list:
"""Get a list of command accessible to a specific type of web API on the miner.
Returns:
A list of all web commands that the miner supports.
"""
return [
func
for func in
# each function in self
dir(self)
if not func == "commands"
if callable(getattr(self, func)) and
# no __ or _ methods
not func.startswith("__") and not func.startswith("_") and
# remove all functions that are in this base class
func
not in [
func for func in dir(BaseWebAPI) if callable(getattr(BaseWebAPI, func))
]
]

112
pyasic/web/vnish.py Normal file
View File

@@ -0,0 +1,112 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import json
import warnings
from typing import Union
import httpx
from pyasic.settings import PyasicSettings
from pyasic.web import BaseWebAPI
class VNishWebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
super().__init__(ip)
self.username = "admin"
self.pwd = PyasicSettings().global_vnish_password
self.token = None
async def auth(self):
async with httpx.AsyncClient() as client:
try:
auth = await client.post(
f"http://{self.ip}/api/v1/unlock",
json={"pw": self.pwd},
)
except httpx.HTTPError:
warnings.warn(f"Could not authenticate web token with miner: {self}")
else:
if not auth.status_code == 200:
warnings.warn(
f"Could not authenticate web token with miner: {self}"
)
return None
json_auth = auth.json()
self.token = json_auth["token"]
return self.token
async def send_command(
self,
command: Union[str, bytes],
ignore_errors: bool = False,
allow_warning: bool = True,
**parameters: Union[str, int, bool],
) -> dict:
if not self.token:
await self.auth()
async with httpx.AsyncClient() as client:
for i in range(PyasicSettings().miner_get_data_retries):
try:
auth = self.token
if command.startswith("system"):
auth = "Bearer " + self.token
if parameters.get("post"):
parameters.pop("post")
response = await client.post(
f"http://{self.ip}/api/v1/{command}",
headers={"Authorization": auth},
timeout=5,
json=parameters,
)
elif not parameters == {}:
response = await client.post(
f"http://{self.ip}/api/v1/{command}",
headers={"Authorization": auth},
timeout=5,
json=parameters,
)
else:
response = await client.get(
f"http://{self.ip}/api/v1/{command}",
headers={"Authorization": auth},
timeout=5,
)
if not response.status_code == 200:
# refresh the token, retry
await self.auth()
continue
json_data = response.json()
if json_data:
return json_data
return {"success": True}
except httpx.HTTPError:
pass
except json.JSONDecodeError:
pass
async def restart_vnish(self) -> dict:
return await self.send_command("mining/restart", post=True)
async def reboot(self) -> dict:
return await self.send_command("system/reboot", post=True)
async def info(self):
return await self.send_command("info")
async def summary(self):
return await self.send_command("summary")