Merge branch 'dev_boser'

# Conflicts:
#	pyasic/miners/antminer/hiveon/X9/T9.py
#	pyasic/miners/backends/bosminer_old.py
#	pyasic/miners/backends/braiins_os.py
#	pyasic/miners/backends/btminer.py
#	pyasic/miners/backends/cgminer_avalon.py
#	pyasic/miners/backends/epic.py
#	pyasic/miners/backends/hiveon.py
#	pyasic/miners/backends/innosilicon.py
#	pyasic/miners/base.py
#	tests/miners_tests/__init__.py
This commit is contained in:
UpstreamData
2024-01-15 14:25:02 -07:00
39 changed files with 1247 additions and 1018 deletions

View File

@@ -99,13 +99,13 @@ class MinerConfig:
**self.power_scaling.as_bosminer(), **self.power_scaling.as_bosminer(),
} }
def as_bos_grpc(self, user_suffix: str = None) -> dict: def as_boser(self, user_suffix: str = None) -> dict:
return { return {
**self.fan_mode.as_bos_grpc(), **self.fan_mode.as_boser(),
**self.temperature.as_bos_grpc(), **self.temperature.as_boser(),
**self.mining_mode.as_bos_grpc(), **self.mining_mode.as_boser(),
**self.pools.as_bos_grpc(user_suffix=user_suffix), **self.pools.as_boser(user_suffix=user_suffix),
**self.power_scaling.as_bos_grpc(), **self.power_scaling.as_boser(),
} }
def as_epic(self, user_suffix: str = None) -> dict: def as_epic(self, user_suffix: str = None) -> dict:
@@ -161,6 +161,16 @@ class MinerConfig:
power_scaling=PowerScalingConfig.from_bosminer(toml_conf), power_scaling=PowerScalingConfig.from_bosminer(toml_conf),
) )
@classmethod
def from_boser(cls, grpc_miner_conf: dict) -> "MinerConfig":
return cls(
pools=PoolConfig.from_boser(grpc_miner_conf),
mining_mode=MiningModeConfig.from_boser(grpc_miner_conf),
fan_mode=FanModeConfig.from_boser(grpc_miner_conf),
temperature=TemperatureConfig.from_boser(grpc_miner_conf),
power_scaling=PowerScalingConfig.from_boser(grpc_miner_conf),
)
@classmethod @classmethod
def from_epic(cls, web_conf: dict) -> "MinerConfig": def from_epic(cls, web_conf: dict) -> "MinerConfig":
return cls( return cls(

View File

@@ -44,8 +44,8 @@ class MinerConfigOption(Enum):
def as_bosminer(self) -> dict: def as_bosminer(self) -> dict:
return self.value.as_bosminer() return self.value.as_bosminer()
def as_bos_grpc(self) -> dict: def as_boser(self) -> dict:
return self.value.as_bos_grpc() return self.value.as_boser()
def as_epic(self) -> dict: def as_epic(self) -> dict:
return self.value.as_epic() return self.value.as_epic()
@@ -91,7 +91,7 @@ class MinerConfigValue:
def as_bosminer(self) -> dict: def as_bosminer(self) -> dict:
return {} return {}
def as_bos_grpc(self) -> dict: def as_boser(self) -> dict:
return {} return {}
def as_epic(self) -> dict: def as_epic(self) -> dict:

View File

@@ -182,3 +182,23 @@ class FanModeConfig(MinerConfigOption):
return cls.manual().from_vnish(web_settings["miner"]["cooling"]) return cls.manual().from_vnish(web_settings["miner"]["cooling"])
elif mode == "immers": elif mode == "immers":
return cls.immersion() return cls.immersion()
@classmethod
def from_boser(cls, grpc_miner_conf: dict):
try:
temperature_conf = grpc_miner_conf["temperature"]
except LookupError:
return cls.default()
keys = temperature_conf.keys()
if "auto" in keys:
if "minimumRequiredFans" in keys:
return cls.normal(temperature_conf["minimumRequiredFans"])
return cls.normal()
if "manual" in keys:
conf = {}
if "fanSpeedRatio" in temperature_conf["manual"].keys():
conf["speed"] = int(temperature_conf["manual"]["fanSpeedRatio"])
if "minimumRequiredFans" in keys:
conf["minimum_fans"] = int(temperature_conf["minimumRequiredFans"])
return cls.manual(**conf)

View File

@@ -17,6 +17,16 @@ from dataclasses import dataclass, field
from typing import Dict, Union from typing import Dict, Union
from pyasic.config.base import MinerConfigOption, MinerConfigValue from pyasic.config.base import MinerConfigOption, MinerConfigValue
from pyasic.web.braiins_os.proto.braiins.bos.v1 import (
HashrateTargetMode,
PerformanceMode,
Power,
PowerTargetMode,
SaveAction,
SetPerformanceModeRequest,
TeraHashrate,
TunerPerformanceMode,
)
@dataclass @dataclass
@@ -99,6 +109,20 @@ class MiningModePowerTune(MinerConfigValue):
def as_bosminer(self) -> dict: def as_bosminer(self) -> dict:
return {"autotuning": {"enabled": True, "psu_power_limit": self.power}} return {"autotuning": {"enabled": True, "psu_power_limit": self.power}}
def as_boser(self) -> dict:
return {
"set_performance_mode": SetPerformanceModeRequest(
save_action=SaveAction.SAVE_ACTION_SAVE_AND_APPLY,
mode=PerformanceMode(
tuner_mode=TunerPerformanceMode(
power_target=PowerTargetMode(
power_target=Power(watt=self.power)
)
)
),
),
}
@dataclass @dataclass
class MiningModeHashrateTune(MinerConfigValue): class MiningModeHashrateTune(MinerConfigValue):
@@ -112,6 +136,22 @@ class MiningModeHashrateTune(MinerConfigValue):
def as_am_modern(self) -> dict: def as_am_modern(self) -> dict:
return {"miner-mode": "0"} return {"miner-mode": "0"}
def as_boser(self) -> dict:
return {
"set_performance_mode": SetPerformanceModeRequest(
save_action=SaveAction.SAVE_ACTION_SAVE_AND_APPLY,
mode=PerformanceMode(
tuner_mode=TunerPerformanceMode(
hashrate_target=HashrateTargetMode(
hashrate_target=TeraHashrate(
terahash_per_second=self.hashrate
)
)
)
),
)
}
@dataclass @dataclass
class ManualBoardSettings(MinerConfigValue): class ManualBoardSettings(MinerConfigValue):
@@ -260,3 +300,33 @@ class MiningModeConfig(MinerConfigOption):
return MiningModeManual.from_vnish(mode_settings) return MiningModeManual.from_vnish(mode_settings)
else: else:
return cls.power_tuning(int(mode_settings["preset"])) return cls.power_tuning(int(mode_settings["preset"]))
@classmethod
def from_boser(cls, grpc_miner_conf: dict):
try:
tuner_conf = grpc_miner_conf["tuner"]
if not tuner_conf.get("enabled", False):
return cls.default()
except LookupError:
return cls.default()
if tuner_conf.get("tunerMode") is not None:
if tuner_conf["tunerMode"] == 1:
if tuner_conf.get("powerTarget") is not None:
return cls.power_tuning(tuner_conf["powerTarget"]["watt"])
return cls.power_tuning()
if tuner_conf["tunerMode"] == 2:
if tuner_conf.get("hashrateTarget") is not None:
return cls.hashrate_tuning(
int(tuner_conf["hashrateTarget"]["terahashPerSecond"])
)
return cls.hashrate_tuning()
if tuner_conf.get("powerTarget") is not None:
return cls.power_tuning(tuner_conf["powerTarget"]["watt"])
if tuner_conf.get("hashrateTarget") is not None:
return cls.hashrate_tuning(
int(tuner_conf["hashrateTarget"]["terahashPerSecond"])
)

View File

@@ -149,6 +149,14 @@ class Pool(MinerConfigValue):
password=web_pool["pass"], password=web_pool["pass"],
) )
@classmethod
def from_boser(cls, grpc_pool: dict) -> "Pool":
return cls(
url=grpc_pool["url"],
user=grpc_pool["user"],
password=grpc_pool["password"],
)
@dataclass @dataclass
class PoolGroup(MinerConfigValue): class PoolGroup(MinerConfigValue):
@@ -287,6 +295,19 @@ class PoolGroup(MinerConfigValue):
def from_vnish(cls, web_settings_pools: dict) -> "PoolGroup": def from_vnish(cls, web_settings_pools: dict) -> "PoolGroup":
return cls([Pool.from_vnish(p) for p in web_settings_pools]) return cls([Pool.from_vnish(p) for p in web_settings_pools])
@classmethod
def from_boser(cls, grpc_pool_group: dict):
try:
return cls(
pools=[Pool.from_boser(p) for p in grpc_pool_group["pools"]],
name=grpc_pool_group["name"],
quota=grpc_pool_group["quota"]["value"]
if grpc_pool_group.get("quota") is not None
else 1,
)
except LookupError:
return cls()
@dataclass @dataclass
class PoolConfig(MinerConfigValue): class PoolConfig(MinerConfigValue):
@@ -349,7 +370,7 @@ class PoolConfig(MinerConfigValue):
} }
return {"group": [PoolGroup().as_bosminer()]} return {"group": [PoolGroup().as_bosminer()]}
def as_bos_grpc(self, user_suffix: str = None) -> dict: def as_boser(self, user_suffix: str = None) -> dict:
return {} return {}
@classmethod @classmethod
@@ -394,3 +415,15 @@ class PoolConfig(MinerConfigValue):
return cls([PoolGroup.from_vnish(web_settings["miner"]["pools"])]) return cls([PoolGroup.from_vnish(web_settings["miner"]["pools"])])
except LookupError: except LookupError:
return cls() return cls()
@classmethod
def from_boser(cls, grpc_miner_conf: dict):
try:
return cls(
groups=[
PoolGroup.from_boser(group)
for group in grpc_miner_conf["poolGroups"]
]
)
except LookupError:
return cls()

