feature: start refactoring BOSer and BOSMiner into separate classes.

This commit is contained in:
b-rowan
2024-01-10 22:12:27 -07:00
committed by UpstreamData
parent 672e753afb
commit 928e0dd028
5 changed files with 675 additions and 338 deletions

View File

@@ -17,13 +17,7 @@ from dataclasses import dataclass, field
from typing import Union
from pyasic.config.base import MinerConfigOption, MinerConfigValue
from pyasic.web.braiins_os.proto.braiins.bos.v1 import (
DpsPowerTarget,
DpsTarget,
Hours,
Power,
SetDpsRequest,
)
from pyasic.web.braiins_os.proto.braiins.bos.v1 import DpsPowerTarget, DpsTarget, Hours
@dataclass
@@ -43,8 +37,13 @@ class PowerScalingShutdownEnabled(MinerConfigValue):
return cfg
def as_boser(self) -> dict:
return {"enable_shutdown": True, "shutdown_duration": self.duration}
def as_bos_grpc(self) -> dict:
cfg = {"enable_shutdown ": True}
if self.duration is not None:
cfg["shutdown_duration"] = Hours(self.duration)
return cfg
@dataclass
@@ -58,7 +57,7 @@ class PowerScalingShutdownDisabled(MinerConfigValue):
def as_bosminer(self) -> dict:
return {"shutdown_enabled": False}
def as_boser(self) -> dict:
def as_bos_grpc(self) -> dict:
return {"enable_shutdown ": False}
@@ -89,19 +88,6 @@ class PowerScalingShutdown(MinerConfigOption):
return cls.disabled()
return None
@classmethod
def from_boser(cls, power_scaling_conf: dict):
sd_enabled = power_scaling_conf.get("shutdownEnabled")
if sd_enabled is not None:
if sd_enabled:
try:
return cls.enabled(power_scaling_conf["shutdownDuration"]["hours"])
except KeyError:
return cls.enabled()
else:
return cls.disabled()
return None
@dataclass
class PowerScalingEnabled(MinerConfigValue):
@@ -147,19 +133,20 @@ class PowerScalingEnabled(MinerConfigValue):
return {"power_scaling": cfg}
def as_boser(self) -> dict:
return {
"set_dps": SetDpsRequest(
enable=True,
**self.shutdown_enabled.as_boser(),
target=DpsTarget(
power_target=DpsPowerTarget(
power_step=Power(self.power_step),
min_power_target=Power(self.minimum_power),
)
),
),
}
def as_bos_grpc(self) -> dict:
cfg = {"enable": True}
target_conf = {}
if self.power_step is not None:
target_conf["power_step"] = self.power_step
if self.minimum_power is not None:
target_conf["min_power_target"] = self.minimum_power
cfg["target"] = DpsTarget(power_target=DpsPowerTarget(**target_conf))
if self.shutdown_enabled is not None:
cfg = {**cfg, **self.shutdown_enabled.as_bos_grpc()}
return {"dps": cfg}
@dataclass
@@ -200,21 +187,3 @@ class PowerScalingConfig(MinerConfigOption):
return cls.disabled()
return cls.default()
@classmethod
def from_boser(cls, grpc_miner_conf: dict):
try:
dps_conf = grpc_miner_conf["dps"]
if not dps_conf.get("enabled", False):
return cls.disabled()
except LookupError:
return cls.default()
conf = {}
conf["shutdown_enabled"] = PowerScalingShutdown.from_boser(dps_conf)
if dps_conf.get("minPowerTarget") is not None:
conf["minimum_power"] = dps_conf["minPowerTarget"]["watt"]
if dps_conf.get("powerStep") is not None:
conf["power_step"] = dps_conf["powerStep"]["watt"]
return cls.enabled(**conf)

View File

