feature: start refactoring BOSer and BOSMiner into separate classes.

This commit is contained in:
b-rowan
2024-01-10 22:12:27 -07:00
parent 9da7b44177
commit bea44a72ea
18 changed files with 968 additions and 436 deletions

View File

@@ -17,7 +17,7 @@ from dataclasses import dataclass, field
from typing import Union
from pyasic.config.base import MinerConfigOption, MinerConfigValue
from pyasic.web.bosminer.proto.braiins.bos.v1 import DpsPowerTarget, DpsTarget, Hours
from pyasic.web.braiins_os.proto.braiins.bos.v1 import DpsPowerTarget, DpsTarget, Hours
@dataclass

View File

@@ -14,21 +14,21 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.miners.backends import BOSMiner
from pyasic.miners.backends import BOSer
from pyasic.miners.types import S17, S17e, S17Plus, S17Pro
class BOSMinerS17(BOSMiner, S17):
class BOSMinerS17(BOSer, S17):
pass
class BOSMinerS17Plus(BOSMiner, S17Plus):
class BOSMinerS17Plus(BOSer, S17Plus):
pass
class BOSMinerS17Pro(BOSMiner, S17Pro):
class BOSMinerS17Pro(BOSer, S17Pro):
pass
class BOSMinerS17e(BOSMiner, S17e):
class BOSMinerS17e(BOSer, S17e):
pass

View File

@@ -14,17 +14,17 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.miners.backends import BOSMiner
from pyasic.miners.backends import BOSer
from pyasic.miners.types import T17, T17e, T17Plus
class BOSMinerT17(BOSMiner, T17):
class BOSMinerT17(BOSer, T17):
pass
class BOSMinerT17Plus(BOSMiner, T17Plus):
class BOSMinerT17Plus(BOSer, T17Plus):
pass
class BOSMinerT17e(BOSMiner, T17e):
class BOSMinerT17e(BOSer, T17e):
pass

View File

@@ -14,7 +14,7 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.miners.backends import BOSMiner
from pyasic.miners.backends import BOSer
from pyasic.miners.types import (
S19,
S19XP,
@@ -30,45 +30,45 @@ from pyasic.miners.types import (
)
class BOSMinerS19(BOSMiner, S19):
class BOSMinerS19(BOSer, S19):
pass
class BOSMinerS19Plus(BOSMiner, S19Plus):
class BOSMinerS19Plus(BOSer, S19Plus):
pass
class BOSMinerS19Pro(BOSMiner, S19Pro):
class BOSMinerS19Pro(BOSer, S19Pro):
pass
class BOSMinerS19a(BOSMiner, S19a):
class BOSMinerS19a(BOSer, S19a):
pass
class BOSMinerS19j(BOSMiner, S19j):
class BOSMinerS19j(BOSer, S19j):
pass
class BOSMinerS19jNoPIC(BOSMiner, S19jNoPIC):
class BOSMinerS19jNoPIC(BOSer, S19jNoPIC):
pass
class BOSMinerS19jPro(BOSMiner, S19jPro):
class BOSMinerS19jPro(BOSer, S19jPro):
pass
class BOSMinerS19kProNoPIC(BOSMiner, S19kProNoPIC):
class BOSMinerS19kProNoPIC(BOSer, S19kProNoPIC):
pass
class BOSMinerS19aPro(BOSMiner, S19aPro):
class BOSMinerS19aPro(BOSer, S19aPro):
pass
class BOSMinerS19jProPlus(BOSMiner, S19jProPlus):
class BOSMinerS19jProPlus(BOSer, S19jProPlus):
pass
class BOSMinerS19XP(BOSMiner, S19XP):
class BOSMinerS19XP(BOSer, S19XP):
pass

View File

@@ -14,9 +14,9 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.miners.backends import BOSMiner
from pyasic.miners.backends import BOSer
from pyasic.miners.types import T19
class BOSMinerT19(BOSMiner, T19):
class BOSMinerT19(BOSer, T19):
pass

View File

@@ -17,7 +17,7 @@ from .antminer import AntminerModern, AntminerOld
from .bfgminer import BFGMiner
from .bfgminer_goldshell import BFGMinerGoldshell
from .bmminer import BMMiner
from .bosminer import BOSMiner
from .braiins_os import BOSer, BOSMiner
from .btminer import BTMiner
from .cgminer import CGMiner
from .cgminer_avalon import CGMinerAvalon

View File

@@ -1,155 +0,0 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import logging
from typing import List, Optional, Tuple
from pyasic.config import MinerConfig
from pyasic.data import Fan, HashBoard, MinerData
from pyasic.data.error_codes import MinerErrorData
from pyasic.miners.backends import BOSMiner
class BOSMinerOld(BOSMiner):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver)
async def send_ssh_command(self, cmd: str) -> Optional[str]:
result = None
try:
conn = await self._get_ssh_connection()
except ConnectionError:
return None
# open an ssh connection
async with conn:
# 3 retries
for i in range(3):
try:
# run the command and get the result
result = await conn.run(cmd)
result = result.stdout
except Exception as e:
# if the command fails, log it
logging.warning(f"{self} command {cmd} error: {e}")
# on the 3rd retry, return None
if i == 3:
return
continue
# return the result, either command output or None
return result
async def update_to_plus(self):
result = await self.send_ssh_command("opkg update && opkg install bos_plus")
return result
async def check_light(self) -> bool:
return False
async def fault_light_on(self) -> bool:
return False
async def fault_light_off(self) -> bool:
return False
async def get_config(self) -> None:
return None
async def reboot(self) -> bool:
return False
async def restart_backend(self) -> bool:
return False
async def stop_mining(self) -> bool:
return False
async def resume_mining(self) -> bool:
return False
async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
return None
async def set_power_limit(self, wattage: int) -> bool:
return False
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def get_mac(self, *args, **kwargs) -> Optional[str]:
return None
async def get_model(self, *args, **kwargs) -> str:
return "S9"
async def get_version(self, *args, **kwargs) -> Tuple[Optional[str], Optional[str]]:
return None, None
async def get_hostname(self, *args, **kwargs) -> Optional[str]:
return None
async def get_hashrate(self, *args, **kwargs) -> Optional[float]:
return None
async def get_hashboards(self, *args, **kwargs) -> List[HashBoard]:
return []
async def get_env_temp(self, *args, **kwargs) -> Optional[float]:
return None
async def get_wattage(self, *args, **kwargs) -> Optional[int]:
return None
async def get_wattage_limit(self, *args, **kwargs) -> Optional[int]:
return None
async def get_fans(
self,
*args,
**kwargs,
) -> List[Fan]:
return [Fan(), Fan(), Fan(), Fan()]
async def get_fan_psu(self, *args, **kwargs) -> Optional[int]:
return None
async def get_api_ver(self, *args, **kwargs) -> Optional[str]:
return None
async def get_fw_ver(self, *args, **kwargs) -> Optional[str]:
return None
async def get_errors(self, *args, **kwargs) -> List[MinerErrorData]:
return []
async def get_fault_light(self, *args, **kwargs) -> bool:
return False
async def get_expected_hashrate(self, *args, **kwargs) -> Optional[float]:
return None
async def get_data(self, allow_warning: bool = False, **kwargs) -> MinerData:
return MinerData(ip=str(self.ip))
async def is_mining(self, *args, **kwargs) -> Optional[bool]:
return None
async def get_uptime(self, *args, **kwargs) -> Optional[int]:
return None