View File

@@ -17,7 +17,12 @@ from dataclasses import dataclass, field
from typing import Union from typing import Union
from pyasic.config.base import MinerConfigOption, MinerConfigValue from pyasic.config.base import MinerConfigOption, MinerConfigValue
from pyasic.web.bosminer.proto.braiins.bos.v1 import DpsPowerTarget, DpsTarget, Hours from pyasic.web.braiins_os.proto.braiins.bos.v1 import (
DpsPowerTarget,
DpsTarget,
Power,
SetDpsRequest,
)
@dataclass @dataclass
@@ -37,13 +42,8 @@ class PowerScalingShutdownEnabled(MinerConfigValue):
return cfg return cfg
def as_bos_grpc(self) -> dict: def as_boser(self) -> dict:
cfg = {"enable_shutdown ": True} return {"enable_shutdown": True, "shutdown_duration": self.duration}
if self.duration is not None:
cfg["shutdown_duration"] = Hours(self.duration)
return cfg
@dataclass @dataclass
@@ -57,7 +57,7 @@ class PowerScalingShutdownDisabled(MinerConfigValue):
def as_bosminer(self) -> dict: def as_bosminer(self) -> dict:
return {"shutdown_enabled": False} return {"shutdown_enabled": False}
def as_bos_grpc(self) -> dict: def as_boser(self) -> dict:
return {"enable_shutdown ": False} return {"enable_shutdown ": False}
@@ -88,6 +88,19 @@ class PowerScalingShutdown(MinerConfigOption):
return cls.disabled() return cls.disabled()
return None return None
@classmethod
def from_boser(cls, power_scaling_conf: dict):
sd_enabled = power_scaling_conf.get("shutdownEnabled")
if sd_enabled is not None:
if sd_enabled:
try:
return cls.enabled(power_scaling_conf["shutdownDuration"]["hours"])
except KeyError:
return cls.enabled()
else:
return cls.disabled()
return None
@dataclass @dataclass
class PowerScalingEnabled(MinerConfigValue): class PowerScalingEnabled(MinerConfigValue):
@@ -133,20 +146,19 @@ class PowerScalingEnabled(MinerConfigValue):
return {"power_scaling": cfg} return {"power_scaling": cfg}
def as_bos_grpc(self) -> dict: def as_boser(self) -> dict:
cfg = {"enable": True} return {
target_conf = {} "set_dps": SetDpsRequest(
if self.power_step is not None: enable=True,
target_conf["power_step"] = self.power_step **self.shutdown_enabled.as_boser(),
if self.minimum_power is not None: target=DpsTarget(
target_conf["min_power_target"] = self.minimum_power power_target=DpsPowerTarget(
power_step=Power(self.power_step),
cfg["target"] = DpsTarget(power_target=DpsPowerTarget(**target_conf)) min_power_target=Power(self.minimum_power),
)
if self.shutdown_enabled is not None: ),
cfg = {**cfg, **self.shutdown_enabled.as_bos_grpc()} ),
}
return {"dps": cfg}
@dataclass @dataclass
@@ -187,3 +199,20 @@ class PowerScalingConfig(MinerConfigOption):
return cls.disabled() return cls.disabled()
return cls.default() return cls.default()
@classmethod
def from_boser(cls, grpc_miner_conf: dict):
try:
dps_conf = grpc_miner_conf["dps"]
if not dps_conf.get("enabled", False):
return cls.disabled()
except LookupError:
return cls.default()
conf = {"shutdown_enabled": PowerScalingShutdown.from_boser(dps_conf)}
if dps_conf.get("minPowerTarget") is not None:
conf["minimum_power"] = dps_conf["minPowerTarget"]["watt"]
if dps_conf.get("powerStep") is not None:
conf["power_step"] = dps_conf["powerStep"]["watt"]
return cls.enabled(**conf)

View File

@@ -80,3 +80,34 @@ class TemperatureConfig(MinerConfigValue):
except KeyError: except KeyError:
pass pass
return cls() return cls()
@classmethod
def from_boser(cls, grpc_miner_conf: dict):
try:
temperature_conf = grpc_miner_conf["temperature"]
except KeyError:
return cls.default()
root_key = None
for key in ["auto", "manual", "disabled"]:
if key in temperature_conf.keys():
root_key = key
break
if root_key is None:
return cls.default()
conf = {}
keys = temperature_conf[root_key].keys()
if "targetTemperature" in keys:
conf["target"] = int(
temperature_conf[root_key]["targetTemperature"]["degreeC"]
)
if "hotTemperature" in keys:
conf["hot"] = int(temperature_conf[root_key]["hotTemperature"]["degreeC"])
if "dangerousTemperature" in keys:
conf["danger"] = int(
temperature_conf[root_key]["dangerousTemperature"]["degreeC"]
)
return cls(**conf)
return cls.default()

View File

@@ -14,21 +14,21 @@
# limitations under the License. - # limitations under the License. -
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
from pyasic.miners.backends import BOSMiner from pyasic.miners.backends import BOSer
from pyasic.miners.types import S17, S17e, S17Plus, S17Pro from pyasic.miners.types import S17, S17e, S17Plus, S17Pro
class BOSMinerS17(BOSMiner, S17): class BOSMinerS17(BOSer, S17):
pass pass
class BOSMinerS17Plus(BOSMiner, S17Plus): class BOSMinerS17Plus(BOSer, S17Plus):
pass pass
class BOSMinerS17Pro(BOSMiner, S17Pro): class BOSMinerS17Pro(BOSer, S17Pro):
pass pass
class BOSMinerS17e(BOSMiner, S17e): class BOSMinerS17e(BOSer, S17e):
pass pass

View File

@@ -14,17 +14,17 @@
# limitations under the License. - # limitations under the License. -
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
from pyasic.miners.backends import BOSMiner from pyasic.miners.backends import BOSer
from pyasic.miners.types import T17, T17e, T17Plus from pyasic.miners.types import T17, T17e, T17Plus
class BOSMinerT17(BOSMiner, T17): class BOSMinerT17(BOSer, T17):
pass pass
class BOSMinerT17Plus(BOSMiner, T17Plus): class BOSMinerT17Plus(BOSer, T17Plus):
pass pass
class BOSMinerT17e(BOSMiner, T17e): class BOSMinerT17e(BOSer, T17e):
pass pass

View File