@@ -1,155 +0,0 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import logging
from typing import List, Optional, Tuple
from pyasic.config import MinerConfig
from pyasic.data import Fan, HashBoard, MinerData
from pyasic.data.error_codes import MinerErrorData
from pyasic.miners.backends import BOSMiner
class BOSMinerOld(BOSMiner):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver)
async def send_ssh_command(self, cmd: str) -> Optional[str]:
result = None
try:
conn = await self._get_ssh_connection()
except ConnectionError:
return None
# open an ssh connection
async with conn:
# 3 retries
for i in range(3):
try:
# run the command and get the result
result = await conn.run(cmd)
result = result.stdout
except Exception as e:
# if the command fails, log it
logging.warning(f"{self} command {cmd} error: {e}")
# on the 3rd retry, return None
if i == 3:
return
continue
# return the result, either command output or None
return result
async def update_to_plus(self):
result = await self.send_ssh_command("opkg update && opkg install bos_plus")
return result
async def check_light(self) -> bool:
return False
async def fault_light_on(self) -> bool:
return False
async def fault_light_off(self) -> bool:
return False
async def get_config(self) -> None:
return None
async def reboot(self) -> bool:
return False
async def restart_backend(self) -> bool:
return False
async def stop_mining(self) -> bool:
return False
async def resume_mining(self) -> bool:
return False
async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
return None
async def set_power_limit(self, wattage: int) -> bool:
return False
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def _get_mac(self, *args, **kwargs) -> Optional[str]:
return None
async def get_model(self, *args, **kwargs) -> str:
return "S9"
async def get_version(self, *args, **kwargs) -> Tuple[Optional[str], Optional[str]]:
return None, None
async def _get_hostname(self, *args, **kwargs) -> Optional[str]:
return None
async def _get_hashrate(self, *args, **kwargs) -> Optional[float]:
return None
async def _get_hashboards(self, *args, **kwargs) -> List[HashBoard]:
return []
async def _get_env_temp(self, *args, **kwargs) -> Optional[float]:
return None
async def _get_wattage(self, *args, **kwargs) -> Optional[int]:
return None
async def _get_wattage_limit(self, *args, **kwargs) -> Optional[int]:
return None
async def _get_fans(
self,
*args,
**kwargs,
) -> List[Fan]:
return [Fan(), Fan(), Fan(), Fan()]
async def _get_fan_psu(self, *args, **kwargs) -> Optional[int]:
return None
async def _get_api_ver(self, *args, **kwargs) -> Optional[str]:
return None
async def _get_fw_ver(self, *args, **kwargs) -> Optional[str]:
return None
async def _get_errors(self, *args, **kwargs) -> List[MinerErrorData]:
return []
async def _get_fault_light(self, *args, **kwargs) -> bool:
return False
async def _get_expected_hashrate(self, *args, **kwargs) -> Optional[float]:
return None
async def get_data(self, allow_warning: bool = False, **kwargs) -> MinerData:
return MinerData(ip=str(self.ip))
async def _is_mining(self, *args, **kwargs) -> Optional[bool]:
return None
async def _get_uptime(self, *args, **kwargs) -> Optional[int]:
return None

View File