View File

@@ -36,18 +36,620 @@ from pyasic.miners.base import (
RPCAPICommand,
WebAPICommand,
)
from pyasic.web.bosminer import BOSMinerWebAPI
from pyasic.web.braiins_os import BOSerWebAPI, BOSMinerWebAPI
BOSMINER_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction(
"get_mac",
[WebAPICommand("web_net_conf", "admin/network/iface_status/lan")],
),
str(DataOptions.MODEL): DataFunction("get_model"),
str(DataOptions.API_VERSION): DataFunction(
"get_api_ver", [RPCAPICommand("api_version", "version")]
),
str(DataOptions.FW_VERSION): DataFunction("get_fw_ver"),
str(DataOptions.HOSTNAME): DataFunction("get_hostname"),
str(DataOptions.HASHRATE): DataFunction(
"get_hashrate",
[RPCAPICommand("api_summary", "summary")],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"get_expected_hashrate", [RPCAPICommand("api_devs", "devs")]
),
str(DataOptions.HASHBOARDS): DataFunction(
"get_hashboards",
[
WebAPICommand(
"web_net_conf", "/cgi-bin/luci/admin/network/iface_status/lan"
)
RPCAPICommand("api_temps", "temps"),
RPCAPICommand("api_devdetails", "devdetails"),
RPCAPICommand("api_devs", "devs"),
],
),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction("get_env_temp"),
str(DataOptions.WATTAGE): DataFunction(
"get_wattage",
[RPCAPICommand("api_tunerstatus", "tunerstatus")],
),
str(DataOptions.WATTAGE_LIMIT): DataFunction(
"get_wattage_limit",
[RPCAPICommand("api_tunerstatus", "tunerstatus")],
),
str(DataOptions.FANS): DataFunction(
"get_fans",
[RPCAPICommand("api_fans", "fans")],
),
str(DataOptions.FAN_PSU): DataFunction("get_fan_psu"),
str(DataOptions.ERRORS): DataFunction(
"get_errors",
[RPCAPICommand("api_tunerstatus", "tunerstatus")],
),
str(DataOptions.FAULT_LIGHT): DataFunction("get_fault_light"),
str(DataOptions.IS_MINING): DataFunction(
"is_mining", [RPCAPICommand("api_devdetails", "devdetails")]
),
str(DataOptions.UPTIME): DataFunction(
"get_uptime", [RPCAPICommand("api_summary", "summary")]
),
str(DataOptions.CONFIG): DataFunction("get_config"),
}
)
class BOSMiner(BaseMiner):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip)
# interfaces
self.api = BOSMinerAPI(ip, api_ver)
self.web = BOSMinerWebAPI(ip)
# static data
self.api_type = "BOSMiner"
# data gathering locations
self.data_locations = BOSMINER_DATA_LOC
# autotuning/shutdown support
self.supports_autotuning = True
self.supports_shutdown = True
# data storage
self.api_ver = api_ver
async def send_ssh_command(self, cmd: str) -> Optional[str]:
result = None
try:
conn = await asyncio.wait_for(self._get_ssh_connection(), timeout=10)
except (ConnectionError, asyncio.TimeoutError):
return None
# open an ssh connection
async with conn:
# 3 retries
for i in range(3):
try:
# run the command and get the result
result = await conn.run(cmd)
stderr = result.stderr
result = result.stdout
if len(stderr) > len(result):
result = stderr
except Exception as e:
# if the command fails, log it
logging.warning(f"{self} command {cmd} error: {e}")
# on the 3rd retry, return None
if i == 3:
return
continue
# return the result, either command output or None
return result
async def fault_light_on(self) -> bool:
logging.debug(f"{self}: Sending fault_light on command.")
ret = await self.send_ssh_command("miner fault_light on")
logging.debug(f"{self}: fault_light on command completed.")
if isinstance(ret, str):
self.light = True
return self.light
return False
async def fault_light_off(self) -> bool:
logging.debug(f"{self}: Sending fault_light off command.")
self.light = False
ret = await self.send_ssh_command("miner fault_light off")
logging.debug(f"{self}: fault_light off command completed.")
if isinstance(ret, str):
self.light = False
return True
return False
async def restart_backend(self) -> bool:
return await self.restart_bosminer()
async def restart_bosminer(self) -> bool:
logging.debug(f"{self}: Sending bosminer restart command.")
ret = await self.send_ssh_command("/etc/init.d/bosminer restart")
logging.debug(f"{self}: bosminer restart command completed.")
if isinstance(ret, str):
return True
return False
async def stop_mining(self) -> bool:
try:
data = await self.api.pause()
except APIError:
return False
if data.get("PAUSE"):
if data["PAUSE"][0]:
return True
return False
async def resume_mining(self) -> bool:
try:
data = await self.api.resume()
except APIError:
return False
if data.get("RESUME"):
if data["RESUME"][0]:
return True
return False
async def reboot(self) -> bool:
logging.debug(f"{self}: Sending reboot command.")
ret = await self.send_ssh_command("/sbin/reboot")
logging.debug(f"{self}: Reboot command completed.")
if isinstance(ret, str):
return True
return False
async def get_config(self) -> MinerConfig:
logging.debug(f"{self}: Getting config.")
try:
conn = await self._get_ssh_connection()
except ConnectionError:
conn = None
if conn:
async with conn:
# good ol' BBB compatibility :/
toml_data = toml.loads(
(await conn.run("cat /etc/bosminer.toml")).stdout
)
logging.debug(f"{self}: Converting config file.")
cfg = MinerConfig.from_bosminer(toml_data)
self.config = cfg
return self.config
async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
logging.debug(f"{self}: Sending config.")
self.config = config
toml_conf = toml.dumps(
{
"format": {
"version": "1.2+",
"generator": "pyasic",
"model": f"{self.make.replace('Miner', 'miner')} {self.model.replace(' (BOS)', '').replace('j', 'J')}",
"timestamp": int(time.time()),
},
**config.as_bosminer(user_suffix=user_suffix),
}
)
try:
conn = await self._get_ssh_connection()
except ConnectionError as e:
raise APIError("SSH connection failed when sending config.") from e
async with conn:
# BBB check because bitmain suxx
bbb_check = await conn.run(
"if [ ! -f /etc/init.d/bosminer ]; then echo '1'; else echo '0'; fi;"
)
bbb = bbb_check.stdout.strip() == "1"
if not bbb:
await conn.run("/etc/init.d/bosminer stop")
logging.debug(f"{self}: Opening SFTP connection.")
async with conn.start_sftp_client() as sftp:
logging.debug(f"{self}: Opening config file.")
async with sftp.open("/etc/bosminer.toml", "w+") as file:
await file.write(toml_conf)
logging.debug(f"{self}: Restarting BOSMiner")
await conn.run("/etc/init.d/bosminer start")
# I really hate BBB, please get rid of it if you have it
else:
await conn.run("/etc/init.d/S99bosminer stop")
logging.debug(f"{self}: BBB sending config")
await conn.run("echo '" + toml_conf + "' > /etc/bosminer.toml")
logging.debug(f"{self}: BBB restarting bosminer.")
await conn.run("/etc/init.d/S99bosminer start")
async def set_power_limit(self, wattage: int) -> bool:
try:
cfg = await self.get_config()
if cfg is None:
return False
cfg.mining_mode = MiningModePowerTune(wattage)
await self.send_config(cfg)
except Exception as e:
logging.warning(f"{self} set_power_limit: {e}")
return False
else:
return True
async def set_static_ip(
self,
ip: str,
dns: str,
gateway: str,
subnet_mask: str = "255.255.255.0",
):
cfg_data_lan = (
"config interface 'lan'\n\toption type 'bridge'\n\toption ifname 'eth0'\n\toption proto 'static'\n\toption ipaddr '"
+ ip
+ "'\n\toption netmask '"
+ subnet_mask
+ "'\n\toption gateway '"
+ gateway
+ "'\n\toption dns '"
+ dns
+ "'"
)
data = await self.send_ssh_command("cat /etc/config/network")
split_data = data.split("\n\n")
for idx in range(len(split_data)):
if "config interface 'lan'" in split_data[idx]:
split_data[idx] = cfg_data_lan
config = "\n\n".join(split_data)
conn = await self._get_ssh_connection()
async with conn:
await conn.run("echo '" + config + "' > /etc/config/network")
async def set_dhcp(self):
cfg_data_lan = "config interface 'lan'\n\toption type 'bridge'\n\toption ifname 'eth0'\n\toption proto 'dhcp'"
data = await self.send_ssh_command("cat /etc/config/network")
split_data = data.split("\n\n")
for idx in range(len(split_data)):
if "config interface 'lan'" in split_data[idx]:
split_data[idx] = cfg_data_lan
config = "\n\n".join(split_data)
conn = await self._get_ssh_connection()
async with conn:
await conn.run("echo '" + config + "' > /etc/config/network")
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def get_mac(self, web_net_conf: Union[dict, list] = None) -> Optional[str]:
if not web_net_conf:
try:
web_net_conf = await self.web.luci.send_command(
"admin/network/iface_status/lan"
)
except APIError:
pass
if isinstance(web_net_conf, dict):
if "admin/network/iface_status/lan" in web_net_conf.keys():
web_net_conf = web_net_conf["admin/network/iface_status/lan"]
if web_net_conf:
try:
return web_net_conf[0]["macaddr"]
except LookupError:
pass
# could use ssh, but its slow and buggy
# result = await self.send_ssh_command("cat /sys/class/net/eth0/address")
# if result:
# return result.upper().strip()
async def get_model(self) -> Optional[str]:
if self.model is not None:
return self.model + " (BOS)"
return "? (BOS)"
async def get_version(
self, api_version: dict = None, graphql_version: dict = None
) -> Tuple[Optional[str], Optional[str]]:
miner_version = namedtuple("MinerVersion", "api_ver fw_ver")
api_ver_t = asyncio.create_task(self.get_api_ver(api_version))
fw_ver_t = asyncio.create_task(self.get_fw_ver())
await asyncio.gather(api_ver_t, fw_ver_t)
return miner_version(api_ver=api_ver_t.result(), fw_ver=fw_ver_t.result())
async def get_api_ver(self, api_version: dict = None) -> Optional[str]:
if not api_version:
try:
api_version = await self.api.version()
except APIError:
pass
# Now get the API version
if api_version:
try:
api_ver = api_version["VERSION"][0]["API"]
except (KeyError, IndexError):
api_ver = None
self.api_ver = api_ver
self.api.api_ver = self.api_ver
return self.api_ver
async def get_fw_ver(self) -> Optional[str]:
fw_ver = await self.send_ssh_command("cat /etc/bos_version")
# if we get the version data, parse it
if fw_ver is not None:
ver = fw_ver.split("-")[5]
if "." in ver:
self.fw_ver = ver
logging.debug(f"Found version for {self.ip}: {self.fw_ver}")
return self.fw_ver
async def get_hostname(self) -> Union[str, None]:
try:
hostname = (
await self.send_ssh_command("cat /proc/sys/kernel/hostname")
).strip()
except Exception as e:
logging.error(f"BOSMiner get_hostname failed with error: {e}")
return None
return hostname
async def get_hashrate(self, api_summary: dict = None) -> Optional[float]:
# get hr from API
if not api_summary:
try:
api_summary = await self.api.summary()
except APIError:
pass
if api_summary:
try:
return round(float(api_summary["SUMMARY"][0]["MHS 1m"] / 1000000), 2)
except (KeyError, IndexError, ValueError, TypeError):
pass
async def get_hashboards(
self,
api_temps: dict = None,
api_devdetails: dict = None,
api_devs: dict = None,
):
hashboards = [
HashBoard(slot=i, expected_chips=self.expected_chips)
for i in range(self.expected_hashboards)
]
cmds = []
if not api_temps:
cmds.append("temps")
if not api_devdetails:
cmds.append("devdetails")
if not api_devs:
cmds.append("devs")
if len(cmds) > 0:
try:
d = await self.api.multicommand(*cmds)
except APIError:
d = {}
try:
api_temps = d["temps"][0]
except (KeyError, IndexError):
api_temps = None
try:
api_devdetails = d["devdetails"][0]
except (KeyError, IndexError):
api_devdetails = None
try:
api_devs = d["devs"][0]
except (KeyError, IndexError):
api_devs = None
if api_temps:
try:
offset = 6 if api_temps["TEMPS"][0]["ID"] in [6, 7, 8] else 1
for board in api_temps["TEMPS"]:
_id = board["ID"] - offset
chip_temp = round(board["Chip"])
board_temp = round(board["Board"])
hashboards[_id].chip_temp = chip_temp
hashboards[_id].temp = board_temp
except (IndexError, KeyError, ValueError, TypeError):
pass
if api_devdetails:
try:
offset = 6 if api_devdetails["DEVDETAILS"][0]["ID"] in [6, 7, 8] else 1
for board in api_devdetails["DEVDETAILS"]:
_id = board["ID"] - offset
chips = board["Chips"]
hashboards[_id].chips = chips
hashboards[_id].missing = False
except (IndexError, KeyError):
pass
if api_devs:
try:
offset = 6 if api_devs["DEVS"][0]["ID"] in [6, 7, 8] else 1
for board in api_devs["DEVS"]:
_id = board["ID"] - offset
hashrate = round(float(board["MHS 1m"] / 1000000), 2)
hashboards[_id].hashrate = hashrate
except (IndexError, KeyError):
pass
return hashboards
async def get_env_temp(self) -> Optional[float]:
return None
async def get_wattage(self, api_tunerstatus: dict = None) -> Optional[int]:
if not api_tunerstatus:
try:
api_tunerstatus = await self.api.tunerstatus()
except APIError:
pass
if api_tunerstatus:
try:
return api_tunerstatus["TUNERSTATUS"][0][
"ApproximateMinerPowerConsumption"
]
except (KeyError, IndexError):
pass
async def get_wattage_limit(self, api_tunerstatus: dict = None) -> Optional[int]:
if not api_tunerstatus:
try:
api_tunerstatus = await self.api.tunerstatus()
except APIError:
pass
if api_tunerstatus:
try:
return api_tunerstatus["TUNERSTATUS"][0]["PowerLimit"]
except (KeyError, IndexError):
pass
async def get_fans(self, api_fans: dict = None) -> List[Fan]:
if not api_fans:
try:
api_fans = await self.api.fans()
except APIError:
pass
if api_fans:
fans = []
for n in range(self.fan_count):
try:
fans.append(Fan(api_fans["FANS"][n]["RPM"]))
except (IndexError, KeyError):
pass
return fans
return [Fan() for _ in range(self.fan_count)]
async def get_fan_psu(self) -> Optional[int]:
return None
async def get_errors(self, api_tunerstatus: dict = None) -> List[MinerErrorData]:
if not api_tunerstatus:
try:
api_tunerstatus = await self.api.tunerstatus()
except APIError:
pass
if api_tunerstatus:
errors = []
try:
chain_status = api_tunerstatus["TUNERSTATUS"][0]["TunerChainStatus"]
if chain_status and len(chain_status) > 0:
offset = (
6 if int(chain_status[0]["HashchainIndex"]) in [6, 7, 8] else 0
)
for board in chain_status:
_id = board["HashchainIndex"] - offset
if board["Status"] not in [
"Stable",
"Testing performance profile",
"Tuning individual chips",
]:
_error = board["Status"].split(" {")[0]
_error = _error[0].lower() + _error[1:]
errors.append(BraiinsOSError(f"Slot {_id} {_error}"))
return errors
except (KeyError, IndexError):
pass
async def get_fault_light(self, graphql_fault_light: dict = None) -> bool:
if self.light:
return self.light
try:
data = (
await self.send_ssh_command("cat /sys/class/leds/'Red LED'/delay_off")
).strip()
self.light = False
if data == "50":
self.light = True
return self.light
except (TypeError, AttributeError):
return self.light
async def get_expected_hashrate(self, api_devs: dict = None) -> Optional[float]:
if not api_devs:
try:
api_devs = await self.api.devs()
except APIError:
pass
if api_devs:
try:
offset = 6 if api_devs["DEVS"][0]["ID"] in [6, 7, 8] else 0
hr_list = []
for board in api_devs["DEVS"]:
_id = board["ID"] - offset
expected_hashrate = round(float(board["Nominal MHS"] / 1000000), 2)
if expected_hashrate:
hr_list.append(expected_hashrate)
if len(hr_list) == 0:
return 0
else:
return round(
(sum(hr_list) / len(hr_list)) * self.expected_hashboards, 2
)
except (IndexError, KeyError):
pass
async def is_mining(self, api_devdetails: dict = None) -> Optional[bool]:
if not api_devdetails:
try:
api_devdetails = await self.api.send_command(
"devdetails", ignore_errors=True, allow_warning=False
)
except APIError:
pass
if api_devdetails:
try:
return not api_devdetails["STATUS"][0]["Msg"] == "Unavailable"
except LookupError:
pass
async def get_uptime(self, api_summary: dict = None) -> Optional[int]:
if not api_summary:
try:
api_summary = await self.api.summary()
except APIError:
pass
if api_summary:
try:
return int(api_summary["SUMMARY"][0]["Elapsed"])
except LookupError:
pass
BOSER_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction(
"get_mac",
[WebAPICommand("web_net_conf", "admin/network/iface_status/lan")],
),
str(DataOptions.MODEL): DataFunction("get_model"),
str(DataOptions.API_VERSION): DataFunction(
"get_api_ver", [RPCAPICommand("api_version", "version")]
@@ -180,17 +782,17 @@ BOSMINER_DATA_LOC = DataLocations(
)
class BOSMiner(BaseMiner):
def __init__(self, ip: str, api_ver: str = "0.0.0", boser: bool = None) -> None:
class BOSer(BaseMiner):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip)
# interfaces
self.api = BOSMinerAPI(ip, api_ver)
self.web = BOSMinerWebAPI(ip, boser=boser)
self.web = BOSerWebAPI(ip)
# static data
self.api_type = "BOSMiner"
# data gathering locations
self.data_locations = BOSMINER_DATA_LOC
self.data_locations = BOSER_DATA_LOC
# autotuning/shutdown support
self.supports_autotuning = True
self.supports_shutdown = True
@@ -432,16 +1034,14 @@ class BOSMiner(BaseMiner):
if not web_net_conf:
try:
web_net_conf = await self.web.send_command(
"/cgi-bin/luci/admin/network/iface_status/lan"
"admin/network/iface_status/lan"
)
except APIError:
pass
if isinstance(web_net_conf, dict):
if "/cgi-bin/luci/admin/network/iface_status/lan" in web_net_conf.keys():
web_net_conf = web_net_conf[
"/cgi-bin/luci/admin/network/iface_status/lan"
]
if "admin/network/iface_status/lan" in web_net_conf.keys():
web_net_conf = web_net_conf["admin/network/iface_status/lan"]
if web_net_conf:
try:

View File

@@ -34,7 +34,7 @@ from pyasic.miners.base import (
RPCAPICommand,
WebAPICommand,
)
from pyasic.web.bosminer import BOSMinerWebAPI
from pyasic.web.braiins_os import BOSMinerWebAPI
LUXMINER_DATA_LOC = DataLocations(
**{

View File

@@ -0,0 +1,138 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import asyncio
from typing import Union
from pyasic import settings
from pyasic.errors import APIError
from pyasic.web import BaseWebAPI
from .graphql import BOSerGraphQLAPI
from .grpc import BOSerGRPCAPI
from .luci import BOSMinerLuCIAPI
class BOSMinerWebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
self.luci = BOSMinerLuCIAPI(
ip, settings.get("default_bosminer_password", "root")
)
self._pwd = settings.get("default_bosminer_password", "root")
super().__init__(ip)
@property
def pwd(self):
return self._pwd
@pwd.setter
def pwd(self, other: str):
self._pwd = other
self.luci.pwd = other
async def send_command(
self,
command: Union[str, dict],
ignore_errors: bool = False,
allow_warning: bool = True,
**parameters: Union[str, int, bool],
) -> dict:
return await self.luci.send_command(command)
async def multicommand(
self, *commands: Union[dict, str], allow_warning: bool = True
) -> dict:
return await self.luci.multicommand(*commands)
class BOSerWebAPI(BOSMinerWebAPI):
def __init__(self, ip: str) -> None:
self.gql = BOSerGraphQLAPI(
ip, settings.get("default_bosminer_password", "root")
)
self.grpc = BOSerGRPCAPI(ip, settings.get("default_bosminer_password", "root"))
super().__init__(ip)
@property
def pwd(self):
return self._pwd
@pwd.setter
def pwd(self, other: str):
self._pwd = other
self.luci.pwd = other
self.gql.pwd = other
self.grpc.pwd = other
async def send_command(
self,
command: Union[str, dict],
ignore_errors: bool = False,
allow_warning: bool = True,
**parameters: Union[str, int, bool],
) -> dict:
command_type = self.select_command_type(command)
if command_type is "gql":
return await self.gql.send_command(command)
elif command_type is "grpc":
try:
return await (getattr(self.grpc, command.replace("grpc_", "")))()
except AttributeError:
raise APIError(f"No gRPC command found for command: {command}")
elif command_type is "luci":
return await self.luci.send_command(command)
@staticmethod
def select_command_type(command: Union[str, dict]) -> str:
if isinstance(command, dict):
return "gql"
elif command.startswith("grpc_"):
return "grpc"
else:
return "luci"
async def multicommand(
self, *commands: Union[dict, str], allow_warning: bool = True
) -> dict:
cmd_types = {"grpc": [], "gql": [], "luci": []}
for cmd in commands:
cmd_types[self.select_command_type(cmd)] = cmd
async def no_op():
return {}
if len(cmd_types["grpc"]) > 0:
grpc_data_t = asyncio.create_task(
self.grpc.multicommand(*cmd_types["grpc"])
)
else:
grpc_data_t = no_op()
if len(cmd_types["gql"]) > 0:
gql_data_t = asyncio.create_task(self.gql.multicommand(*cmd_types["gql"]))
else:
gql_data_t = no_op()
if len(cmd_types["luci"]) > 0:
luci_data_t = asyncio.create_task(
self.luci.multicommand(*cmd_types["luci"])
)
else:
luci_data_t = no_op()
await asyncio.gather(grpc_data_t, gql_data_t, luci_data_t)
data = dict(
**luci_data_t.result(), **gql_data_t.result(), **luci_data_t.result()
)
return data

View File

@@ -0,0 +1,104 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import json
from typing import Union
import httpx
from pyasic import settings
class BOSerGraphQLAPI:
def __init__(self, ip: str, pwd: str):
self.ip = ip
self.username = "root"
self.pwd = pwd
async def multicommand(self, *commands: dict) -> dict:
def merge(*d: dict):
ret = {}
for i in d:
if i:
for k in i:
if not k in ret:
ret[k] = i[k]
else:
ret[k] = merge(ret[k], i[k])
return None if ret == {} else ret
command = merge(*commands)
data = await self.send_command(command)
if data is not None:
if data.get("data") is None:
try:
commands = list(commands)
# noinspection PyTypeChecker
commands.remove({"bos": {"faultLight": None}})
command = merge(*commands)
data = await self.send_command(command)
except (LookupError, ValueError):
pass
if not data:
data = {}
data["multicommand"] = False
return data
async def send_command(
self,
command: dict,
) -> dict:
url = f"http://{self.ip}/graphql"
query = command
if command is None:
return {}
if command.get("query") is None:
query = {"query": self.parse_command(command)}
try:
async with httpx.AsyncClient(transport=settings.transport()) as client:
await self.auth(client)
data = await client.post(url, json=query)
except httpx.HTTPError:
pass
else:
if data.status_code == 200:
try:
return data.json()
except json.decoder.JSONDecodeError:
pass
def parse_command(self, graphql_command: Union[dict, set]) -> str:
if isinstance(graphql_command, dict):
data = []
for key in graphql_command:
if graphql_command[key] is not None:
parsed = self.parse_command(graphql_command[key])
data.append(key + parsed)
else:
data.append(key)
else:
data = graphql_command
return "{" + ",".join(data) + "}"
async def auth(self, client: httpx.AsyncClient) -> None:
url = f"http://{self.ip}/graphql"
await client.post(
url,
json={
"query": (
f'mutation{{auth{{login(username:"{self.username}", password:"{self.pwd}"){{__typename}}}}}}'
)
},
)

View File

@@ -13,258 +13,17 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import json
from datetime import timedelta
from typing import Union
import httpx
from betterproto import Message
from grpclib.client import Channel
from pyasic import settings
from pyasic.errors import APIError
from pyasic.web import BaseWebAPI
from .proto.braiins.bos import *
from .proto.braiins.bos.v1 import *
class BOSMinerWebAPI(BaseWebAPI):
def __init__(self, ip: str, boser: bool = None) -> None:
if boser is None:
boser = True
if boser:
self.gql = BOSMinerGQLAPI(
ip, settings.get("default_bosminer_password", "root")
)
self.grpc = BOSMinerGRPCAPI(
ip, settings.get("default_bosminer_password", "root")
)
else:
self.gql = None
self.grpc = None
self.luci = BOSMinerLuCIAPI(
ip, settings.get("default_bosminer_password", "root")
)
self._pwd = settings.get("default_bosminer_password", "root")
super().__init__(ip)
@property
def pwd(self):
return self._pwd
@pwd.setter
def pwd(self, other: str):
self._pwd = other
self.luci.pwd = other
if self.gql is not None:
self.gql.pwd = other
if self.grpc is not None:
self.grpc.pwd = other
async def send_command(
self,
command: Union[str, dict],
ignore_errors: bool = False,
allow_warning: bool = True,
**parameters: Union[str, int, bool],
) -> dict:
if isinstance(command, dict):
if self.gql is not None:
return await self.gql.send_command(command)
elif command.startswith("/cgi-bin/luci"):
return await self.gql.send_command(command)
else:
if self.grpc is not None:
return await self.grpc.send_command(command)
async def multicommand(
self, *commands: Union[dict, str], allow_warning: bool = True
) -> dict:
luci_commands = []
gql_commands = []
grpc_commands = []
for cmd in commands:
if isinstance(cmd, dict):
gql_commands.append(cmd)
elif cmd.startswith("/cgi-bin/luci"):
luci_commands.append(cmd)
else:
grpc_commands.append(cmd)
luci_data = await self.luci.multicommand(*luci_commands)
if self.gql is not None:
gql_data = await self.gql.multicommand(*gql_commands)
else:
gql_data = None
if self.grpc is not None:
grpc_data = await self.grpc.multicommand(*grpc_commands)
else:
grpc_data = None
if gql_data is None:
gql_data = {}
if luci_data is None:
luci_data = {}
if grpc_data is None:
grpc_data = {}
data = dict(**luci_data, **gql_data, **grpc_data)
return data
class BOSMinerGQLAPI:
def __init__(self, ip: str, pwd: str):
self.ip = ip
self.username = "root"
self.pwd = pwd
async def multicommand(self, *commands: dict) -> dict:
def merge(*d: dict):
ret = {}
for i in d:
if i:
for k in i:
if not k in ret:
ret[k] = i[k]
else:
ret[k] = merge(ret[k], i[k])
return None if ret == {} else ret
command = merge(*commands)
data = await self.send_command(command)
if data is not None:
if data.get("data") is None:
try:
commands = list(commands)
# noinspection PyTypeChecker
commands.remove({"bos": {"faultLight": None}})
command = merge(*commands)
data = await self.send_command(command)
except (LookupError, ValueError):
pass
if not data:
data = {}
data["multicommand"] = False
return data
async def send_command(
self,
command: dict,
) -> dict:
url = f"http://{self.ip}/graphql"
query = command
if command is None:
return {}
if command.get("query") is None:
query = {"query": self.parse_command(command)}
try:
async with httpx.AsyncClient(transport=settings.transport()) as client:
await self.auth(client)
data = await client.post(url, json=query)
except httpx.HTTPError:
pass
else:
if data.status_code == 200:
try:
return data.json()
except json.decoder.JSONDecodeError:
pass
def parse_command(self, graphql_command: Union[dict, set]) -> str:
if isinstance(graphql_command, dict):
data = []
for key in graphql_command:
if graphql_command[key] is not None:
parsed = self.parse_command(graphql_command[key])
data.append(key + parsed)
else:
data.append(key)
else:
data = graphql_command
return "{" + ",".join(data) + "}"
async def auth(self, client: httpx.AsyncClient) -> None:
url = f"http://{self.ip}/graphql"
await client.post(
url,
json={
"query": (
'mutation{auth{login(username:"'
+ "root"
+ '", password:"'
+ self.pwd
+ '"){__typename}}}'
)
},
)
class BOSMinerLuCIAPI:
def __init__(self, ip: str, pwd: str):
self.ip = ip
self.username = "root"
self.pwd = pwd
async def multicommand(self, *commands: str) -> dict:
data = {}
for command in commands:
data[command] = await self.send_command(command, ignore_errors=True)
return data
async def send_command(self, path: str, ignore_errors: bool = False) -> dict:
try:
async with httpx.AsyncClient(transport=settings.transport()) as client:
await self.auth(client)
data = await client.get(
f"http://{self.ip}{path}", headers={"User-Agent": "BTC Tools v0.1"}
)
if data.status_code == 200:
return data.json()
if ignore_errors:
return {}
raise APIError(
f"Web command failed: path={path}, code={data.status_code}"
)
except (httpx.HTTPError, json.JSONDecodeError):
if ignore_errors:
return {}
raise APIError(f"Web command failed: path={path}")
async def auth(self, session: httpx.AsyncClient):
login = {"luci_username": self.username, "luci_password": self.pwd}
url = f"http://{self.ip}/cgi-bin/luci"
headers = {
"User-Agent": (
"BTC Tools v0.1"
), # only seems to respond if this user-agent is set
"Content-Type": "application/x-www-form-urlencoded",
}
await session.post(url, headers=headers, data=login)
async def get_net_conf(self):
return await self.send_command("/cgi-bin/luci/admin/network/iface_status/lan")
async def get_cfg_metadata(self):
return await self.send_command("/cgi-bin/luci/admin/miner/cfg_metadata")
async def get_cfg_data(self):
return await self.send_command("/cgi-bin/luci/admin/miner/cfg_data")
async def get_bos_info(self):
return await self.send_command("/cgi-bin/luci/bos/info")
async def get_overview(self):
return await self.send_command(
"/cgi-bin/luci/admin/status/overview?status=1"
) # needs status=1 or it fails
async def get_api_status(self):
return await self.send_command("/cgi-bin/luci/admin/miner/api_status")
class BOSMinerGRPCStub(
ApiVersionServiceStub,
AuthenticationServiceStub,
@@ -279,7 +38,7 @@ class BOSMinerGRPCStub(
pass
class BOSMinerGRPCAPI:
class BOSerGRPCAPI:
def __init__(self, ip: str, pwd: str):
self.ip = ip
self.username = "root"

View File

@@ -0,0 +1,83 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import json
import httpx
from pyasic import settings
from pyasic.errors import APIError
class BOSMinerLuCIAPI:
def __init__(self, ip: str, pwd: str):
self.ip = ip
self.username = "root"
self.pwd = pwd
async def multicommand(self, *commands: str) -> dict:
data = {}
for command in commands:
data[command] = await self.send_command(command, ignore_errors=True)
return data
async def send_command(self, path: str, ignore_errors: bool = False) -> dict:
try:
async with httpx.AsyncClient(transport=settings.transport()) as client:
await self.auth(client)
data = await client.get(
f"http://{self.ip}/cgi-bin/luci/{path}",
headers={"User-Agent": "BTC Tools v0.1"},
)
if data.status_code == 200:
return data.json()
if ignore_errors:
return {}
raise APIError(
f"LUCI web command failed: path={path}, code={data.status_code}"
)
except (httpx.HTTPError, json.JSONDecodeError):
if ignore_errors:
return {}
raise APIError(f"LUCI web command failed: path={path}")
async def auth(self, session: httpx.AsyncClient):
login = {"luci_username": self.username, "luci_password": self.pwd}
url = f"http://{self.ip}/cgi-bin/luci"
headers = {
"User-Agent": "BTC Tools v0.1", # only seems to respond if this user-agent is set
"Content-Type": "application/x-www-form-urlencoded",
}
await session.post(url, headers=headers, data=login)
async def get_net_conf(self):
return await self.send_command("admin/network/iface_status/lan")
async def get_cfg_metadata(self):
return await self.send_command("admin/miner/cfg_metadata")
async def get_cfg_data(self):
return await self.send_command("admin/miner/cfg_data")
async def get_bos_info(self):
return await self.send_command("bos/info")
async def get_overview(self):
return await self.send_command(
"admin/status/overview?status=1"
) # needs status=1 or it fails
async def get_api_status(self):
return await self.send_command("admin/miner/api_status")

View File

@@ -145,3 +145,6 @@ class VNishWebAPI(BaseWebAPI):
async def settings(self):
return await self.send_command("settings")
async def autotune_presets(self):
return await self.send_command("autotune/presets")