@@ -14,7 +14,7 @@
# limitations under the License. - # limitations under the License. -
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
from pyasic.miners.backends import BOSMiner from pyasic.miners.backends import BOSer
from pyasic.miners.types import ( from pyasic.miners.types import (
S19, S19,
S19XP, S19XP,
@@ -30,45 +30,45 @@ from pyasic.miners.types import (
) )
class BOSMinerS19(BOSMiner, S19): class BOSMinerS19(BOSer, S19):
pass pass
class BOSMinerS19Plus(BOSMiner, S19Plus): class BOSMinerS19Plus(BOSer, S19Plus):
pass pass
class BOSMinerS19Pro(BOSMiner, S19Pro): class BOSMinerS19Pro(BOSer, S19Pro):
pass pass
class BOSMinerS19a(BOSMiner, S19a): class BOSMinerS19a(BOSer, S19a):
pass pass
class BOSMinerS19j(BOSMiner, S19j): class BOSMinerS19j(BOSer, S19j):
pass pass
class BOSMinerS19jNoPIC(BOSMiner, S19jNoPIC): class BOSMinerS19jNoPIC(BOSer, S19jNoPIC):
pass pass
class BOSMinerS19jPro(BOSMiner, S19jPro): class BOSMinerS19jPro(BOSer, S19jPro):
pass pass
class BOSMinerS19kProNoPIC(BOSMiner, S19kProNoPIC): class BOSMinerS19kProNoPIC(BOSer, S19kProNoPIC):
pass pass
class BOSMinerS19aPro(BOSMiner, S19aPro): class BOSMinerS19aPro(BOSer, S19aPro):
pass pass
class BOSMinerS19jProPlus(BOSMiner, S19jProPlus): class BOSMinerS19jProPlus(BOSer, S19jProPlus):
pass pass
class BOSMinerS19XP(BOSMiner, S19XP): class BOSMinerS19XP(BOSer, S19XP):
pass pass

View File

@@ -14,9 +14,9 @@
# limitations under the License. - # limitations under the License. -
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
from pyasic.miners.backends import BOSMiner from pyasic.miners.backends import BOSer
from pyasic.miners.types import T19 from pyasic.miners.types import T19
class BOSMinerT19(BOSMiner, T19): class BOSMinerT19(BOSer, T19):
pass pass

View File

@@ -30,7 +30,7 @@ class HiveonT9(Hiveon, T9):
### DATA GATHERING FUNCTIONS (get_{some_data}) ### ### DATA GATHERING FUNCTIONS (get_{some_data}) ###
################################################## ##################################################
async def _get_mac(self): async def get_mac(self):
try: try:
mac = ( mac = (
(await self.send_ssh_command("cat /sys/class/net/eth0/address")) (await self.send_ssh_command("cat /sys/class/net/eth0/address"))
@@ -41,7 +41,6 @@ class HiveonT9(Hiveon, T9):
except (TypeError, ValueError, asyncssh.Error, OSError, AttributeError): except (TypeError, ValueError, asyncssh.Error, OSError, AttributeError):
pass pass
async def _get_hashboards(self, api_stats: dict = None) -> List[HashBoard]: async def _get_hashboards(self, api_stats: dict = None) -> List[HashBoard]:
hashboards = [ hashboards = [
HashBoard(slot=board, expected_chips=self.expected_chips) HashBoard(slot=board, expected_chips=self.expected_chips)
@@ -70,14 +69,14 @@ class HiveonT9(Hiveon, T9):
hashboards[board].chip_temp = api_stats["STATS"][1][ hashboards[board].chip_temp = api_stats["STATS"][1][
f"temp2_{chipset}" f"temp2_{chipset}"
] ]
except LookupError: except (KeyError, IndexError):
pass pass
else: else:
hashboards[board].missing = False hashboards[board].missing = False
try: try:
hashrate += api_stats["STATS"][1][f"chain_rate{chipset}"] hashrate += api_stats["STATS"][1][f"chain_rate{chipset}"]
chips += api_stats["STATS"][1][f"chain_acn{chipset}"] chips += api_stats["STATS"][1][f"chain_acn{chipset}"]
except LookupError: except (KeyError, IndexError):
pass pass
hashboards[board].hashrate = round(hashrate / 1000, 2) hashboards[board].hashrate = round(hashrate / 1000, 2)
hashboards[board].chips = chips hashboards[board].chips = chips
@@ -95,7 +94,7 @@ class HiveonT9(Hiveon, T9):
boards = api_stats.get("STATS") boards = api_stats.get("STATS")
try: try:
wattage_raw = boards[1]["chain_power"] wattage_raw = boards[1]["chain_power"]
except LookupError: except (KeyError, IndexError):
pass pass
else: else:
# parse wattage position out of raw data # parse wattage position out of raw data
@@ -120,7 +119,7 @@ class HiveonT9(Hiveon, T9):
env_temp = api_stats["STATS"][1][f"temp3_{chipset}"] env_temp = api_stats["STATS"][1][f"temp3_{chipset}"]
if not env_temp == 0: if not env_temp == 0:
env_temp_list.append(int(env_temp)) env_temp_list.append(int(env_temp))
except LookupError: except (KeyError, IndexError):
pass pass
if not env_temp_list == []: if not env_temp_list == []:

View File

@@ -17,7 +17,7 @@ from .antminer import AntminerModern, AntminerOld
from .bfgminer import BFGMiner from .bfgminer import BFGMiner
from .bfgminer_goldshell import BFGMinerGoldshell from .bfgminer_goldshell import BFGMinerGoldshell
from .bmminer import BMMiner from .bmminer import BMMiner
from .bosminer import BOSMiner from .braiins_os import BOSer, BOSMiner
from .btminer import BTMiner from .btminer import BTMiner
from .cgminer import CGMiner from .cgminer import CGMiner
from .cgminer_avalon import CGMinerAvalon from .cgminer_avalon import CGMinerAvalon

View File

@@ -1,155 +0,0 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import logging
from typing import List, Optional, Tuple
from pyasic.config import MinerConfig
from pyasic.data import Fan, HashBoard, MinerData
from pyasic.data.error_codes import MinerErrorData
from pyasic.miners.backends import BOSMiner
class BOSMinerOld(BOSMiner):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver)
async def send_ssh_command(self, cmd: str) -> Optional[str]:
result = None
try:
conn = await self._get_ssh_connection()
except ConnectionError:
return None
# open an ssh connection
async with conn:
# 3 retries
for i in range(3):
try:
# run the command and get the result
result = await conn.run(cmd)
result = result.stdout
except Exception as e:
# if the command fails, log it
logging.warning(f"{self} command {cmd} error: {e}")
# on the 3rd retry, return None
if i == 3:
return
continue
# return the result, either command output or None
return result
async def update_to_plus(self):
result = await self.send_ssh_command("opkg update && opkg install bos_plus")
return result
async def check_light(self) -> bool:
return False
async def fault_light_on(self) -> bool:
return False
async def fault_light_off(self) -> bool:
return False
async def get_config(self) -> None:
return None
async def reboot(self) -> bool:
return False
async def restart_backend(self) -> bool:
return False
async def stop_mining(self) -> bool:
return False
async def resume_mining(self) -> bool:
return False
async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
return None
async def set_power_limit(self, wattage: int) -> bool:
return False
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def _get_mac(self, *args, **kwargs) -> Optional[str]:
return None
async def get_model(self, *args, **kwargs) -> str:
return "S9"
async def get_version(self, *args, **kwargs) -> Tuple[Optional[str], Optional[str]]:
return None, None
async def _get_hostname(self, *args, **kwargs) -> Optional[str]:
return None
async def _get_hashrate(self, *args, **kwargs) -> Optional[float]:
return None
async def _get_hashboards(self, *args, **kwargs) -> List[HashBoard]:
return []
async def _get_env_temp(self, *args, **kwargs) -> Optional[float]:
return None
async def _get_wattage(self, *args, **kwargs) -> Optional[int]:
return None
async def _get_wattage_limit(self, *args, **kwargs) -> Optional[int]:
return None
async def _get_fans(
self,
*args,
**kwargs,
) -> List[Fan]:
return [Fan(), Fan(), Fan(), Fan()]
async def _get_fan_psu(self, *args, **kwargs) -> Optional[int]:
return None
async def _get_api_ver(self, *args, **kwargs) -> Optional[str]:
return None
async def _get_fw_ver(self, *args, **kwargs) -> Optional[str]:
return None
async def _get_errors(self, *args, **kwargs) -> List[MinerErrorData]:
return []
async def _get_fault_light(self, *args, **kwargs) -> bool:
return False
async def _get_expected_hashrate(self, *args, **kwargs) -> Optional[float]:
return None
async def get_data(self, allow_warning: bool = False, **kwargs) -> MinerData:
return MinerData(ip=str(self.ip))
async def _is_mining(self, *args, **kwargs) -> Optional[bool]:
return None
async def _get_uptime(self, *args, **kwargs) -> Optional[int]:
return None

View File

@@ -557,7 +557,6 @@ class BTMiner(BaseMiner):
errors.append(WhatsminerError(error_code=err)) errors.append(WhatsminerError(error_code=err))
except (LookupError, ValueError, TypeError): except (LookupError, ValueError, TypeError):
pass pass
return errors return errors
async def _get_expected_hashrate(self, api_summary: dict = None): async def _get_expected_hashrate(self, api_summary: dict = None):

View File

@@ -208,7 +208,7 @@ class CGMinerAvalon(CGMiner):
if api_devs: if api_devs:
try: try:
return round(float(api_devs["DEVS"][0]["MHS 1m"] / 1000000), 2) return round(float(api_devs["DEVS"][0]["MHS 1m"] / 1000000), 2)
except (LookupError, ValueError, TypeError): except (KeyError, IndexError, ValueError, TypeError):
pass pass
async def _get_hashboards(self, api_stats: dict = None) -> List[HashBoard]: async def _get_hashboards(self, api_stats: dict = None) -> List[HashBoard]:
@@ -227,7 +227,7 @@ class CGMinerAvalon(CGMiner):
try: try:
unparsed_stats = api_stats["STATS"][0]["MM ID0"] unparsed_stats = api_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats) parsed_stats = self.parse_stats(unparsed_stats)
except (LookupError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
return hashboards return hashboards
for board in range(self.expected_hashboards): for board in range(self.expected_hashboards):
@@ -271,7 +271,7 @@ class CGMinerAvalon(CGMiner):
unparsed_stats = api_stats["STATS"][0]["MM ID0"] unparsed_stats = api_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats) parsed_stats = self.parse_stats(unparsed_stats)
return round(float(parsed_stats["GHSmm"]) / 1000, 2) return round(float(parsed_stats["GHSmm"]) / 1000, 2)
except (LookupError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
pass pass
async def _get_env_temp(self, api_stats: dict = None) -> Optional[float]: async def _get_env_temp(self, api_stats: dict = None) -> Optional[float]:
@@ -286,7 +286,7 @@ class CGMinerAvalon(CGMiner):
unparsed_stats = api_stats["STATS"][0]["MM ID0"] unparsed_stats = api_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats) parsed_stats = self.parse_stats(unparsed_stats)
return float(parsed_stats["Temp"]) return float(parsed_stats["Temp"])
except (LookupError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
pass pass
async def _get_wattage(self) -> Optional[int]: async def _get_wattage(self) -> Optional[int]:
@@ -304,7 +304,7 @@ class CGMinerAvalon(CGMiner):
unparsed_stats = api_stats["STATS"][0]["MM ID0"] unparsed_stats = api_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats) parsed_stats = self.parse_stats(unparsed_stats)
return int(parsed_stats["MPO"]) return int(parsed_stats["MPO"])
except (LookupError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
pass pass
async def _get_fans(self, api_stats: dict = None) -> List[Fan]: async def _get_fans(self, api_stats: dict = None) -> List[Fan]:
@@ -325,7 +325,7 @@ class CGMinerAvalon(CGMiner):
for fan in range(self.expected_fans): for fan in range(self.expected_fans):
try: try:
fans_data[fan].speed = int(parsed_stats[f"Fan{fan + 1}"]) fans_data[fan].speed = int(parsed_stats[f"Fan{fan + 1}"])
except (LookupError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
pass pass
return fans_data return fans_data
@@ -347,7 +347,7 @@ class CGMinerAvalon(CGMiner):
parsed_stats = self.parse_stats(unparsed_stats) parsed_stats = self.parse_stats(unparsed_stats)
led = int(parsed_stats["Led"]) led = int(parsed_stats["Led"])
return True if led == 1 else False return True if led == 1 else False
except (LookupError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
pass pass
try: try:
@@ -366,6 +366,3 @@ class CGMinerAvalon(CGMiner):
async def _get_uptime(self) -> Optional[int]: async def _get_uptime(self) -> Optional[int]:
return None return None
async def get_uptime(self) -> Optional[int]:
return None

View File

@@ -188,7 +188,7 @@ class ePIC(BaseMiner):
if web_summary: if web_summary:
try: try:
hashrate = 0 hashrate = 0
if web_summary["HBs"] != None: if web_summary["HBs"] is not None:
for hb in web_summary["HBs"]: for hb in web_summary["HBs"]:
hashrate += hb["Hashrate"][0] hashrate += hb["Hashrate"][0]
return round(float(float(hashrate / 1000000)), 2) return round(float(float(hashrate / 1000000)), 2)
@@ -207,7 +207,7 @@ class ePIC(BaseMiner):
if web_summary: if web_summary:
try: try:
hashrate = 0 hashrate = 0
if web_summary["HBs"] != None: if web_summary["HBs"] is not None:
for hb in web_summary["HBs"]: for hb in web_summary["HBs"]:
if hb["Hashrate"][1] == 0: if hb["Hashrate"][1] == 0:
ideal = 1.0 ideal = 1.0
@@ -266,7 +266,7 @@ class ePIC(BaseMiner):
HashBoard(slot=i, expected_chips=self.expected_chips) HashBoard(slot=i, expected_chips=self.expected_chips)
for i in range(self.expected_hashboards) for i in range(self.expected_hashboards)
] ]
if web_summary["HBs"] != None: if web_summary["HBs"] is not None:
for hb in web_summary["HBs"]: for hb in web_summary["HBs"]:
for hr in web_hashrate: for hr in web_hashrate:
if hr["Index"] == hb["Index"]: if hr["Index"] == hb["Index"]:
@@ -312,7 +312,7 @@ class ePIC(BaseMiner):
if web_summary: if web_summary:
try: try:
error = web_summary["Status"]["Last Error"] error = web_summary["Status"]["Last Error"]
if error != None: if error is not None:
errors.append(X19Error(str(error))) errors.append(X19Error(str(error)))
return errors return errors
except KeyError: except KeyError:
@@ -328,9 +328,6 @@ class ePIC(BaseMiner):
def _get_api_ver(self, *args, **kwargs) -> Optional[str]: def _get_api_ver(self, *args, **kwargs) -> Optional[str]:
pass pass
def get_config(self) -> MinerConfig:
return self.config
def _get_env_temp(self, *args, **kwargs) -> Optional[float]: def _get_env_temp(self, *args, **kwargs) -> Optional[float]:
pass pass

View File

@@ -22,7 +22,7 @@ from pyasic.miners.base import DataFunction, DataLocations, DataOptions, RPCAPIC
HIVEON_DATA_LOC = DataLocations( HIVEON_DATA_LOC = DataLocations(
**{ **{
str(DataOptions.MAC): DataFunction("_get_mac"), str(DataOptions.MAC): DataFunction("get_mac"),
str(DataOptions.API_VERSION): DataFunction( str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver", [RPCAPICommand("api_version", "version")] "_get_api_ver", [RPCAPICommand("api_version", "version")]
), ),
@@ -45,7 +45,7 @@ HIVEON_DATA_LOC = DataLocations(
str(DataOptions.WATTAGE): DataFunction( str(DataOptions.WATTAGE): DataFunction(
"_get_wattage", [RPCAPICommand("api_stats", "stats")] "_get_wattage", [RPCAPICommand("api_stats", "stats")]
), ),
str(DataOptions.WATTAGE_LIMIT): DataFunction("_get_wattage_limit"), str(DataOptions.WATTAGE_LIMIT): DataFunction("get_wattage_limit"),
str(DataOptions.FANS): DataFunction( str(DataOptions.FANS): DataFunction(
"_get_fans", [RPCAPICommand("api_stats", "stats")] "_get_fans", [RPCAPICommand("api_stats", "stats")]
), ),
@@ -67,7 +67,6 @@ class Hiveon(BMMiner):
self.pwd = "admin" self.pwd = "admin"
# static data # static data
self.api_type = "Hiveon" self.api_type = "Hiveon"
self.fw_str = "Hive"
# data gathering locations # data gathering locations
self.data_locations = HIVEON_DATA_LOC self.data_locations = HIVEON_DATA_LOC

View File

@@ -340,7 +340,7 @@ class Innosilicon(CGMiner):
else: else:
web_get_all = web_get_all["all"] web_get_all = web_get_all["all"]
fans = [Fan() for _ in range(self.fan_count)] fans = [Fan() for _ in range(self.expected_fans)]
if web_get_all: if web_get_all:
try: try:
spd = web_get_all["fansSpeed"] spd = web_get_all["fansSpeed"]
@@ -348,7 +348,7 @@ class Innosilicon(CGMiner):
pass pass
else: else:
round((int(spd) * 6000) / 100) round((int(spd) * 6000) / 100)
for i in range(self.fan_count): for i in range(self.expected_fans):
fans[i].speed = spd fans[i].speed = spd
return fans return fans

View File

@@ -142,7 +142,7 @@ class BaseMiner(ABC):
@property @property
def model(self): def model(self):
model_data = [self.raw_model] model_data = [self.raw_model if self.raw_model is not None else "Unknown"]
if self.fw_str is not None: if self.fw_str is not None:
model_data.append(f"({self.fw_str})") model_data.append(f"({self.fw_str})")
return " ".join(model_data) return " ".join(model_data)

View File

@@ -476,6 +476,7 @@ class MinerFactory:
fn = miner_model_fns.get(miner_type) fn = miner_model_fns.get(miner_type)
if fn is not None: if fn is not None:
# noinspection PyArgumentList
task = asyncio.create_task(fn(ip)) task = asyncio.create_task(fn(ip))
try: try:
miner_model = await asyncio.wait_for( miner_model = await asyncio.wait_for(
@@ -484,15 +485,10 @@ class MinerFactory:
except asyncio.TimeoutError: except asyncio.TimeoutError:
pass pass
boser_enabled = None
if miner_type == MinerTypes.BRAIINS_OS:
boser_enabled = await self.get_boser_braiins_os(ip)
miner = self._select_miner_from_classes( miner = self._select_miner_from_classes(
ip, ip,
miner_type=miner_type, miner_type=miner_type,
miner_model=miner_model, miner_model=miner_model,
boser_enabled=boser_enabled,
) )
if miner is not None and not isinstance(miner, UnknownMiner): if miner is not None and not isinstance(miner, UnknownMiner):
@@ -775,13 +771,9 @@ class MinerFactory:
ip: ipaddress.ip_address, ip: ipaddress.ip_address,
miner_model: Union[str, None], miner_model: Union[str, None],
miner_type: Union[MinerTypes, None], miner_type: Union[MinerTypes, None],
boser_enabled: bool = None,
) -> AnyMiner: ) -> AnyMiner:
kwargs = {}
if boser_enabled is not None:
kwargs["boser"] = boser_enabled
try: try:
return MINER_CLASSES[miner_type][str(miner_model).upper()](ip, **kwargs) return MINER_CLASSES[miner_type][str(miner_model).upper()](ip)
except LookupError: except LookupError:
if miner_type in MINER_CLASSES: if miner_type in MINER_CLASSES:
return MINER_CLASSES[miner_type][None](ip) return MINER_CLASSES[miner_type][None](ip)
@@ -909,15 +901,6 @@ class MinerFactory:
except (httpx.HTTPError, LookupError): except (httpx.HTTPError, LookupError):
pass pass
async def get_boser_braiins_os(self, ip: str):
# TODO: refine this check
try:
sock_json_data = await self.send_api_command(ip, "version")
return sock_json_data["STATUS"][0]["Msg"].split(" ")[0].upper() == "BOSER"
except LookupError:
# let the bosminer class decide
return None
async def get_miner_model_vnish(self, ip: str) -> Optional[str]: async def get_miner_model_vnish(self, ip: str) -> Optional[str]:
sock_json_data = await self.send_api_command(ip, "stats") sock_json_data = await self.send_api_command(ip, "stats")
try: try:

View File

@@ -33,7 +33,6 @@ class UnknownMiner(BaseMiner):
super().__init__(ip) super().__init__(ip)
self.ip = ip self.ip = ip
self.api = UnknownAPI(ip) self.api = UnknownAPI(ip)
self.model = "Unknown"
def __repr__(self) -> str: def __repr__(self) -> str:
return f"Unknown: {str(self.ip)}" return f"Unknown: {str(self.ip)}"

View File

@@ -26,6 +26,7 @@ class BaseWebAPI(ABC):
self.ip = ip # ipaddress.ip_address(ip) self.ip = ip # ipaddress.ip_address(ip)
self.username = "root" self.username = "root"
self.pwd = "root" self.pwd = "root"
self.port = 80
def __new__(cls, *args, **kwargs): def __new__(cls, *args, **kwargs):
if cls is BaseWebAPI: if cls is BaseWebAPI:

View File

@@ -35,10 +35,12 @@ class AntminerModernWebAPI(BaseWebAPI):
allow_warning: bool = True, allow_warning: bool = True,
**parameters: Union[str, int, bool], **parameters: Union[str, int, bool],
) -> dict: ) -> dict:
url = f"http://{self.ip}/cgi-bin/{command}.cgi" url = f"http://{self.ip}:{self.port}/cgi-bin/{command}.cgi"
auth = httpx.DigestAuth(self.username, self.pwd) auth = httpx.DigestAuth(self.username, self.pwd)
try: try:
async with httpx.AsyncClient(transport=settings.transport()) as client: async with httpx.AsyncClient(
transport=settings.transport(),
) as client:
if parameters: if parameters:
data = await client.post( data = await client.post(
url, url,
@@ -149,7 +151,7 @@ class AntminerOldWebAPI(BaseWebAPI):
allow_warning: bool = True, allow_warning: bool = True,
**parameters: Union[str, int, bool], **parameters: Union[str, int, bool],
) -> dict: ) -> dict:
url = f"http://{self.ip}/cgi-bin/{command}.cgi" url = f"http://{self.ip}:{self.port}/cgi-bin/{command}.cgi"
auth = httpx.DigestAuth(self.username, self.pwd) auth = httpx.DigestAuth(self.username, self.pwd)
try: try:
async with httpx.AsyncClient(transport=settings.transport()) as client: async with httpx.AsyncClient(transport=settings.transport()) as client:

View File

@@ -0,0 +1,149 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import asyncio
from typing import Union
from pyasic import settings
from pyasic.errors import APIError
from pyasic.web import BaseWebAPI
from .graphql import BOSerGraphQLAPI
from .grpc import BOSerGRPCAPI
from .luci import BOSMinerLuCIAPI
class BOSMinerWebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
self.luci = BOSMinerLuCIAPI(
ip, settings.get("default_bosminer_password", "root")
)
self._pwd = settings.get("default_bosminer_password", "root")
self._port = 80
super().__init__(ip)
@property
def pwd(self):
return self._pwd
@pwd.setter
def pwd(self, other: str):
self._pwd = other
self.luci.pwd = other
@property
def port(self):
return self._port
@port.setter
def port(self, other: str):
self._port = other
self.luci.port = other
async def send_command(
self,
command: Union[str, dict],
ignore_errors: bool = False,
allow_warning: bool = True,
**parameters: Union[str, int, bool],
) -> dict:
return await self.luci.send_command(command)
async def multicommand(
self, *commands: Union[dict, str], allow_warning: bool = True
) -> dict:
return await self.luci.multicommand(*commands)
class BOSerWebAPI(BOSMinerWebAPI):
def __init__(self, ip: str) -> None:
self.gql = BOSerGraphQLAPI(
ip, settings.get("default_bosminer_password", "root")
)
self.grpc = BOSerGRPCAPI(ip, settings.get("default_bosminer_password", "root"))
self._port = 80
super().__init__(ip)
@property
def pwd(self):
return self._pwd
@pwd.setter
def pwd(self, other: str):
self._pwd = other
self.luci.pwd = other
self.gql.pwd = other
self.grpc.pwd = other
@property
def port(self):
return self._port
@port.setter
def port(self, other: str):
self._port = other
self.luci.port = other
self.gql.port = other
async def send_command(
self,
command: Union[str, dict],
ignore_errors: bool = False,
allow_warning: bool = True,
**parameters: Union[str, int, bool],
) -> dict:
command_type = self.select_command_type(command)
if command_type == "gql":
return await self.gql.send_command(command)
elif command_type == "grpc":
try:
return await (getattr(self.grpc, command.replace("grpc_", "")))()
except AttributeError:
raise APIError(f"No gRPC command found for command: {command}")
elif command_type == "luci":
return await self.luci.send_command(command)
@staticmethod
def select_command_type(command: Union[str, dict]) -> str:
if isinstance(command, dict):
return "gql"
else:
return "grpc"
async def multicommand(
self, *commands: Union[dict, str], allow_warning: bool = True
) -> dict:
cmd_types = {"grpc": [], "gql": []}
for cmd in commands:
cmd_types[self.select_command_type(cmd)].append(cmd)
async def no_op():
return {}
if len(cmd_types["grpc"]) > 0:
grpc_data_t = asyncio.create_task(
self.grpc.multicommand(*cmd_types["grpc"])
)
else:
grpc_data_t = asyncio.create_task(no_op())
if len(cmd_types["gql"]) > 0:
gql_data_t = asyncio.create_task(self.gql.multicommand(*cmd_types["gql"]))
else:
gql_data_t = asyncio.create_task(no_op())
await asyncio.gather(grpc_data_t, gql_data_t)
data = dict(**grpc_data_t.result(), **gql_data_t.result())
return data

View File

@@ -0,0 +1,105 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import json
from typing import Union
import httpx
from pyasic import settings
class BOSerGraphQLAPI:
def __init__(self, ip: str, pwd: str):
self.ip = ip
self.username = "root"
self.pwd = pwd
self.port = 80
async def multicommand(self, *commands: dict) -> dict:
def merge(*d: dict):
ret = {}
for i in d:
if i:
for k in i:
if not k in ret:
ret[k] = i[k]
else:
ret[k] = merge(ret[k], i[k])
return None if ret == {} else ret
command = merge(*commands)
data = await self.send_command(command)
if data is not None:
if data.get("data") is None:
try:
commands = list(commands)
# noinspection PyTypeChecker
commands.remove({"bos": {"faultLight": None}})
command = merge(*commands)
data = await self.send_command(command)
except (LookupError, ValueError):
pass
if not data:
data = {}
data["multicommand"] = False
return data
async def send_command(
self,
command: dict,
) -> dict:
url = f"http://{self.ip}:{self.port}/graphql"
query = command
if command is None:
return {}
if command.get("query") is None:
query = {"query": self.parse_command(command)}
try:
async with httpx.AsyncClient(transport=settings.transport()) as client:
await self.auth(client)
data = await client.post(url, json=query)
except httpx.HTTPError:
pass
else:
if data.status_code == 200:
try:
return data.json()
except json.decoder.JSONDecodeError:
pass
def parse_command(self, graphql_command: Union[dict, set]) -> str:
if isinstance(graphql_command, dict):
data = []
for key in graphql_command:
if graphql_command[key] is not None:
parsed = self.parse_command(graphql_command[key])
data.append(key + parsed)
else:
data.append(key)
else:
data = graphql_command
return "{" + ",".join(data) + "}"
async def auth(self, client: httpx.AsyncClient) -> None:
url = f"http://{self.ip}:{self.port}/graphql"
await client.post(
url,
json={
"query": (
f'mutation{{auth{{login(username:"{self.username}", password:"{self.pwd}"){{__typename}}}}}}'
)
},
)

View File

@@ -13,258 +13,20 @@
# See the License for the specific language governing permissions and - # See the License for the specific language governing permissions and -
# limitations under the License. - # limitations under the License. -
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
import json import asyncio
import logging
from datetime import timedelta from datetime import timedelta
from typing import Union
import httpx
from betterproto import Message from betterproto import Message
from grpclib import GRPCError, Status
from grpclib.client import Channel from grpclib.client import Channel
from pyasic import settings
from pyasic.errors import APIError from pyasic.errors import APIError
from pyasic.web import BaseWebAPI
from .proto.braiins.bos import * from .proto.braiins.bos import *
from .proto.braiins.bos.v1 import * from .proto.braiins.bos.v1 import *
class BOSMinerWebAPI(BaseWebAPI):
def __init__(self, ip: str, boser: bool = None) -> None:
if boser is None:
boser = True
if boser:
self.gql = BOSMinerGQLAPI(
ip, settings.get("default_bosminer_password", "root")
)
self.grpc = BOSMinerGRPCAPI(
ip, settings.get("default_bosminer_password", "root")
)
else:
self.gql = None
self.grpc = None
self.luci = BOSMinerLuCIAPI(
ip, settings.get("default_bosminer_password", "root")
)
self._pwd = settings.get("default_bosminer_password", "root")
super().__init__(ip)
@property
def pwd(self):
return self._pwd
@pwd.setter
def pwd(self, other: str):
self._pwd = other
self.luci.pwd = other
if self.gql is not None:
self.gql.pwd = other
if self.grpc is not None:
self.grpc.pwd = other
async def send_command(
self,
command: Union[str, dict],
ignore_errors: bool = False,
allow_warning: bool = True,
**parameters: Union[str, int, bool],
) -> dict:
if isinstance(command, dict):
if self.gql is not None:
return await self.gql.send_command(command)
elif command.startswith("/cgi-bin/luci"):
return await self.gql.send_command(command)
else:
if self.grpc is not None:
return await self.grpc.send_command(command)
async def multicommand(
self, *commands: Union[dict, str], allow_warning: bool = True
) -> dict:
luci_commands = []
gql_commands = []
grpc_commands = []
for cmd in commands:
if isinstance(cmd, dict):
gql_commands.append(cmd)
elif cmd.startswith("/cgi-bin/luci"):
luci_commands.append(cmd)
else:
grpc_commands.append(cmd)
luci_data = await self.luci.multicommand(*luci_commands)
if self.gql is not None:
gql_data = await self.gql.multicommand(*gql_commands)
else:
gql_data = None
if self.grpc is not None:
grpc_data = await self.grpc.multicommand(*grpc_commands)
else:
grpc_data = None
if gql_data is None:
gql_data = {}
if luci_data is None:
luci_data = {}
if grpc_data is None:
grpc_data = {}
data = dict(**luci_data, **gql_data, **grpc_data)
return data
class BOSMinerGQLAPI:
def __init__(self, ip: str, pwd: str):
self.ip = ip
self.username = "root"
self.pwd = pwd
async def multicommand(self, *commands: dict) -> dict:
def merge(*d: dict):
ret = {}
for i in d:
if i:
for k in i:
if not k in ret:
ret[k] = i[k]
else:
ret[k] = merge(ret[k], i[k])
return None if ret == {} else ret
command = merge(*commands)
data = await self.send_command(command)
if data is not None:
if data.get("data") is None:
try:
commands = list(commands)
# noinspection PyTypeChecker
commands.remove({"bos": {"faultLight": None}})
command = merge(*commands)
data = await self.send_command(command)
except (LookupError, ValueError):
pass
if not data:
data = {}
data["multicommand"] = False
return data
async def send_command(
self,
command: dict,
) -> dict:
url = f"http://{self.ip}/graphql"
query = command
if command is None:
return {}
if command.get("query") is None:
query = {"query": self.parse_command(command)}
try:
async with httpx.AsyncClient(transport=settings.transport()) as client:
await self.auth(client)
data = await client.post(url, json=query)
except httpx.HTTPError:
pass
else:
if data.status_code == 200:
try:
return data.json()
except json.decoder.JSONDecodeError:
pass
def parse_command(self, graphql_command: Union[dict, set]) -> str:
if isinstance(graphql_command, dict):
data = []
for key in graphql_command:
if graphql_command[key] is not None:
parsed = self.parse_command(graphql_command[key])
data.append(key + parsed)
else:
data.append(key)
else:
data = graphql_command
return "{" + ",".join(data) + "}"
async def auth(self, client: httpx.AsyncClient) -> None:
url = f"http://{self.ip}/graphql"
await client.post(
url,
json={
"query": (
'mutation{auth{login(username:"'
+ "root"
+ '", password:"'
+ self.pwd
+ '"){__typename}}}'
)
},
)
class BOSMinerLuCIAPI:
def __init__(self, ip: str, pwd: str):
self.ip = ip
self.username = "root"
self.pwd = pwd
async def multicommand(self, *commands: str) -> dict:
data = {}
for command in commands:
data[command] = await self.send_command(command, ignore_errors=True)
return data
async def send_command(self, path: str, ignore_errors: bool = False) -> dict:
try:
async with httpx.AsyncClient(transport=settings.transport()) as client:
await self.auth(client)
data = await client.get(
f"http://{self.ip}{path}", headers={"User-Agent": "BTC Tools v0.1"}
)
if data.status_code == 200:
return data.json()
if ignore_errors:
return {}
raise APIError(
f"Web command failed: path={path}, code={data.status_code}"
)
except (httpx.HTTPError, json.JSONDecodeError):
if ignore_errors:
return {}
raise APIError(f"Web command failed: path={path}")
async def auth(self, session: httpx.AsyncClient):
login = {"luci_username": self.username, "luci_password": self.pwd}
url = f"http://{self.ip}/cgi-bin/luci"
headers = {
"User-Agent": (
"BTC Tools v0.1"
), # only seems to respond if this user-agent is set
"Content-Type": "application/x-www-form-urlencoded",
}
await session.post(url, headers=headers, data=login)
async def get_net_conf(self):
return await self.send_command("/cgi-bin/luci/admin/network/iface_status/lan")
async def get_cfg_metadata(self):
return await self.send_command("/cgi-bin/luci/admin/miner/cfg_metadata")
async def get_cfg_data(self):
return await self.send_command("/cgi-bin/luci/admin/miner/cfg_data")
async def get_bos_info(self):
return await self.send_command("/cgi-bin/luci/bos/info")
async def get_overview(self):
return await self.send_command(
"/cgi-bin/luci/admin/status/overview?status=1"
) # needs status=1 or it fails
async def get_api_status(self):
return await self.send_command("/cgi-bin/luci/admin/miner/api_status")
class BOSMinerGRPCStub( class BOSMinerGRPCStub(
ApiVersionServiceStub, ApiVersionServiceStub,
AuthenticationServiceStub, AuthenticationServiceStub,
@@ -279,11 +41,12 @@ class BOSMinerGRPCStub(
pass pass
class BOSMinerGRPCAPI: class BOSerGRPCAPI:
def __init__(self, ip: str, pwd: str): def __init__(self, ip: str, pwd: str):
self.ip = ip self.ip = ip
self.username = "root" self.username = "root"
self.pwd = pwd self.pwd = pwd
self.port = 50051
self._auth = None self._auth = None
self._auth_time = datetime.now() self._auth_time = datetime.now()
@@ -305,7 +68,20 @@ class BOSMinerGRPCAPI:
] ]
async def multicommand(self, *commands: str) -> dict: async def multicommand(self, *commands: str) -> dict:
pass result = {"multicommand": True}
tasks = {}
for command in commands:
try:
tasks[command] = asyncio.create_task(getattr(self, command)())
except AttributeError:
result["command"] = {}
await asyncio.gather(*list(tasks.values()))
for cmd in tasks:
result[cmd] = tasks[cmd].result()
return result
async def send_command( async def send_command(
self, self,
@@ -317,13 +93,23 @@ class BOSMinerGRPCAPI:
metadata = [] metadata = []
if auth: if auth:
metadata.append(("authorization", await self.auth())) metadata.append(("authorization", await self.auth()))
async with Channel(self.ip, 50051) as c: try:
async with Channel(self.ip, self.port) as c:
endpoint = getattr(BOSMinerGRPCStub(c), command) endpoint = getattr(BOSMinerGRPCStub(c), command)
if endpoint is None: if endpoint is None:
if not ignore_errors: if not ignore_errors:
raise APIError(f"Command not found - {endpoint}") raise APIError(f"Command not found - {endpoint}")
return {} return {}
try:
return (await endpoint(message, metadata=metadata)).to_pydict() return (await endpoint(message, metadata=metadata)).to_pydict()
except GRPCError as e:
if e.status == Status.UNAUTHENTICATED:
await self._get_auth()
metadata = [("authorization", await self.auth())]
return (await endpoint(message, metadata=metadata)).to_pydict()
raise e
except GRPCError as e:
raise APIError(f"gRPC command failed - {endpoint}") from e
async def auth(self): async def auth(self):
if self._auth is not None and self._auth_time - datetime.now() < timedelta( if self._auth is not None and self._auth_time - datetime.now() < timedelta(
@@ -334,7 +120,7 @@ class BOSMinerGRPCAPI:
return self._auth return self._auth
async def _get_auth(self): async def _get_auth(self):
async with Channel(self.ip, 50051) as c: async with Channel(self.ip, self.port) as c:
req = LoginRequest(username=self.username, password=self.pwd) req = LoginRequest(username=self.username, password=self.pwd)
async with c.request( async with c.request(
"/braiins.bos.v1.AuthenticationService/Login", "/braiins.bos.v1.AuthenticationService/Login",
@@ -379,7 +165,9 @@ class BOSMinerGRPCAPI:
) )
async def get_locate_device_status(self): async def get_locate_device_status(self):
return await self.send_command("get_locate_device_status") return await self.send_command(
"get_locate_device_status", GetLocateDeviceStatusRequest()
)
async def set_password(self, password: str = None): async def set_password(self, password: str = None):
return await self.send_command( return await self.send_command(
@@ -402,10 +190,12 @@ class BOSMinerGRPCAPI:
) )
async def get_tuner_state(self): async def get_tuner_state(self):
return await self.send_command("get_tuner_state") return await self.send_command("get_tuner_state", GetTunerStateRequest())
async def list_target_profiles(self): async def list_target_profiles(self):
return await self.send_command("list_target_profiles") return await self.send_command(
"list_target_profiles", ListTargetProfilesRequest()
)
async def set_default_power_target( async def set_default_power_target(
self, save_action: SaveAction = SaveAction.SAVE_ACTION_SAVE_AND_APPLY self, save_action: SaveAction = SaveAction.SAVE_ACTION_SAVE_AND_APPLY
@@ -506,14 +296,70 @@ class BOSMinerGRPCAPI:
async def set_dps( async def set_dps(
self, self,
enable: bool,
power_step: int,
min_power_target: int,
enable_shutdown: bool = None,
shutdown_duration: int = None,
): ):
raise NotImplementedError
return await self.send_command("braiins.bos.v1.PerformanceService/SetDPS")
async def set_performance_mode(self):
raise NotImplementedError
return await self.send_command( return await self.send_command(
"braiins.bos.v1.PerformanceService/SetPerformanceMode" "set_dps",
message=SetDpsRequest(
enable=enable,
enable_shutdown=enable_shutdown,
shutdown_duration=shutdown_duration,
target=DpsTarget(
power_target=DpsPowerTarget(
power_step=Power(power_step),
min_power_target=Power(min_power_target),
)
),
),
)
async def set_performance_mode(
self,
wattage_target: int = None,
hashrate_target: int = None,
save_action: SaveAction = SaveAction.SAVE_ACTION_SAVE_AND_APPLY,
):
if wattage_target is not None and hashrate_target is not None:
logging.error(
"Cannot use both wattage_target and hashrate_target, using wattage_target."
)
elif wattage_target is None and hashrate_target is None:
raise APIError(
"No target supplied, please supply either wattage_target or hashrate_target."
)
if wattage_target is not None:
return await self.send_command(
"set_performance_mode",
message=SetPerformanceModeRequest(
save_action=save_action,
mode=PerformanceMode(
tuner_mode=TunerPerformanceMode(
power_target=PowerTargetMode(
power_target=Power(watt=wattage_target)
)
)
),
),
)
if hashrate_target is not None:
return await self.send_command(
"set_performance_mode",
message=SetPerformanceModeRequest(
save_action=save_action,
mode=PerformanceMode(
tuner_mode=TunerPerformanceMode(
hashrate_target=HashrateTargetMode(
hashrate_target=TeraHashrate(
terahash_per_second=hashrate_target
)
)
)
),
),
) )
async def get_active_performance_mode(self): async def get_active_performance_mode(self):

View File

@@ -0,0 +1,84 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import json
import httpx
from pyasic import settings
from pyasic.errors import APIError
class BOSMinerLuCIAPI:
def __init__(self, ip: str, pwd: str):
self.ip = ip
self.username = "root"
self.pwd = pwd
self.port = 80
async def multicommand(self, *commands: str) -> dict:
data = {}
for command in commands:
data[command] = await self.send_command(command, ignore_errors=True)
return data
async def send_command(self, path: str, ignore_errors: bool = False) -> dict:
try:
async with httpx.AsyncClient(transport=settings.transport()) as client:
await self.auth(client)
data = await client.get(
f"http://{self.ip}:{self.port}/cgi-bin/luci/{path}",
headers={"User-Agent": "BTC Tools v0.1"},
)
if data.status_code == 200:
return data.json()
if ignore_errors:
return {}
raise APIError(
f"LUCI web command failed: path={path}, code={data.status_code}"
)
except (httpx.HTTPError, json.JSONDecodeError):
if ignore_errors:
return {}
raise APIError(f"LUCI web command failed: path={path}")
async def auth(self, session: httpx.AsyncClient):
login = {"luci_username": self.username, "luci_password": self.pwd}
url = f"http://{self.ip}:{self.port}/cgi-bin/luci"
headers = {
"User-Agent": "BTC Tools v0.1", # only seems to respond if this user-agent is set
"Content-Type": "application/x-www-form-urlencoded",
}
await session.post(url, headers=headers, data=login)
async def get_net_conf(self):
return await self.send_command("admin/network/iface_status/lan")
async def get_cfg_metadata(self):
return await self.send_command("admin/miner/cfg_metadata")
async def get_cfg_data(self):
return await self.send_command("admin/miner/cfg_data")
async def get_bos_info(self):
return await self.send_command("bos/info")
async def get_overview(self):
return await self.send_command(
"admin/status/overview?status=1"
) # needs status=1 or it fails
async def get_api_status(self):
return await self.send_command("admin/miner/api_status")

View File

@@ -29,6 +29,7 @@ class ePICWebAPI(BaseWebAPI):
self.username = "root" self.username = "root"
self.pwd = settings.get("default_epic_password", "letmein") self.pwd = settings.get("default_epic_password", "letmein")
self.token = None self.token = None
self.port = 4028
async def send_command( async def send_command(
self, self,
@@ -50,13 +51,13 @@ class ePICWebAPI(BaseWebAPI):
"password": self.pwd, "password": self.pwd,
} }
response = await client.post( response = await client.post(
f"http://{self.ip}:4028/{command}", f"http://{self.ip}:{self.port}/{command}",
timeout=5, timeout=5,
json=epic_param, json=epic_param,
) )
else: else:
response = await client.get( response = await client.get(
f"http://{self.ip}:4028/{command}", f"http://{self.ip}:{self.port}/{command}",
timeout=5, timeout=5,
) )
if not response.status_code == 200: if not response.status_code == 200:
@@ -88,10 +89,10 @@ class ePICWebAPI(BaseWebAPI):
return data return data
async def restart_epic(self) -> dict: async def restart_epic(self) -> dict:
return await self.send_command("softreboot", post=True, parameters=None) return await self.send_command("softreboot", post=True)
async def reboot(self) -> dict: async def reboot(self) -> dict:
return await self.send_command("reboot", post=True, parameters=None) return await self.send_command("reboot", post=True)
async def pause_mining(self) -> dict: async def pause_mining(self) -> dict:
return await self.send_command("miner", post=True, parameters="Stop") return await self.send_command("miner", post=True, parameters="Stop")

View File

@@ -33,10 +33,10 @@ class GoldshellWebAPI(BaseWebAPI):
async def auth(self): async def auth(self):
async with httpx.AsyncClient(transport=settings.transport()) as client: async with httpx.AsyncClient(transport=settings.transport()) as client:
try: try:
await client.get(f"http://{self.ip}/user/logout") await client.get(f"http://{self.ip}:{self.port}/user/logout")
auth = ( auth = (
await client.get( await client.get(
f"http://{self.ip}/user/login?username={self.username}&password={self.pwd}&cipher=false" f"http://{self.ip}:{self.port}/user/login?username={self.username}&password={self.pwd}&cipher=false"
) )
).json() ).json()
except httpx.HTTPError: except httpx.HTTPError:
@@ -46,7 +46,7 @@ class GoldshellWebAPI(BaseWebAPI):
try: try:
auth = ( auth = (
await client.get( await client.get(
f"http://{self.ip}/user/login?username=admin&password=bbad7537f4c8b6ea31eea0b3d760e257&cipher=true" f"http://{self.ip}:{self.port}/user/login?username=admin&password=bbad7537f4c8b6ea31eea0b3d760e257&cipher=true"
) )
).json() ).json()
except (httpx.HTTPError, json.JSONDecodeError): except (httpx.HTTPError, json.JSONDecodeError):
@@ -76,14 +76,14 @@ class GoldshellWebAPI(BaseWebAPI):
try: try:
if parameters: if parameters:
response = await client.put( response = await client.put(
f"http://{self.ip}/mcb/{command}", f"http://{self.ip}:{self.port}/mcb/{command}",
headers={"Authorization": "Bearer " + self.jwt}, headers={"Authorization": "Bearer " + self.jwt},
timeout=settings.get("api_function_timeout", 5), timeout=settings.get("api_function_timeout", 5),
json=parameters, json=parameters,
) )
else: else:
response = await client.get( response = await client.get(
f"http://{self.ip}/mcb/{command}", f"http://{self.ip}:{self.port}/mcb/{command}",
headers={"Authorization": "Bearer " + self.jwt}, headers={"Authorization": "Bearer " + self.jwt},
timeout=settings.get("api_function_timeout", 5), timeout=settings.get("api_function_timeout", 5),
) )
@@ -106,7 +106,7 @@ class GoldshellWebAPI(BaseWebAPI):
for command in commands: for command in commands:
try: try:
response = await client.get( response = await client.get(
f"http://{self.ip}/mcb/{command}", f"http://{self.ip}:{self.port}/mcb/{command}",
headers={"Authorization": "Bearer " + self.jwt}, headers={"Authorization": "Bearer " + self.jwt},
timeout=settings.get("api_function_timeout", 5), timeout=settings.get("api_function_timeout", 5),
) )

View File

@@ -35,7 +35,7 @@ class InnosiliconWebAPI(BaseWebAPI):
async with httpx.AsyncClient(transport=settings.transport()) as client: async with httpx.AsyncClient(transport=settings.transport()) as client:
try: try:
auth = await client.post( auth = await client.post(
f"http://{self.ip}/api/auth", f"http://{self.ip}:{self.port}/api/auth",
data={"username": self.username, "password": self.pwd}, data={"username": self.username, "password": self.pwd},
) )
except httpx.HTTPError: except httpx.HTTPError:
@@ -58,7 +58,7 @@ class InnosiliconWebAPI(BaseWebAPI):
for i in range(settings.get("get_data_retries", 1)): for i in range(settings.get("get_data_retries", 1)):
try: try:
response = await client.post( response = await client.post(
f"http://{self.ip}/api/{command}", f"http://{self.ip}:{self.port}/api/{command}",
headers={"Authorization": "Bearer " + self.jwt}, headers={"Authorization": "Bearer " + self.jwt},
timeout=settings.get("api_function_timeout", 5), timeout=settings.get("api_function_timeout", 5),
json=parameters, json=parameters,
@@ -94,7 +94,7 @@ class InnosiliconWebAPI(BaseWebAPI):
for command in commands: for command in commands:
try: try:
response = await client.post( response = await client.post(
f"http://{self.ip}/api/{command}", f"http://{self.ip}:{self.port}/api/{command}",
headers={"Authorization": "Bearer " + self.jwt}, headers={"Authorization": "Bearer " + self.jwt},
timeout=settings.get("api_function_timeout", 5), timeout=settings.get("api_function_timeout", 5),
) )

View File

@@ -34,7 +34,7 @@ class VNishWebAPI(BaseWebAPI):
async with httpx.AsyncClient(transport=settings.transport()) as client: async with httpx.AsyncClient(transport=settings.transport()) as client:
try: try:
auth = await client.post( auth = await client.post(
f"http://{self.ip}/api/v1/unlock", f"http://{self.ip}:{self.port}/api/v1/unlock",
json={"pw": self.pwd}, json={"pw": self.pwd},
) )
except httpx.HTTPError: except httpx.HTTPError:
@@ -68,21 +68,21 @@ class VNishWebAPI(BaseWebAPI):
if parameters.get("post"): if parameters.get("post"):
parameters.pop("post") parameters.pop("post")
response = await client.post( response = await client.post(
f"http://{self.ip}/api/v1/{command}", f"http://{self.ip}:{self.port}/api/v1/{command}",
headers={"Authorization": auth}, headers={"Authorization": auth},
timeout=settings.get("api_function_timeout", 5), timeout=settings.get("api_function_timeout", 5),
json=parameters, json=parameters,
) )
elif not parameters == {}: elif not parameters == {}:
response = await client.post( response = await client.post(
f"http://{self.ip}/api/v1/{command}", f"http://{self.ip}:{self.port}/api/v1/{command}",
headers={"Authorization": auth}, headers={"Authorization": auth},
timeout=settings.get("api_function_timeout", 5), timeout=settings.get("api_function_timeout", 5),
json=parameters, json=parameters,
) )
else: else:
response = await client.get( response = await client.get(
f"http://{self.ip}/api/v1/{command}", f"http://{self.ip}:{self.port}/api/v1/{command}",
headers={"Authorization": auth}, headers={"Authorization": auth},
timeout=settings.get("api_function_timeout", 5), timeout=settings.get("api_function_timeout", 5),
) )
@@ -145,3 +145,6 @@ class VNishWebAPI(BaseWebAPI):
async def settings(self): async def settings(self):
return await self.send_command("settings") return await self.send_command("settings")
async def autotune_presets(self):
return await self.send_command("autotune/presets")

View File

@@ -14,8 +14,6 @@
# limitations under the License. - # limitations under the License. -
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
import unittest
from tests.api_tests import * from tests.api_tests import *
from tests.config_tests import TestConfig from tests.config_tests import TestConfig
from tests.miners_tests import MinersTest from tests.miners_tests import MinersTest

View File

@@ -18,23 +18,22 @@ import unittest
import warnings import warnings
from dataclasses import asdict from dataclasses import asdict
from pyasic.miners.backends import CGMiner # noqa
from pyasic.miners.miner_factory import MINER_CLASSES from pyasic.miners.miner_factory import MINER_CLASSES
class MinersTest(unittest.TestCase): class MinersTest(unittest.TestCase):
def test_miner_model_creation(self): def test_miner_type_creation(self):
warnings.filterwarnings("ignore") warnings.filterwarnings("ignore")
for miner_model in MINER_CLASSES.keys(): for miner_type in MINER_CLASSES.keys():
for miner_api in MINER_CLASSES[miner_model].keys(): for miner_model in MINER_CLASSES[miner_type].keys():
with self.subTest( with self.subTest(
msg=f"Test creation of miner", msg=f"Test creation of miner",
miner_type=miner_type,
miner_model=miner_model, miner_model=miner_model,
miner_api=miner_api,
): ):
miner = MINER_CLASSES[miner_model][miner_api]("127.0.0.1") miner = MINER_CLASSES[miner_type][miner_model]("127.0.0.1")
self.assertTrue( self.assertTrue(
isinstance(miner, MINER_CLASSES[miner_model][miner_api]) isinstance(miner, MINER_CLASSES[miner_type][miner_model])
) )
def test_miner_data_map_keys(self): def test_miner_data_map_keys(self):
@@ -60,14 +59,14 @@ class MinersTest(unittest.TestCase):
] ]
) )
warnings.filterwarnings("ignore") warnings.filterwarnings("ignore")
for miner_model in MINER_CLASSES.keys(): for miner_type in MINER_CLASSES.keys():
for miner_api in MINER_CLASSES[miner_model].keys(): for miner_model in MINER_CLASSES[miner_type].keys():
with self.subTest( with self.subTest(
msg=f"Data map key check", msg=f"Data map key check",
miner_type=miner_type,
miner_model=miner_model, miner_model=miner_model,
miner_api=miner_api,
): ):
miner = MINER_CLASSES[miner_model][miner_api]("127.0.0.1") miner = MINER_CLASSES[miner_type][miner_model]("127.0.0.1")
miner_keys = sorted( miner_keys = sorted(
[str(k) for k in asdict(miner.data_locations).keys()] [str(k) for k in asdict(miner.data_locations).keys()]
) )
@@ -75,14 +74,14 @@ class MinersTest(unittest.TestCase):
def test_data_locations_match_signatures_command(self): def test_data_locations_match_signatures_command(self):
warnings.filterwarnings("ignore") warnings.filterwarnings("ignore")
for miner_model in MINER_CLASSES.keys(): for miner_type in MINER_CLASSES.keys():
for miner_api in MINER_CLASSES[miner_model].keys(): for miner_model in MINER_CLASSES[miner_type].keys():
miner = MINER_CLASSES[miner_model][miner_api]("127.0.0.1") miner = MINER_CLASSES[miner_type][miner_model]("127.0.0.1")
for data_point in asdict(miner.data_locations).values(): for data_point in asdict(miner.data_locations).values():
with self.subTest( with self.subTest(
msg=f"Test {data_point['cmd']} signature matches", msg=f"Test {data_point['cmd']} signature matches",
miner_type=miner_type,
miner_model=miner_model, miner_model=miner_model,
miner_api=miner_api,
): ):
func = getattr(miner, data_point["cmd"]) func = getattr(miner, data_point["cmd"])
signature = inspect.signature(func) signature = inspect.signature(func)
@@ -98,23 +97,6 @@ class MinersTest(unittest.TestCase):
set([k["name"] for k in data_point["kwargs"]]), set([k["name"] for k in data_point["kwargs"]]),
) )
def test_data_locations_use_private_funcs(self):
warnings.filterwarnings("ignore")
for miner_model in MINER_CLASSES.keys():
for miner_api in MINER_CLASSES[miner_model].keys():
miner = MINER_CLASSES[miner_model][miner_api]("127.0.0.1")
for data_point in asdict(miner.data_locations).values():
with self.subTest(
msg=f"Test {data_point['cmd']} is private",
miner_model=miner_model,
miner_api=miner_api,
):
self.assertTrue(
data_point["cmd"].startswith("_")
or data_point["cmd"] == "get_config"
)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()