@@ -36,18 +36,620 @@ from pyasic.miners.base import (
RPCAPICommand,
WebAPICommand,
)
from pyasic.web.bosminer import BOSMinerWebAPI
from pyasic.web.braiins_os import BOSerWebAPI, BOSMinerWebAPI
BOSMINER_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction(
"_get_mac",
[WebAPICommand("web_net_conf", "admin/network/iface_status/lan")],
),
str(DataOptions.MODEL): DataFunction("get_model"),
str(DataOptions.API_VERSION): DataFunction(
"get_api_ver", [RPCAPICommand("api_version", "version")]
),
str(DataOptions.FW_VERSION): DataFunction("get_fw_ver"),
str(DataOptions.HOSTNAME): DataFunction("get_hostname"),
str(DataOptions.HASHRATE): DataFunction(
"get_hashrate",
[RPCAPICommand("api_summary", "summary")],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"get_expected_hashrate", [RPCAPICommand("api_devs", "devs")]
),
str(DataOptions.HASHBOARDS): DataFunction(
"get_hashboards",
[
WebAPICommand(
"web_net_conf", "/cgi-bin/luci/admin/network/iface_status/lan"
)
RPCAPICommand("api_temps", "temps"),
RPCAPICommand("api_devdetails", "devdetails"),
RPCAPICommand("api_devs", "devs"),
],
),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction("get_env_temp"),
str(DataOptions.WATTAGE): DataFunction(
"get_wattage",
[RPCAPICommand("api_tunerstatus", "tunerstatus")],
),
str(DataOptions.WATTAGE_LIMIT): DataFunction(
"get_wattage_limit",
[RPCAPICommand("api_tunerstatus", "tunerstatus")],
),
str(DataOptions.FANS): DataFunction(
"get_fans",
[RPCAPICommand("api_fans", "fans")],
),
str(DataOptions.FAN_PSU): DataFunction("get_fan_psu"),
str(DataOptions.ERRORS): DataFunction(
"get_errors",
[RPCAPICommand("api_tunerstatus", "tunerstatus")],
),
str(DataOptions.FAULT_LIGHT): DataFunction("get_fault_light"),
str(DataOptions.IS_MINING): DataFunction(
"is_mining", [RPCAPICommand("api_devdetails", "devdetails")]
),
str(DataOptions.UPTIME): DataFunction(
"get_uptime", [RPCAPICommand("api_summary", "summary")]
),
str(DataOptions.CONFIG): DataFunction("get_config"),
}
)
class BOSMiner(BaseMiner):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip)
# interfaces
self.api = BOSMinerAPI(ip, api_ver)
self.web = BOSMinerWebAPI(ip)
# static data
self.api_type = "BOSMiner"
# data gathering locations
self.data_locations = BOSMINER_DATA_LOC
# autotuning/shutdown support
self.supports_autotuning = True
self.supports_shutdown = True
# data storage
self.api_ver = api_ver
async def send_ssh_command(self, cmd: str) -> Optional[str]:
result = None
try:
conn = await asyncio.wait_for(self._get_ssh_connection(), timeout=10)
except (ConnectionError, asyncio.TimeoutError):
return None
# open an ssh connection
async with conn:
# 3 retries
for i in range(3):
try:
# run the command and get the result
result = await conn.run(cmd)
stderr = result.stderr
result = result.stdout
if len(stderr) > len(result):
result = stderr
except Exception as e:
# if the command fails, log it
logging.warning(f"{self} command {cmd} error: {e}")
# on the 3rd retry, return None
if i == 3:
return
continue
# return the result, either command output or None
return result
async def fault_light_on(self) -> bool:
logging.debug(f"{self}: Sending fault_light on command.")
ret = await self.send_ssh_command("miner fault_light on")
logging.debug(f"{self}: fault_light on command completed.")
if isinstance(ret, str):
self.light = True
return self.light
return False
async def fault_light_off(self) -> bool:
logging.debug(f"{self}: Sending fault_light off command.")
self.light = False
ret = await self.send_ssh_command("miner fault_light off")
logging.debug(f"{self}: fault_light off command completed.")
if isinstance(ret, str):
self.light = False
return True
return False
async def restart_backend(self) -> bool:
return await self.restart_bosminer()
async def restart_bosminer(self) -> bool:
logging.debug(f"{self}: Sending bosminer restart command.")
ret = await self.send_ssh_command("/etc/init.d/bosminer restart")
logging.debug(f"{self}: bosminer restart command completed.")
if isinstance(ret, str):
return True
return False
async def stop_mining(self) -> bool:
try:
data = await self.api.pause()
except APIError:
return False
if data.get("PAUSE"):
if data["PAUSE"][0]:
return True
return False
async def resume_mining(self) -> bool:
try:
data = await self.api.resume()
except APIError:
return False
if data.get("RESUME"):
if data["RESUME"][0]:
return True
return False
async def reboot(self) -> bool:
logging.debug(f"{self}: Sending reboot command.")
ret = await self.send_ssh_command("/sbin/reboot")
logging.debug(f"{self}: Reboot command completed.")
if isinstance(ret, str):
return True
return False
async def get_config(self) -> MinerConfig:
logging.debug(f"{self}: Getting config.")
try:
conn = await self._get_ssh_connection()
except ConnectionError:
conn = None
if conn:
async with conn:
# good ol' BBB compatibility :/
toml_data = toml.loads(
(await conn.run("cat /etc/bosminer.toml")).stdout
)
logging.debug(f"{self}: Converting config file.")
cfg = MinerConfig.from_bosminer(toml_data)
self.config = cfg
return self.config
async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
logging.debug(f"{self}: Sending config.")
self.config = config
toml_conf = toml.dumps(
{
"format": {
"version": "1.2+",
"generator": "pyasic",
"model": f"{self.make.replace('Miner', 'miner')} {self.model.replace(' (BOS)', '').replace('j', 'J')}",
"timestamp": int(time.time()),
},
**config.as_bosminer(user_suffix=user_suffix),
}
)
try:
conn = await self._get_ssh_connection()
except ConnectionError as e:
raise APIError("SSH connection failed when sending config.") from e
async with conn:
# BBB check because bitmain suxx
bbb_check = await conn.run(
"if [ ! -f /etc/init.d/bosminer ]; then echo '1'; else echo '0'; fi;"
)
bbb = bbb_check.stdout.strip() == "1"
if not bbb:
await conn.run("/etc/init.d/bosminer stop")
logging.debug(f"{self}: Opening SFTP connection.")
async with conn.start_sftp_client() as sftp:
logging.debug(f"{self}: Opening config file.")
async with sftp.open("/etc/bosminer.toml", "w+") as file:
await file.write(toml_conf)
logging.debug(f"{self}: Restarting BOSMiner")
await conn.run("/etc/init.d/bosminer start")
# I really hate BBB, please get rid of it if you have it
else:
await conn.run("/etc/init.d/S99bosminer stop")
logging.debug(f"{self}: BBB sending config")
await conn.run("echo '" + toml_conf + "' > /etc/bosminer.toml")
logging.debug(f"{self}: BBB restarting bosminer.")
await conn.run("/etc/init.d/S99bosminer start")
async def set_power_limit(self, wattage: int) -> bool:
try:
cfg = await self.get_config()
if cfg is None:
return False
cfg.mining_mode = MiningModePowerTune(wattage)
await self.send_config(cfg)
except Exception as e:
logging.warning(f"{self} set_power_limit: {e}")
return False
else:
return True
async def set_static_ip(
self,
ip: str,
dns: str,
gateway: str,
subnet_mask: str = "255.255.255.0",
):
cfg_data_lan = (
"config interface 'lan'\n\toption type 'bridge'\n\toption ifname 'eth0'\n\toption proto 'static'\n\toption ipaddr '"
+ ip
+ "'\n\toption netmask '"
+ subnet_mask
+ "'\n\toption gateway '"
+ gateway
+ "'\n\toption dns '"
+ dns
+ "'"
)
data = await self.send_ssh_command("cat /etc/config/network")
split_data = data.split("\n\n")
for idx in range(len(split_data)):
if "config interface 'lan'" in split_data[idx]:
split_data[idx] = cfg_data_lan
config = "\n\n".join(split_data)
conn = await self._get_ssh_connection()
async with conn:
await conn.run("echo '" + config + "' > /etc/config/network")
async def set_dhcp(self):
cfg_data_lan = "config interface 'lan'\n\toption type 'bridge'\n\toption ifname 'eth0'\n\toption proto 'dhcp'"
data = await self.send_ssh_command("cat /etc/config/network")
split_data = data.split("\n\n")
for idx in range(len(split_data)):
if "config interface 'lan'" in split_data[idx]:
split_data[idx] = cfg_data_lan
config = "\n\n".join(split_data)
conn = await self._get_ssh_connection()
async with conn:
await conn.run("echo '" + config + "' > /etc/config/network")
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def get_mac(self, web_net_conf: Union[dict, list] = None) -> Optional[str]:
if not web_net_conf:
try:
web_net_conf = await self.web.luci.send_command(
"admin/network/iface_status/lan"
)
except APIError:
pass
if isinstance(web_net_conf, dict):
if "admin/network/iface_status/lan" in web_net_conf.keys():
web_net_conf = web_net_conf["admin/network/iface_status/lan"]
if web_net_conf:
try:
return web_net_conf[0]["macaddr"]
except LookupError:
pass
# could use ssh, but its slow and buggy
# result = await self.send_ssh_command("cat /sys/class/net/eth0/address")
# if result:
# return result.upper().strip()
async def get_model(self) -> Optional[str]:
if self.model is not None:
return self.model + " (BOS)"
return "? (BOS)"
async def get_version(
self, api_version: dict = None, graphql_version: dict = None
) -> Tuple[Optional[str], Optional[str]]:
miner_version = namedtuple("MinerVersion", "api_ver fw_ver")
api_ver_t = asyncio.create_task(self.get_api_ver(api_version))
fw_ver_t = asyncio.create_task(self.get_fw_ver())
await asyncio.gather(api_ver_t, fw_ver_t)
return miner_version(api_ver=api_ver_t.result(), fw_ver=fw_ver_t.result())
async def get_api_ver(self, api_version: dict = None) -> Optional[str]:
if not api_version:
try:
api_version = await self.api.version()
except APIError:
pass
# Now get the API version
if api_version:
try:
api_ver = api_version["VERSION"][0]["API"]
except (KeyError, IndexError):
api_ver = None
self.api_ver = api_ver
self.api.api_ver = self.api_ver
return self.api_ver
async def get_fw_ver(self) -> Optional[str]:
fw_ver = await self.send_ssh_command("cat /etc/bos_version")
# if we get the version data, parse it
if fw_ver is not None:
ver = fw_ver.split("-")[5]
if "." in ver:
self.fw_ver = ver
logging.debug(f"Found version for {self.ip}: {self.fw_ver}")
return self.fw_ver
async def get_hostname(self) -> Union[str, None]:
try:
hostname = (
await self.send_ssh_command("cat /proc/sys/kernel/hostname")
).strip()
except Exception as e:
logging.error(f"BOSMiner get_hostname failed with error: {e}")
return None
return hostname
async def get_hashrate(self, api_summary: dict = None) -> Optional[float]:
# get hr from API
if not api_summary:
try:
api_summary = await self.api.summary()
except APIError:
pass
if api_summary:
try:
return round(float(api_summary["SUMMARY"][0]["MHS 1m"] / 1000000), 2)
except (KeyError, IndexError, ValueError, TypeError):
pass
async def get_hashboards(
self,
api_temps: dict = None,
api_devdetails: dict = None,
api_devs: dict = None,
):
hashboards = [
HashBoard(slot=i, expected_chips=self.expected_chips)
for i in range(self.expected_hashboards)
]
cmds = []
if not api_temps:
cmds.append("temps")
if not api_devdetails:
cmds.append("devdetails")
if not api_devs:
cmds.append("devs")
if len(cmds) > 0:
try:
d = await self.api.multicommand(*cmds)
except APIError:
d = {}
try:
api_temps = d["temps"][0]
except (KeyError, IndexError):
api_temps = None
try:
api_devdetails = d["devdetails"][0]
except (KeyError, IndexError):
api_devdetails = None
try:
api_devs = d["devs"][0]
except (KeyError, IndexError):
api_devs = None
if api_temps:
try:
offset = 6 if api_temps["TEMPS"][0]["ID"] in [6, 7, 8] else 1
for board in api_temps["TEMPS"]:
_id = board["ID"] - offset
chip_temp = round(board["Chip"])
board_temp = round(board["Board"])
hashboards[_id].chip_temp = chip_temp
hashboards[_id].temp = board_temp
except (IndexError, KeyError, ValueError, TypeError):
pass
if api_devdetails:
try:
offset = 6 if api_devdetails["DEVDETAILS"][0]["ID"] in [6, 7, 8] else 1
for board in api_devdetails["DEVDETAILS"]:
_id = board["ID"] - offset
chips = board["Chips"]
hashboards[_id].chips = chips
hashboards[_id].missing = False
except (IndexError, KeyError):
pass
if api_devs:
try:
offset = 6 if api_devs["DEVS"][0]["ID"] in [6, 7, 8] else 1
for board in api_devs["DEVS"]:
_id = board["ID"] - offset
hashrate = round(float(board["MHS 1m"] / 1000000), 2)
hashboards[_id].hashrate = hashrate
except (IndexError, KeyError):
pass
return hashboards
async def get_env_temp(self) -> Optional[float]:
return None
async def get_wattage(self, api_tunerstatus: dict = None) -> Optional[int]:
if not api_tunerstatus:
try:
api_tunerstatus = await self.api.tunerstatus()
except APIError:
pass
if api_tunerstatus:
try:
return api_tunerstatus["TUNERSTATUS"][0][
"ApproximateMinerPowerConsumption"
]
except (KeyError, IndexError):
pass
async def get_wattage_limit(self, api_tunerstatus: dict = None) -> Optional[int]:
if not api_tunerstatus:
try:
api_tunerstatus = await self.api.tunerstatus()
except APIError:
pass
if api_tunerstatus:
try:
return api_tunerstatus["TUNERSTATUS"][0]["PowerLimit"]
except (KeyError, IndexError):
pass
async def get_fans(self, api_fans: dict = None) -> List[Fan]:
if not api_fans:
try:
api_fans = await self.api.fans()
except APIError:
pass
if api_fans:
fans = []
for n in range(self.fan_count):
try:
fans.append(Fan(api_fans["FANS"][n]["RPM"]))
except (IndexError, KeyError):
pass
return fans
return [Fan() for _ in range(self.fan_count)]
async def get_fan_psu(self) -> Optional[int]:
return None
async def get_errors(self, api_tunerstatus: dict = None) -> List[MinerErrorData]:
if not api_tunerstatus:
try:
api_tunerstatus = await self.api.tunerstatus()
except APIError:
pass
if api_tunerstatus:
errors = []
try:
chain_status = api_tunerstatus["TUNERSTATUS"][0]["TunerChainStatus"]
if chain_status and len(chain_status) > 0:
offset = (
6 if int(chain_status[0]["HashchainIndex"]) in [6, 7, 8] else 0
)
for board in chain_status:
_id = board["HashchainIndex"] - offset
if board["Status"] not in [
"Stable",
"Testing performance profile",
"Tuning individual chips",
]:
_error = board["Status"].split(" {")[0]
_error = _error[0].lower() + _error[1:]
errors.append(BraiinsOSError(f"Slot {_id} {_error}"))
return errors
except (KeyError, IndexError):
pass
async def get_fault_light(self, graphql_fault_light: dict = None) -> bool:
if self.light:
return self.light
try:
data = (
await self.send_ssh_command("cat /sys/class/leds/'Red LED'/delay_off")
).strip()
self.light = False
if data == "50":
self.light = True
return self.light
except (TypeError, AttributeError):
return self.light
async def get_expected_hashrate(self, api_devs: dict = None) -> Optional[float]:
if not api_devs:
try:
api_devs = await self.api.devs()
except APIError:
pass
if api_devs:
try:
offset = 6 if api_devs["DEVS"][0]["ID"] in [6, 7, 8] else 0
hr_list = []
for board in api_devs["DEVS"]:
_id = board["ID"] - offset
expected_hashrate = round(float(board["Nominal MHS"] / 1000000), 2)
if expected_hashrate:
hr_list.append(expected_hashrate)
if len(hr_list) == 0:
return 0
else:
return round(
(sum(hr_list) / len(hr_list)) * self.expected_hashboards, 2
)
except (IndexError, KeyError):
pass
async def is_mining(self, api_devdetails: dict = None) -> Optional[bool]:
if not api_devdetails:
try:
api_devdetails = await self.api.send_command(
"devdetails", ignore_errors=True, allow_warning=False
)
except APIError:
pass
if api_devdetails:
try:
return not api_devdetails["STATUS"][0]["Msg"] == "Unavailable"
except LookupError:
pass
async def get_uptime(self, api_summary: dict = None) -> Optional[int]:
if not api_summary:
try:
api_summary = await self.api.summary()
except APIError:
pass
if api_summary:
try:
return int(api_summary["SUMMARY"][0]["Elapsed"])
except LookupError:
pass
BOSER_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction(
"get_mac",
[WebAPICommand("web_net_conf", "admin/network/iface_status/lan")],
),
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver", [RPCAPICommand("api_version", "version")]
),
@@ -179,18 +781,18 @@ BOSMINER_DATA_LOC = DataLocations(
)
class BOSMiner(BaseMiner):
def __init__(self, ip: str, api_ver: str = "0.0.0", boser: bool = None) -> None:
class BOSer(BaseMiner):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip)
# interfaces
self.api = BOSMinerAPI(ip, api_ver)
self.web = BOSMinerWebAPI(ip, boser=boser)
self.web = BOSerWebAPI(ip)
# static data
self.api_type = "BOSMiner"
self.fw_str = "BOS"
# data gathering locations
self.data_locations = BOSMINER_DATA_LOC
self.data_locations = BOSER_DATA_LOC
# autotuning/shutdown support
self.supports_autotuning = True
self.supports_shutdown = True
@@ -432,16 +1034,14 @@ class BOSMiner(BaseMiner):
if not web_net_conf:
try:
web_net_conf = await self.web.send_command(
"/cgi-bin/luci/admin/network/iface_status/lan"
"admin/network/iface_status/lan"
)
except APIError:
pass
if isinstance(web_net_conf, dict):
if "/cgi-bin/luci/admin/network/iface_status/lan" in web_net_conf.keys():
web_net_conf = web_net_conf[
"/cgi-bin/luci/admin/network/iface_status/lan"
]
if "admin/network/iface_status/lan" in web_net_conf.keys():
web_net_conf = web_net_conf["admin/network/iface_status/lan"]
if web_net_conf:
try:

View File

@@ -84,29 +84,31 @@ class BOSerWebAPI(BOSMinerWebAPI):
**parameters: Union[str, int, bool],
) -> dict:
command_type = self.select_command_type(command)
if command_type == "gql":
if command_type is "gql":
return await self.gql.send_command(command)
elif command_type == "grpc":
elif command_type is "grpc":
try:
return await (getattr(self.grpc, command.replace("grpc_", "")))()
except AttributeError:
raise APIError(f"No gRPC command found for command: {command}")
elif command_type == "luci":
elif command_type is "luci":
return await self.luci.send_command(command)
@staticmethod
def select_command_type(command: Union[str, dict]) -> str:
if isinstance(command, dict):
return "gql"
else:
elif command.startswith("grpc_"):
return "grpc"
else:
return "luci"
async def multicommand(
self, *commands: Union[dict, str], allow_warning: bool = True
) -> dict:
cmd_types = {"grpc": [], "gql": []}
cmd_types = {"grpc": [], "gql": [], "luci": []}
for cmd in commands:
cmd_types[self.select_command_type(cmd)].append(cmd)
cmd_types[self.select_command_type(cmd)] = cmd
async def no_op():
return {}
@@ -116,13 +118,21 @@ class BOSerWebAPI(BOSMinerWebAPI):
self.grpc.multicommand(*cmd_types["grpc"])
)
else:
grpc_data_t = asyncio.create_task(no_op())
grpc_data_t = no_op()
if len(cmd_types["gql"]) > 0:
gql_data_t = asyncio.create_task(self.gql.multicommand(*cmd_types["gql"]))
else:
gql_data_t = asyncio.create_task(no_op())
gql_data_t = no_op()
if len(cmd_types["luci"]) > 0:
luci_data_t = asyncio.create_task(
self.luci.multicommand(*cmd_types["luci"])
)
else:
luci_data_t = no_op()
await asyncio.gather(grpc_data_t, gql_data_t)
await asyncio.gather(grpc_data_t, gql_data_t, luci_data_t)
data = dict(**grpc_data_t.result(), **gql_data_t.result())
data = dict(
**luci_data_t.result(), **gql_data_t.result(), **luci_data_t.result()
)
return data

View File

@@ -13,12 +13,9 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import asyncio
import logging
from datetime import timedelta
from betterproto import Message
from grpclib import GRPCError, Status
from grpclib.client import Channel
from pyasic.errors import APIError
@@ -46,7 +43,6 @@ class BOSerGRPCAPI:
self.ip = ip
self.username = "root"
self.pwd = pwd
self.port = 50051
self._auth = None
self._auth_time = datetime.now()
@@ -68,20 +64,7 @@ class BOSerGRPCAPI:
]
async def multicommand(self, *commands: str) -> dict:
result = {"multicommand": True}
tasks = {}
for command in commands:
try:
tasks[command] = asyncio.create_task(getattr(self, command)())
except AttributeError:
result["command"] = {}
await asyncio.gather(*list(tasks.values()))
for cmd in tasks:
result[cmd] = tasks[cmd].result()
return result
pass
async def send_command(
self,
@@ -93,23 +76,13 @@ class BOSerGRPCAPI:
metadata = []
if auth:
metadata.append(("authorization", await self.auth()))
try:
async with Channel(self.ip, self.port) as c:
endpoint = getattr(BOSMinerGRPCStub(c), command)
if endpoint is None:
if not ignore_errors:
raise APIError(f"Command not found - {endpoint}")
return {}
try:
return (await endpoint(message, metadata=metadata)).to_pydict()
except GRPCError as e:
if e.status == Status.UNAUTHENTICATED:
await self._get_auth()
metadata = [("authorization", await self.auth())]
return (await endpoint(message, metadata=metadata)).to_pydict()
raise e
except GRPCError as e:
raise APIError(f"gRPC command failed - {endpoint}") from e
async with Channel(self.ip, 50051) as c:
endpoint = getattr(BOSMinerGRPCStub(c), command)
if endpoint is None:
if not ignore_errors:
raise APIError(f"Command not found - {endpoint}")
return {}
return (await endpoint(message, metadata=metadata)).to_pydict()
async def auth(self):
if self._auth is not None and self._auth_time - datetime.now() < timedelta(
@@ -120,7 +93,7 @@ class BOSerGRPCAPI:
return self._auth
async def _get_auth(self):
async with Channel(self.ip, self.port) as c:
async with Channel(self.ip, 50051) as c:
req = LoginRequest(username=self.username, password=self.pwd)
async with c.request(
"/braiins.bos.v1.AuthenticationService/Login",
@@ -165,9 +138,7 @@ class BOSerGRPCAPI:
)
async def get_locate_device_status(self):
return await self.send_command(
"get_locate_device_status", GetLocateDeviceStatusRequest()
)
return await self.send_command("get_locate_device_status")
async def set_password(self, password: str = None):
return await self.send_command(
@@ -190,12 +161,10 @@ class BOSerGRPCAPI:
)
async def get_tuner_state(self):
return await self.send_command("get_tuner_state", GetTunerStateRequest())
return await self.send_command("get_tuner_state")
async def list_target_profiles(self):
return await self.send_command(
"list_target_profiles", ListTargetProfilesRequest()
)
return await self.send_command("list_target_profiles")
async def set_default_power_target(
self, save_action: SaveAction = SaveAction.SAVE_ACTION_SAVE_AND_APPLY
@@ -296,71 +265,15 @@ class BOSerGRPCAPI:
async def set_dps(
self,
enable: bool,
power_step: int,
min_power_target: int,
enable_shutdown: bool = None,
shutdown_duration: int = None,
):
return await self.send_command(
"set_dps",
message=SetDpsRequest(
enable=enable,
enable_shutdown=enable_shutdown,
shutdown_duration=shutdown_duration,
target=DpsTarget(
power_target=DpsPowerTarget(
power_step=Power(power_step),
min_power_target=Power(min_power_target),
)
),
),
)
raise NotImplementedError
return await self.send_command("braiins.bos.v1.PerformanceService/SetDPS")
async def set_performance_mode(
self,
wattage_target: int = None,
hashrate_target: int = None,
save_action: SaveAction = SaveAction.SAVE_ACTION_SAVE_AND_APPLY,
):
if wattage_target is not None and hashrate_target is not None:
logging.error(
"Cannot use both wattage_target and hashrate_target, using wattage_target."
)
elif wattage_target is None and hashrate_target is None:
raise APIError(
"No target supplied, please supply either wattage_target or hashrate_target."
)
if wattage_target is not None:
return await self.send_command(
"set_performance_mode",
message=SetPerformanceModeRequest(
save_action=save_action,
mode=PerformanceMode(
tuner_mode=TunerPerformanceMode(
power_target=PowerTargetMode(
power_target=Power(watt=wattage_target)
)
)
),
),
)
if hashrate_target is not None:
return await self.send_command(
"set_performance_mode",
message=SetPerformanceModeRequest(
save_action=save_action,
mode=PerformanceMode(
tuner_mode=TunerPerformanceMode(
hashrate_target=HashrateTargetMode(
hashrate_target=TeraHashrate(
terahash_per_second=hashrate_target
)
)
)
),
),
)
async def set_performance_mode(self):
raise NotImplementedError
return await self.send_command(
"braiins.bos.v1.PerformanceService/SetPerformanceMode"
)
async def get_active_performance_mode(self):
return await self.send_command(