refactor: rename miner.api to miner.rpc. Add miner.api property linked to miner.rpc.

This commit is contained in:
UpstreamData
2024-01-26 10:15:20 -07:00
parent c2b6cc7468
commit 96aa346f00
15 changed files with 672 additions and 659 deletions

View File

@@ -28,39 +28,39 @@ HIVEON_T9_DATA_LOC = DataLocations(
**{
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver",
[RPCAPICommand("api_version", "version")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver",
[RPCAPICommand("api_version", "version")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[RPCAPICommand("api_summary", "summary")],
[RPCAPICommand("rpc_summary", "summary")],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction(
"_get_env_temp",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.WATTAGE): DataFunction(
"_get_wattage",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
}
)
@@ -84,15 +84,15 @@ class HiveonT9(Hiveon, T9):
except (TypeError, ValueError, asyncssh.Error, OSError, AttributeError):
pass
async def _get_hashboards(self, api_stats: dict = None) -> List[HashBoard]:
async def _get_hashboards(self, rpc_stats: dict = None) -> List[HashBoard]:
hashboards = [
HashBoard(slot=board, expected_chips=self.expected_chips)
for board in range(self.expected_hashboards)
]
if api_stats is None:
if rpc_stats is None:
try:
api_stats = self.api.stats()
rpc_stats = self.rpc.stats()
except APIError:
return []
@@ -108,8 +108,8 @@ class HiveonT9(Hiveon, T9):
for chipset in board_map[board]:
if hashboards[board].chip_temp is None:
try:
hashboards[board].temp = api_stats["STATS"][1][f"temp{chipset}"]
hashboards[board].chip_temp = api_stats["STATS"][1][
hashboards[board].temp = rpc_stats["STATS"][1][f"temp{chipset}"]
hashboards[board].chip_temp = rpc_stats["STATS"][1][
f"temp2_{chipset}"
]
except (KeyError, IndexError):
@@ -117,8 +117,8 @@ class HiveonT9(Hiveon, T9):
else:
hashboards[board].missing = False
try:
hashrate += api_stats["STATS"][1][f"chain_rate{chipset}"]
chips += api_stats["STATS"][1][f"chain_acn{chipset}"]
hashrate += rpc_stats["STATS"][1][f"chain_rate{chipset}"]
chips += rpc_stats["STATS"][1][f"chain_acn{chipset}"]
except (KeyError, IndexError):
pass
hashboards[board].hashrate = round(hashrate / 1000, 2)
@@ -126,15 +126,15 @@ class HiveonT9(Hiveon, T9):
return hashboards
async def _get_wattage(self, api_stats: dict = None) -> Optional[int]:
if not api_stats:
async def _get_wattage(self, rpc_stats: dict = None) -> Optional[int]:
if not rpc_stats:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
if api_stats:
boards = api_stats.get("STATS")
if rpc_stats:
boards = rpc_stats.get("STATS")
try:
wattage_raw = boards[1]["chain_power"]
except (KeyError, IndexError):
@@ -143,23 +143,23 @@ class HiveonT9(Hiveon, T9):
# parse wattage position out of raw data
return round(float(wattage_raw.split(" ")[0]))
async def _get_env_temp(self, api_stats: dict = None) -> Optional[float]:
async def _get_env_temp(self, rpc_stats: dict = None) -> Optional[float]:
env_temp_list = []
board_map = {
0: [2, 9, 10],
1: [3, 11, 12],
2: [4, 13, 14],
}
if not api_stats:
if not rpc_stats:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
if api_stats:
if rpc_stats:
for board in board_map.values():
for chipset in board:
try:
env_temp = api_stats["STATS"][1][f"temp3_{chipset}"]
env_temp = rpc_stats["STATS"][1][f"temp3_{chipset}"]
if not env_temp == 0:
env_temp_list.append(int(env_temp))
except (KeyError, IndexError):

View File

@@ -40,11 +40,11 @@ ANTMINER_MODERN_DATA_LOC = DataLocations(
),
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver",
[RPCAPICommand("api_version", "version")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver",
[RPCAPICommand("api_version", "version")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.HOSTNAME): DataFunction(
"_get_hostname",
@@ -52,15 +52,15 @@ ANTMINER_MODERN_DATA_LOC = DataLocations(
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[RPCAPICommand("api_summary", "summary")],
[RPCAPICommand("rpc_summary", "summary")],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.ERRORS): DataFunction(
"_get_errors",
@@ -76,7 +76,7 @@ ANTMINER_MODERN_DATA_LOC = DataLocations(
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
}
)
@@ -206,13 +206,13 @@ class AntminerModern(BMMiner):
]
try:
api_stats = await self.api.send_command("stats", new_api=True)
rpc_stats = await self.rpc.send_command("stats", new_rpc=True)
except APIError:
return hashboards
if api_stats is not None:
if rpc_stats is not None:
try:
for board in api_stats["STATS"][0]["chain"]:
for board in rpc_stats["STATS"][0]["chain"]:
hashboards[board["index"]].hashrate = round(
board["rate_real"] / 1000, 2
)
@@ -254,18 +254,18 @@ class AntminerModern(BMMiner):
pass
return self.light
async def _get_expected_hashrate(self, api_stats: dict = None) -> Optional[float]:
if api_stats is None:
async def _get_expected_hashrate(self, rpc_stats: dict = None) -> Optional[float]:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
if api_stats is not None:
if rpc_stats is not None:
try:
expected_rate = api_stats["STATS"][1]["total_rateideal"]
expected_rate = rpc_stats["STATS"][1]["total_rateideal"]
try:
rate_unit = api_stats["STATS"][1]["rate_unit"]
rate_unit = rpc_stats["STATS"][1]["rate_unit"]
except KeyError:
rate_unit = "GH"
if rate_unit == "GH":
@@ -336,16 +336,16 @@ class AntminerModern(BMMiner):
except LookupError:
pass
async def _get_uptime(self, api_stats: dict = None) -> Optional[int]:
if api_stats is None:
async def _get_uptime(self, rpc_stats: dict = None) -> Optional[int]:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
if api_stats is not None:
if rpc_stats is not None:
try:
return int(api_stats["STATS"][1]["Elapsed"])
return int(rpc_stats["STATS"][1]["Elapsed"])
except LookupError:
pass
@@ -354,11 +354,11 @@ ANTMINER_OLD_DATA_LOC = DataLocations(
**{
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver",
[RPCAPICommand("api_version", "version")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver",
[RPCAPICommand("api_version", "version")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.HOSTNAME): DataFunction(
"_get_hostname",
@@ -366,15 +366,15 @@ ANTMINER_OLD_DATA_LOC = DataLocations(
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[RPCAPICommand("api_summary", "summary")],
[RPCAPICommand("rpc_summary", "summary")],
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.FAULT_LIGHT): DataFunction(
"_get_fault_light",
@@ -386,7 +386,7 @@ ANTMINER_OLD_DATA_LOC = DataLocations(
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
}
)
@@ -479,47 +479,47 @@ class AntminerOld(CGMiner):
except KeyError:
pass
async def _get_fans(self, api_stats: dict = None) -> List[Fan]:
if api_stats is None:
async def _get_fans(self, rpc_stats: dict = None) -> List[Fan]:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
fans_data = [Fan() for _ in range(self.expected_fans)]
if api_stats is not None:
if rpc_stats is not None:
try:
fan_offset = -1
for fan_num in range(1, 8, 4):
for _f_num in range(4):
f = api_stats["STATS"][1].get(f"fan{fan_num + _f_num}")
f = rpc_stats["STATS"][1].get(f"fan{fan_num + _f_num}")
if f and not f == 0 and fan_offset == -1:
fan_offset = fan_num + 2
if fan_offset == -1:
fan_offset = 3
for fan in range(self.expected_fans):
fans_data[fan].speed = api_stats["STATS"][1].get(
fans_data[fan].speed = rpc_stats["STATS"][1].get(
f"fan{fan_offset+fan}", 0
)
except LookupError:
pass
return fans_data
async def _get_hashboards(self, api_stats: dict = None) -> List[HashBoard]:
async def _get_hashboards(self, rpc_stats: dict = None) -> List[HashBoard]:
hashboards = []
if api_stats is None:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
if api_stats is not None:
if rpc_stats is not None:
try:
board_offset = -1
boards = api_stats["STATS"]
boards = rpc_stats["STATS"]
if len(boards) > 1:
for board_num in range(1, 16, 5):
for _b_num in range(5):
@@ -574,27 +574,27 @@ class AntminerOld(CGMiner):
except LookupError:
pass
api_summary = None
rpc_summary = None
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary is not None:
if not api_summary == {}:
if rpc_summary is not None:
if not rpc_summary == {}:
return True
else:
return False
async def _get_uptime(self, api_stats: dict = None) -> Optional[int]:
if api_stats is None:
async def _get_uptime(self, rpc_stats: dict = None) -> Optional[int]:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
if api_stats is not None:
if rpc_stats is not None:
try:
return int(api_stats["STATS"][1]["Elapsed"])
return int(rpc_stats["STATS"][1]["Elapsed"])
except LookupError:
pass

View File

@@ -47,12 +47,12 @@ AURADINE_DATA_LOC = DataLocations(
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[RPCAPICommand("api_summary", "summary")],
[RPCAPICommand("rpc_summary", "summary")],
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[
RPCAPICommand("api_devs", "devs"),
RPCAPICommand("rpc_devs", "devs"),
WebAPICommand("web_ipreport", "ipreport"),
],
),
@@ -78,7 +78,7 @@ AURADINE_DATA_LOC = DataLocations(
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime",
[RPCAPICommand("api_summary", "summary")],
[RPCAPICommand("rpc_summary", "summary")],
),
}
)
@@ -116,8 +116,8 @@ class AuradineLEDCodes(Enum):
class Auradine(BaseMiner):
"""Base handler for Auradine miners"""
_api_cls = GCMinerRPCAPI
api: GCMinerRPCAPI
_rpc_cls = GCMinerRPCAPI
rpc: GCMinerRPCAPI
_web_cls = AuradineWebAPI
web: AuradineWebAPI
@@ -127,10 +127,18 @@ class Auradine(BaseMiner):
supports_autotuning = True
async def fault_light_on(self) -> bool:
return await self.web.set_led(code=int(AuradineLEDCodes.LOCATE_MINER))
try:
await self.web.set_led(code=int(AuradineLEDCodes.LOCATE_MINER))
return True
except APIError:
return False
async def fault_light_off(self) -> bool:
return await self.web.set_led(code=int(AuradineLEDCodes.NORMAL))
try:
await self.web.set_led(code=int(AuradineLEDCodes.NORMAL))
return True
except APIError:
return False
async def reboot(self) -> bool:
try:
@@ -227,32 +235,32 @@ class Auradine(BaseMiner):
except LookupError:
pass
async def _get_hashrate(self, api_summary: dict = None) -> Optional[float]:
if api_summary is None:
async def _get_hashrate(self, rpc_summary: dict = None) -> Optional[float]:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary is not None:
if rpc_summary is not None:
try:
return round(
float(float(api_summary["SUMMARY"][0]["MHS 5s"]) / 1000000), 2
float(float(rpc_summary["SUMMARY"][0]["MHS 5s"]) / 1000000), 2
)
except (LookupError, ValueError, TypeError):
pass
async def _get_hashboards(
self, api_devs: dict = None, web_ipreport: dict = None
self, rpc_devs: dict = None, web_ipreport: dict = None
) -> List[HashBoard]:
hashboards = [
HashBoard(slot=i, expected_chips=self.expected_chips)
for i in range(self.expected_hashboards)
]
if api_devs is None:
if rpc_devs is None:
try:
api_devs = await self.api.devs()
rpc_devs = await self.rpc.devs()
except APIError:
pass
if web_ipreport is None:
@@ -261,9 +269,9 @@ class Auradine(BaseMiner):
except APIError:
pass
if api_devs is not None:
if rpc_devs is not None:
try:
for board in api_devs["DEVS"]:
for board in rpc_devs["DEVS"]:
b_id = board["ID"] - 1
hashboards[b_id].hashrate = round(
float(float(board["MHS 5s"]) / 1000000), 2
@@ -365,15 +373,15 @@ class Auradine(BaseMiner):
except (LookupError, TypeError, ValueError):
pass
async def _get_uptime(self, api_summary: dict = None) -> Optional[int]:
if api_summary is None:
async def _get_uptime(self, rpc_summary: dict = None) -> Optional[int]:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary is not None:
if rpc_summary is not None:
try:
return api_summary["SUMMARY"][0]["Elapsed"]
return rpc_summary["SUMMARY"][0]["Elapsed"]
except LookupError:
pass

View File

@@ -26,47 +26,47 @@ AVALON_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction(
"_get_mac",
[RPCAPICommand("api_version", "version")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver",
[RPCAPICommand("api_version", "version")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver",
[RPCAPICommand("api_version", "version")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[RPCAPICommand("api_devs", "devs")],
[RPCAPICommand("rpc_devs", "devs")],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction(
"_get_env_temp",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.WATTAGE_LIMIT): DataFunction(
"_get_wattage_limit",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.FAULT_LIGHT): DataFunction(
"_get_fault_light",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
}
)
@@ -79,7 +79,7 @@ class AvalonMiner(CGMiner):
async def fault_light_on(self) -> bool:
try:
data = await self.api.ascset(0, "led", "1-1")
data = await self.rpc.ascset(0, "led", "1-1")
except APIError:
return False
if data["STATUS"][0]["Msg"] == "ASC 0 set OK":
@@ -88,7 +88,7 @@ class AvalonMiner(CGMiner):
async def fault_light_off(self) -> bool:
try:
data = await self.api.ascset(0, "led", "1-0")
data = await self.rpc.ascset(0, "led", "1-0")
except APIError:
return False
if data["STATUS"][0]["Msg"] == "ASC 0 set OK":
@@ -97,7 +97,7 @@ class AvalonMiner(CGMiner):
async def reboot(self) -> bool:
try:
data = await self.api.restart()
data = await self.rpc.restart()
except APIError:
return False
@@ -155,16 +155,16 @@ class AvalonMiner(CGMiner):
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def _get_mac(self, api_version: dict = None) -> Optional[str]:
if api_version is None:
async def _get_mac(self, rpc_version: dict = None) -> Optional[str]:
if rpc_version is None:
try:
api_version = await self.api.version()
rpc_version = await self.rpc.version()
except APIError:
pass
if api_version is not None:
if rpc_version is not None:
try:
base_mac = api_version["VERSION"][0]["MAC"]
base_mac = rpc_version["VERSION"][0]["MAC"]
base_mac = base_mac.upper()
mac = ":".join(
[base_mac[i : (i + 2)] for i in range(0, len(base_mac), 2)]
@@ -173,34 +173,34 @@ class AvalonMiner(CGMiner):
except (KeyError, ValueError):
pass
async def _get_hashrate(self, api_devs: dict = None) -> Optional[float]:
if api_devs is None:
async def _get_hashrate(self, rpc_devs: dict = None) -> Optional[float]:
if rpc_devs is None:
try:
api_devs = await self.api.devs()
rpc_devs = await self.rpc.devs()
except APIError:
pass
if api_devs is not None:
if rpc_devs is not None:
try:
return round(float(api_devs["DEVS"][0]["MHS 1m"] / 1000000), 2)
return round(float(rpc_devs["DEVS"][0]["MHS 1m"] / 1000000), 2)
except (KeyError, IndexError, ValueError, TypeError):
pass
async def _get_hashboards(self, api_stats: dict = None) -> List[HashBoard]:
async def _get_hashboards(self, rpc_stats: dict = None) -> List[HashBoard]:
hashboards = [
HashBoard(slot=i, expected_chips=self.expected_chips)
for i in range(self.expected_hashboards)
]
if api_stats is None:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
if api_stats is not None:
if rpc_stats is not None:
try:
unparsed_stats = api_stats["STATS"][0]["MM ID0"]
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
except (IndexError, KeyError, ValueError, TypeError):
return hashboards
@@ -234,62 +234,62 @@ class AvalonMiner(CGMiner):
return hashboards
async def _get_expected_hashrate(self, api_stats: dict = None) -> Optional[float]:
if api_stats is None:
async def _get_expected_hashrate(self, rpc_stats: dict = None) -> Optional[float]:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
if api_stats is not None:
if rpc_stats is not None:
try:
unparsed_stats = api_stats["STATS"][0]["MM ID0"]
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
return round(float(parsed_stats["GHSmm"]) / 1000, 2)
except (IndexError, KeyError, ValueError, TypeError):
pass
async def _get_env_temp(self, api_stats: dict = None) -> Optional[float]:
if api_stats is None:
async def _get_env_temp(self, rpc_stats: dict = None) -> Optional[float]:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
if api_stats is not None:
if rpc_stats is not None:
try:
unparsed_stats = api_stats["STATS"][0]["MM ID0"]
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
return float(parsed_stats["Temp"])
except (IndexError, KeyError, ValueError, TypeError):
pass
async def _get_wattage_limit(self, api_stats: dict = None) -> Optional[int]:
if api_stats is None:
async def _get_wattage_limit(self, rpc_stats: dict = None) -> Optional[int]:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
if api_stats is not None:
if rpc_stats is not None:
try:
unparsed_stats = api_stats["STATS"][0]["MM ID0"]
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
return int(parsed_stats["MPO"])
except (IndexError, KeyError, ValueError, TypeError):
pass
async def _get_fans(self, api_stats: dict = None) -> List[Fan]:
if api_stats is None:
async def _get_fans(self, rpc_stats: dict = None) -> List[Fan]:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
fans_data = [Fan() for _ in range(self.expected_fans)]
if api_stats is not None:
if rpc_stats is not None:
try:
unparsed_stats = api_stats["STATS"][0]["MM ID0"]
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
except LookupError:
return fans_data
@@ -301,18 +301,18 @@ class AvalonMiner(CGMiner):
pass
return fans_data
async def _get_fault_light(self, api_stats: dict = None) -> Optional[bool]:
async def _get_fault_light(self, rpc_stats: dict = None) -> Optional[bool]:
if self.light:
return self.light
if api_stats is None:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
if api_stats is not None:
if rpc_stats is not None:
try:
unparsed_stats = api_stats["STATS"][0]["MM ID0"]
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
led = int(parsed_stats["Led"])
return True if led == 1 else False
@@ -320,7 +320,7 @@ class AvalonMiner(CGMiner):
pass
try:
data = await self.api.ascset(0, "led", "1-255")
data = await self.rpc.ascset(0, "led", "1-255")
except APIError:
return False
try:

View File

@@ -27,27 +27,27 @@ BFGMINER_DATA_LOC = DataLocations(
**{
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver",
[RPCAPICommand("api_version", "version")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver",
[RPCAPICommand("api_version", "version")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[RPCAPICommand("api_summary", "summary")],
[RPCAPICommand("rpc_summary", "summary")],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
}
)
@@ -56,82 +56,82 @@ BFGMINER_DATA_LOC = DataLocations(
class BFGMiner(BaseMiner):
"""Base handler for BFGMiner based miners."""
_api_cls = BFGMinerRPCAPI
api: BFGMinerRPCAPI
_rpc_cls = BFGMinerRPCAPI
rpc: BFGMinerRPCAPI
data_locations = BFGMINER_DATA_LOC
async def get_config(self) -> MinerConfig:
# get pool data
try:
pools = await self.api.pools()
pools = await self.rpc.pools()
except APIError:
return self.config
self.config = MinerConfig.from_api(pools)
self.config = MinerConfig.from_rpc(pools)
return self.config
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def _get_api_ver(self, api_version: dict = None) -> Optional[str]:
if api_version is None:
async def _get_api_ver(self, rpc_version: dict = None) -> Optional[str]:
if rpc_version is None:
try:
api_version = await self.api.version()
rpc_version = await self.rpc.version()
except APIError:
pass
if api_version is not None:
if rpc_version is not None:
try:
self.api_ver = api_version["VERSION"][0]["API"]
self.rpc_ver = rpc_version["VERSION"][0]["API"]
except LookupError:
pass
return self.api_ver
return self.rpc_ver
async def _get_fw_ver(self, api_version: dict = None) -> Optional[str]:
if api_version is None:
async def _get_fw_ver(self, rpc_version: dict = None) -> Optional[str]:
if rpc_version is None:
try:
api_version = await self.api.version()
rpc_version = await self.rpc.version()
except APIError:
pass
if api_version is not None:
if rpc_version is not None:
try:
self.fw_ver = api_version["VERSION"][0]["CompileTime"]
self.fw_ver = rpc_version["VERSION"][0]["CompileTime"]
except LookupError:
pass
return self.fw_ver
async def _get_hashrate(self, api_summary: dict = None) -> Optional[float]:
async def _get_hashrate(self, rpc_summary: dict = None) -> Optional[float]:
# get hr from API
if api_summary is None:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary is not None:
if rpc_summary is not None:
try:
return round(float(api_summary["SUMMARY"][0]["MHS 20s"] / 1000000), 2)
return round(float(rpc_summary["SUMMARY"][0]["MHS 20s"] / 1000000), 2)
except (LookupError, ValueError, TypeError):
pass
async def _get_hashboards(self, api_stats: dict = None) -> List[HashBoard]:
async def _get_hashboards(self, rpc_stats: dict = None) -> List[HashBoard]:
hashboards = []
if api_stats is None:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
if api_stats is not None:
if rpc_stats is not None:
try:
board_offset = -1
boards = api_stats["STATS"]
boards = rpc_stats["STATS"]
if len(boards) > 1:
for board_num in range(1, 16, 5):
for _b_num in range(5):
@@ -173,28 +173,28 @@ class BFGMiner(BaseMiner):
return hashboards
async def _get_fans(self, api_stats: dict = None) -> List[Fan]:
if api_stats is None:
async def _get_fans(self, rpc_stats: dict = None) -> List[Fan]:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
fans_data = [None, None, None, None]
if api_stats is not None:
if rpc_stats is not None:
try:
fan_offset = -1
for fan_num in range(0, 8, 4):
for _f_num in range(4):
f = api_stats["STATS"][1].get(f"fan{fan_num + _f_num}", 0)
f = rpc_stats["STATS"][1].get(f"fan{fan_num + _f_num}", 0)
if not f == 0 and fan_offset == -1:
fan_offset = fan_num
if fan_offset == -1:
fan_offset = 1
for fan in range(self.expected_fans):
fans_data[fan] = api_stats["STATS"][1].get(
fans_data[fan] = rpc_stats["STATS"][1].get(
f"fan{fan_offset+fan}", 0
)
except LookupError:
@@ -203,19 +203,19 @@ class BFGMiner(BaseMiner):
return fans
async def _get_expected_hashrate(self, api_stats: dict = None) -> Optional[float]:
async def _get_expected_hashrate(self, rpc_stats: dict = None) -> Optional[float]:
# X19 method, not sure compatibility
if api_stats is None:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
if api_stats is not None:
if rpc_stats is not None:
try:
expected_rate = api_stats["STATS"][1]["total_rateideal"]
expected_rate = rpc_stats["STATS"][1]["total_rateideal"]
try:
rate_unit = api_stats["STATS"][1]["rate_unit"]
rate_unit = rpc_stats["STATS"][1]["rate_unit"]
except KeyError:
rate_unit = "GH"
if rate_unit == "GH":

View File

@@ -27,31 +27,31 @@ BMMINER_DATA_LOC = DataLocations(
**{
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver",
[RPCAPICommand("api_version", "version")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver",
[RPCAPICommand("api_version", "version")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[RPCAPICommand("api_summary", "summary")],
[RPCAPICommand("rpc_summary", "summary")],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
}
)
@@ -60,82 +60,82 @@ BMMINER_DATA_LOC = DataLocations(
class BMMiner(BaseMiner):
"""Base handler for BMMiner based miners."""
_api_cls = BMMinerRPCAPI
api: BMMinerRPCAPI
_rpc_cls = BMMinerRPCAPI
rpc: BMMinerRPCAPI
data_locations = BMMINER_DATA_LOC
async def get_config(self) -> MinerConfig:
# get pool data
try:
pools = await self.api.pools()
pools = await self.rpc.pools()
except APIError:
return self.config
self.config = MinerConfig.from_api(pools)
self.config = MinerConfig.from_rpc(pools)
return self.config
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def _get_api_ver(self, api_version: dict = None) -> Optional[str]:
if api_version is None:
async def _get_api_ver(self, rpc_version: dict = None) -> Optional[str]:
if rpc_version is None:
try:
api_version = await self.api.version()
rpc_version = await self.rpc.version()
except APIError:
pass
if api_version is not None:
if rpc_version is not None:
try:
self.api_ver = api_version["VERSION"][0]["API"]
self.rpc_ver = rpc_version["VERSION"][0]["API"]
except LookupError:
pass
return self.api_ver
return self.rpc_ver
async def _get_fw_ver(self, api_version: dict = None) -> Optional[str]:
if api_version is None:
async def _get_fw_ver(self, rpc_version: dict = None) -> Optional[str]:
if rpc_version is None:
try:
api_version = await self.api.version()
rpc_version = await self.rpc.version()
except APIError:
pass
if api_version is not None:
if rpc_version is not None:
try:
self.fw_ver = api_version["VERSION"][0]["CompileTime"]
self.fw_ver = rpc_version["VERSION"][0]["CompileTime"]
except LookupError:
pass
return self.fw_ver
async def _get_hashrate(self, api_summary: dict = None) -> Optional[float]:
async def _get_hashrate(self, rpc_summary: dict = None) -> Optional[float]:
# get hr from API
if api_summary is None:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary is not None:
if rpc_summary is not None:
try:
return round(float(api_summary["SUMMARY"][0]["GHS 5s"] / 1000), 2)
return round(float(rpc_summary["SUMMARY"][0]["GHS 5s"] / 1000), 2)
except (LookupError, ValueError, TypeError):
pass
async def _get_hashboards(self, api_stats: dict = None) -> List[HashBoard]:
async def _get_hashboards(self, rpc_stats: dict = None) -> List[HashBoard]:
hashboards = []
if api_stats is None:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
if api_stats is not None:
if rpc_stats is not None:
try:
board_offset = -1
boards = api_stats["STATS"]
boards = rpc_stats["STATS"]
if len(boards) > 1:
for board_num in range(1, 16, 5):
for _b_num in range(5):
@@ -190,28 +190,28 @@ class BMMiner(BaseMiner):
return hashboards
async def _get_fans(self, api_stats: dict = None) -> List[Fan]:
if api_stats is None:
async def _get_fans(self, rpc_stats: dict = None) -> List[Fan]:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
fans = [Fan() for _ in range(self.expected_fans)]
if api_stats is not None:
if rpc_stats is not None:
try:
fan_offset = -1
for fan_num in range(1, 8, 4):
for _f_num in range(4):
f = api_stats["STATS"][1].get(f"fan{fan_num + _f_num}", 0)
f = rpc_stats["STATS"][1].get(f"fan{fan_num + _f_num}", 0)
if f and not f == 0 and fan_offset == -1:
fan_offset = fan_num
if fan_offset == -1:
fan_offset = 1
for fan in range(self.expected_fans):
fans[fan].speed = api_stats["STATS"][1].get(
fans[fan].speed = rpc_stats["STATS"][1].get(
f"fan{fan_offset+fan}", 0
)
except LookupError:
@@ -219,19 +219,19 @@ class BMMiner(BaseMiner):
return fans
async def _get_expected_hashrate(self, api_stats: dict = None) -> Optional[float]:
async def _get_expected_hashrate(self, rpc_stats: dict = None) -> Optional[float]:
# X19 method, not sure compatibility
if api_stats is None:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
if api_stats is not None:
if rpc_stats is not None:
try:
expected_rate = api_stats["STATS"][1]["total_rateideal"]
expected_rate = rpc_stats["STATS"][1]["total_rateideal"]
try:
rate_unit = api_stats["STATS"][1]["rate_unit"]
rate_unit = rpc_stats["STATS"][1]["rate_unit"]
except KeyError:
rate_unit = "GH"
if rate_unit == "GH":
@@ -243,15 +243,15 @@ class BMMiner(BaseMiner):
except LookupError:
pass
async def _get_uptime(self, api_stats: dict = None) -> Optional[int]:
if api_stats is None:
async def _get_uptime(self, rpc_stats: dict = None) -> Optional[int]:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
if api_stats is not None:
if rpc_stats is not None:
try:
return int(api_stats["STATS"][1]["Elapsed"])
return int(rpc_stats["STATS"][1]["Elapsed"])
except LookupError:
pass

View File

@@ -44,7 +44,7 @@ BOSMINER_DATA_LOC = DataLocations(
),
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver",
[RPCAPICommand("api_version", "version")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver",
@@ -52,43 +52,43 @@ BOSMINER_DATA_LOC = DataLocations(
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[RPCAPICommand("api_summary", "summary")],
[RPCAPICommand("rpc_summary", "summary")],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
[RPCAPICommand("api_devs", "devs")],
[RPCAPICommand("rpc_devs", "devs")],
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[
RPCAPICommand("api_temps", "temps"),
RPCAPICommand("api_devdetails", "devdetails"),
RPCAPICommand("api_devs", "devs"),
RPCAPICommand("rpc_temps", "temps"),
RPCAPICommand("rpc_devdetails", "devdetails"),
RPCAPICommand("rpc_devs", "devs"),
],
),
str(DataOptions.WATTAGE): DataFunction(
"_get_wattage",
[RPCAPICommand("api_tunerstatus", "tunerstatus")],
[RPCAPICommand("rpc_tunerstatus", "tunerstatus")],
),
str(DataOptions.WATTAGE_LIMIT): DataFunction(
"_get_wattage_limit",
[RPCAPICommand("api_tunerstatus", "tunerstatus")],
[RPCAPICommand("rpc_tunerstatus", "tunerstatus")],
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[RPCAPICommand("api_fans", "fans")],
[RPCAPICommand("rpc_fans", "fans")],
),
str(DataOptions.ERRORS): DataFunction(
"_get_errors",
[RPCAPICommand("api_tunerstatus", "tunerstatus")],
[RPCAPICommand("rpc_tunerstatus", "tunerstatus")],
),
str(DataOptions.IS_MINING): DataFunction(
"_is_mining",
[RPCAPICommand("api_devdetails", "devdetails")],
[RPCAPICommand("rpc_devdetails", "devdetails")],
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime",
[RPCAPICommand("api_summary", "summary")],
[RPCAPICommand("rpc_summary", "summary")],
),
}
)
@@ -97,8 +97,8 @@ BOSMINER_DATA_LOC = DataLocations(
class BOSMiner(BaseMiner):
"""Handler for old versions of BraiinsOS+ (pre-gRPC)"""
_api_cls = BOSMinerRPCAPI
api: BOSMinerRPCAPI
_rpc_cls = BOSMinerRPCAPI
rpc: BOSMinerRPCAPI
_web_cls = BOSMinerWebAPI
web: BOSMinerWebAPI
_ssh_cls = BOSMinerSSH
@@ -139,7 +139,7 @@ class BOSMiner(BaseMiner):
async def stop_mining(self) -> bool:
try:
data = await self.api.pause()
data = await self.rpc.pause()
except APIError:
return False
@@ -150,7 +150,7 @@ class BOSMiner(BaseMiner):
async def resume_mining(self) -> bool:
try:
data = await self.api.resume()
data = await self.rpc.resume()
except APIError:
return False
@@ -292,23 +292,23 @@ class BOSMiner(BaseMiner):
# if result:
# return result.upper().strip()
async def _get_api_ver(self, api_version: dict = None) -> Optional[str]:
if api_version is None:
async def _get_api_ver(self, rpc_version: dict = None) -> Optional[str]:
if rpc_version is None:
try:
api_version = await self.api.version()
rpc_version = await self.rpc.version()
except APIError:
pass
# Now get the API version
if api_version is not None:
if rpc_version is not None:
try:
api_ver = api_version["VERSION"][0]["API"]
rpc_ver = rpc_version["VERSION"][0]["API"]
except LookupError:
api_ver = None
self.api_ver = api_ver
self.api.api_ver = self.api_ver
rpc_ver = None
self.rpc_ver = rpc_ver
self.rpc.rpc_ver = self.rpc_ver
return self.api_ver
return self.rpc_ver
async def _get_fw_ver(self, web_bos_info: dict = None) -> Optional[str]:
if web_bos_info is None:
@@ -340,24 +340,24 @@ class BOSMiner(BaseMiner):
return None
return hostname
async def _get_hashrate(self, api_summary: dict = None) -> Optional[float]:
if api_summary is None:
async def _get_hashrate(self, rpc_summary: dict = None) -> Optional[float]:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary is not None:
if rpc_summary is not None:
try:
return round(float(api_summary["SUMMARY"][0]["MHS 1m"] / 1000000), 2)
return round(float(rpc_summary["SUMMARY"][0]["MHS 1m"] / 1000000), 2)
except (KeyError, IndexError, ValueError, TypeError):
pass
async def _get_hashboards(
self,
api_temps: dict = None,
api_devdetails: dict = None,
api_devs: dict = None,
rpc_temps: dict = None,
rpc_devdetails: dict = None,
rpc_devs: dict = None,
) -> List[HashBoard]:
hashboards = [
HashBoard(slot=i, expected_chips=self.expected_chips)
@@ -365,34 +365,34 @@ class BOSMiner(BaseMiner):
]
cmds = []
if api_temps is None:
if rpc_temps is None:
cmds.append("temps")
if api_devdetails is None:
if rpc_devdetails is None:
cmds.append("devdetails")
if api_devs is None:
if rpc_devs is None:
cmds.append("devs")
if len(cmds) > 0:
try:
d = await self.api.multicommand(*cmds)
d = await self.rpc.multicommand(*cmds)
except APIError:
d = {}
try:
api_temps = d["temps"][0]
rpc_temps = d["temps"][0]
except LookupError:
api_temps = None
rpc_temps = None
try:
api_devdetails = d["devdetails"][0]
rpc_devdetails = d["devdetails"][0]
except (KeyError, IndexError):
api_devdetails = None
rpc_devdetails = None
try:
api_devs = d["devs"][0]
rpc_devs = d["devs"][0]
except LookupError:
api_devs = None
if api_temps is not None:
rpc_devs = None
if rpc_temps is not None:
try:
offset = 6 if api_temps["TEMPS"][0]["ID"] in [6, 7, 8] else 1
offset = 6 if rpc_temps["TEMPS"][0]["ID"] in [6, 7, 8] else 1
for board in api_temps["TEMPS"]:
for board in rpc_temps["TEMPS"]:
_id = board["ID"] - offset
chip_temp = round(board["Chip"])
board_temp = round(board["Board"])
@@ -401,11 +401,11 @@ class BOSMiner(BaseMiner):
except (IndexError, KeyError, ValueError, TypeError):
pass
if api_devdetails is not None:
if rpc_devdetails is not None:
try:
offset = 6 if api_devdetails["DEVDETAILS"][0]["ID"] in [6, 7, 8] else 1
offset = 6 if rpc_devdetails["DEVDETAILS"][0]["ID"] in [6, 7, 8] else 1
for board in api_devdetails["DEVDETAILS"]:
for board in rpc_devdetails["DEVDETAILS"]:
_id = board["ID"] - offset
chips = board["Chips"]
hashboards[_id].chips = chips
@@ -413,11 +413,11 @@ class BOSMiner(BaseMiner):
except (IndexError, KeyError):
pass
if api_devs is not None:
if rpc_devs is not None:
try:
offset = 6 if api_devs["DEVS"][0]["ID"] in [6, 7, 8] else 1
offset = 6 if rpc_devs["DEVS"][0]["ID"] in [6, 7, 8] else 1
for board in api_devs["DEVS"]:
for board in rpc_devs["DEVS"]:
_id = board["ID"] - offset
hashrate = round(float(board["MHS 1m"] / 1000000), 2)
hashboards[_id].hashrate = hashrate
@@ -426,62 +426,62 @@ class BOSMiner(BaseMiner):
return hashboards
async def _get_wattage(self, api_tunerstatus: dict = None) -> Optional[int]:
if api_tunerstatus is None:
async def _get_wattage(self, rpc_tunerstatus: dict = None) -> Optional[int]:
if rpc_tunerstatus is None:
try:
api_tunerstatus = await self.api.tunerstatus()
rpc_tunerstatus = await self.rpc.tunerstatus()
except APIError:
pass
if api_tunerstatus is not None:
if rpc_tunerstatus is not None:
try:
return api_tunerstatus["TUNERSTATUS"][0][
return rpc_tunerstatus["TUNERSTATUS"][0][
"ApproximateMinerPowerConsumption"
]
except LookupError:
pass
async def _get_wattage_limit(self, api_tunerstatus: dict = None) -> Optional[int]:
if api_tunerstatus is None:
async def _get_wattage_limit(self, rpc_tunerstatus: dict = None) -> Optional[int]:
if rpc_tunerstatus is None:
try:
api_tunerstatus = await self.api.tunerstatus()
rpc_tunerstatus = await self.rpc.tunerstatus()
except APIError:
pass
if api_tunerstatus is not None:
if rpc_tunerstatus is not None:
try:
return api_tunerstatus["TUNERSTATUS"][0]["PowerLimit"]
return rpc_tunerstatus["TUNERSTATUS"][0]["PowerLimit"]
except LookupError:
pass
async def _get_fans(self, api_fans: dict = None) -> List[Fan]:
if api_fans is None:
async def _get_fans(self, rpc_fans: dict = None) -> List[Fan]:
if rpc_fans is None:
try:
api_fans = await self.api.fans()
rpc_fans = await self.rpc.fans()
except APIError:
pass
if api_fans is not None:
if rpc_fans is not None:
fans = []
for n in range(self.expected_fans):
try:
fans.append(Fan(api_fans["FANS"][n]["RPM"]))
fans.append(Fan(rpc_fans["FANS"][n]["RPM"]))
except (IndexError, KeyError):
pass
return fans
return [Fan() for _ in range(self.expected_fans)]
async def _get_errors(self, api_tunerstatus: dict = None) -> List[MinerErrorData]:
if api_tunerstatus is None:
async def _get_errors(self, rpc_tunerstatus: dict = None) -> List[MinerErrorData]:
if rpc_tunerstatus is None:
try:
api_tunerstatus = await self.api.tunerstatus()
rpc_tunerstatus = await self.rpc.tunerstatus()
except APIError:
pass
if api_tunerstatus is not None:
if rpc_tunerstatus is not None:
errors = []
try:
chain_status = api_tunerstatus["TUNERSTATUS"][0]["TunerChainStatus"]
chain_status = rpc_tunerstatus["TUNERSTATUS"][0]["TunerChainStatus"]
if chain_status and len(chain_status) > 0:
offset = (
6 if int(chain_status[0]["HashchainIndex"]) in [6, 7, 8] else 0
@@ -513,18 +513,18 @@ class BOSMiner(BaseMiner):
except (TypeError, AttributeError):
return self.light
async def _get_expected_hashrate(self, api_devs: dict = None) -> Optional[float]:
if api_devs is None:
async def _get_expected_hashrate(self, rpc_devs: dict = None) -> Optional[float]:
if rpc_devs is None:
try:
api_devs = await self.api.devs()
rpc_devs = await self.rpc.devs()
except APIError:
pass
if api_devs is not None:
if rpc_devs is not None:
try:
hr_list = []
for board in api_devs["DEVS"]:
for board in rpc_devs["DEVS"]:
expected_hashrate = round(float(board["Nominal MHS"] / 1000000), 2)
if expected_hashrate:
hr_list.append(expected_hashrate)
@@ -537,31 +537,31 @@ class BOSMiner(BaseMiner):
except (IndexError, KeyError):
pass
async def _is_mining(self, api_devdetails: dict = None) -> Optional[bool]:
if api_devdetails is None:
async def _is_mining(self, rpc_devdetails: dict = None) -> Optional[bool]:
if rpc_devdetails is None:
try:
api_devdetails = await self.api.send_command(
rpc_devdetails = await self.rpc.send_command(
"devdetails", ignore_errors=True, allow_warning=False
)
except APIError:
pass
if api_devdetails is not None:
if rpc_devdetails is not None:
try:
return not api_devdetails["STATUS"][0]["Msg"] == "Unavailable"
return not rpc_devdetails["STATUS"][0]["Msg"] == "Unavailable"
except LookupError:
pass
async def _get_uptime(self, api_summary: dict = None) -> Optional[int]:
if api_summary is None:
async def _get_uptime(self, rpc_summary: dict = None) -> Optional[int]:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary is not None:
if rpc_summary is not None:
try:
return int(api_summary["SUMMARY"][0]["Elapsed"])
return int(rpc_summary["SUMMARY"][0]["Elapsed"])
except LookupError:
pass
@@ -574,7 +574,7 @@ BOSER_DATA_LOC = DataLocations(
),
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver",
[RPCAPICommand("api_version", "version")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver",
@@ -586,7 +586,7 @@ BOSER_DATA_LOC = DataLocations(
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[RPCAPICommand("api_summary", "summary")],
[RPCAPICommand("rpc_summary", "summary")],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
@@ -614,7 +614,7 @@ BOSER_DATA_LOC = DataLocations(
),
str(DataOptions.ERRORS): DataFunction(
"_get_errors",
[RPCAPICommand("api_tunerstatus", "tunerstatus")],
[RPCAPICommand("rpc_tunerstatus", "tunerstatus")],
),
str(DataOptions.FAULT_LIGHT): DataFunction(
"_get_fault_light",
@@ -622,11 +622,11 @@ BOSER_DATA_LOC = DataLocations(
),
str(DataOptions.IS_MINING): DataFunction(
"_is_mining",
[RPCAPICommand("api_devdetails", "devdetails")],
[RPCAPICommand("rpc_devdetails", "devdetails")],
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime",
[RPCAPICommand("api_summary", "summary")],
[RPCAPICommand("rpc_summary", "summary")],
),
}
)
@@ -635,7 +635,7 @@ BOSER_DATA_LOC = DataLocations(
class BOSer(BaseMiner):
"""Handler for new versions of BraiinsOS+ (post-gRPC)"""
_api_cls = BOSMinerRPCAPI
_rpc_cls = BOSMinerRPCAPI
web: BOSMinerRPCAPI
_web_cls = BOSerWebAPI
web: BOSerWebAPI
@@ -719,22 +719,22 @@ class BOSer(BaseMiner):
except (LookupError, TypeError):
pass
async def _get_api_ver(self, api_version: dict = None) -> Optional[str]:
if api_version is None:
async def _get_api_ver(self, rpc_version: dict = None) -> Optional[str]:
if rpc_version is None:
try:
api_version = await self.api.version()
rpc_version = await self.rpc.version()
except APIError:
pass
if api_version is not None:
if rpc_version is not None:
try:
api_ver = api_version["VERSION"][0]["API"]
rpc_ver = rpc_version["VERSION"][0]["API"]
except LookupError:
api_ver = None
self.api_ver = api_ver
self.api.api_ver = self.api_ver
rpc_ver = None
self.rpc_ver = rpc_ver
self.rpc.rpc_ver = self.rpc_ver
return self.api_ver
return self.rpc_ver
async def _get_fw_ver(self, grpc_miner_details: dict = None) -> Optional[str]:
if grpc_miner_details is None:
@@ -772,16 +772,16 @@ class BOSer(BaseMiner):
except LookupError:
pass
async def _get_hashrate(self, api_summary: dict = None) -> Optional[float]:
if api_summary is None:
async def _get_hashrate(self, rpc_summary: dict = None) -> Optional[float]:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary is not None:
if rpc_summary is not None:
try:
return round(float(api_summary["SUMMARY"][0]["MHS 1m"] / 1000000), 2)
return round(float(rpc_summary["SUMMARY"][0]["MHS 1m"] / 1000000), 2)
except (KeyError, IndexError, ValueError, TypeError):
pass
@@ -883,17 +883,17 @@ class BOSer(BaseMiner):
return fans
return [Fan() for _ in range(self.expected_fans)]
async def _get_errors(self, api_tunerstatus: dict = None) -> List[MinerErrorData]:
if api_tunerstatus is None:
async def _get_errors(self, rpc_tunerstatus: dict = None) -> List[MinerErrorData]:
if rpc_tunerstatus is None:
try:
api_tunerstatus = await self.api.tunerstatus()
rpc_tunerstatus = await self.rpc.tunerstatus()
except APIError:
pass
if api_tunerstatus is not None:
if rpc_tunerstatus is not None:
errors = []
try:
chain_status = api_tunerstatus["TUNERSTATUS"][0]["TunerChainStatus"]
chain_status = rpc_tunerstatus["TUNERSTATUS"][0]["TunerChainStatus"]
if chain_status and len(chain_status) > 0:
offset = (
6 if int(chain_status[0]["HashchainIndex"]) in [6, 7, 8] else 0
@@ -931,30 +931,30 @@ class BOSer(BaseMiner):
except LookupError:
pass
async def _is_mining(self, api_devdetails: dict = None) -> Optional[bool]:
if api_devdetails is None:
async def _is_mining(self, rpc_devdetails: dict = None) -> Optional[bool]:
if rpc_devdetails is None:
try:
api_devdetails = await self.api.send_command(
rpc_devdetails = await self.rpc.send_command(
"devdetails", ignore_errors=True, allow_warning=False
)
except APIError:
pass
if api_devdetails is not None:
if rpc_devdetails is not None:
try:
return not api_devdetails["STATUS"][0]["Msg"] == "Unavailable"
return not rpc_devdetails["STATUS"][0]["Msg"] == "Unavailable"
except LookupError:
pass
async def _get_uptime(self, api_summary: dict = None) -> Optional[int]:
if api_summary is None:
async def _get_uptime(self, rpc_summary: dict = None) -> Optional[int]:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary is not None:
if rpc_summary is not None:
try:
return int(api_summary["SUMMARY"][0]["Elapsed"])
return int(rpc_summary["SUMMARY"][0]["Elapsed"])
except LookupError:
pass

View File

@@ -30,81 +30,81 @@ BTMINER_DATA_LOC = DataLocations(
str(DataOptions.MAC): DataFunction(
"_get_mac",
[
RPCAPICommand("api_summary", "summary"),
RPCAPICommand("api_get_miner_info", "get_miner_info"),
RPCAPICommand("rpc_summary", "summary"),
RPCAPICommand("rpc_get_miner_info", "get_miner_info"),
],
),
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver",
[RPCAPICommand("api_get_version", "get_version")],
[RPCAPICommand("rpc_get_version", "get_version")],
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver",
[
RPCAPICommand("api_get_version", "get_version"),
RPCAPICommand("api_summary", "summary"),
RPCAPICommand("rpc_get_version", "get_version"),
RPCAPICommand("rpc_summary", "summary"),
],
),
str(DataOptions.HOSTNAME): DataFunction(
"_get_hostname",
[RPCAPICommand("api_get_miner_info", "get_miner_info")],
[RPCAPICommand("rpc_get_miner_info", "get_miner_info")],
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[RPCAPICommand("api_summary", "summary")],
[RPCAPICommand("rpc_summary", "summary")],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
[RPCAPICommand("api_summary", "summary")],
[RPCAPICommand("rpc_summary", "summary")],
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[RPCAPICommand("api_devs", "devs")],
[RPCAPICommand("rpc_devs", "devs")],
),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction(
"_get_env_temp",
[RPCAPICommand("api_summary", "summary")],
[RPCAPICommand("rpc_summary", "summary")],
),
str(DataOptions.WATTAGE): DataFunction(
"_get_wattage",
[RPCAPICommand("api_summary", "summary")],
[RPCAPICommand("rpc_summary", "summary")],
),
str(DataOptions.WATTAGE_LIMIT): DataFunction(
"_get_wattage_limit",
[RPCAPICommand("api_summary", "summary")],
[RPCAPICommand("rpc_summary", "summary")],
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[
RPCAPICommand("api_summary", "summary"),
RPCAPICommand("api_get_psu", "get_psu"),
RPCAPICommand("rpc_summary", "summary"),
RPCAPICommand("rpc_get_psu", "get_psu"),
],
),
str(DataOptions.FAN_PSU): DataFunction(
"_get_fan_psu",
[
RPCAPICommand("api_summary", "summary"),
RPCAPICommand("api_get_psu", "get_psu"),
RPCAPICommand("rpc_summary", "summary"),
RPCAPICommand("rpc_get_psu", "get_psu"),
],
),
str(DataOptions.ERRORS): DataFunction(
"_get_errors",
[
RPCAPICommand("api_get_error_code", "get_error_code"),
RPCAPICommand("api_summary", "summary"),
RPCAPICommand("rpc_get_error_code", "get_error_code"),
RPCAPICommand("rpc_summary", "summary"),
],
),
str(DataOptions.FAULT_LIGHT): DataFunction(
"_get_fault_light",
[RPCAPICommand("api_get_miner_info", "get_miner_info")],
[RPCAPICommand("rpc_get_miner_info", "get_miner_info")],
),
str(DataOptions.IS_MINING): DataFunction(
"_is_mining",
[RPCAPICommand("api_status", "status")],
[RPCAPICommand("rpc_status", "status")],
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime",
[RPCAPICommand("api_summary", "summary")],
[RPCAPICommand("rpc_summary", "summary")],
),
}
)
@@ -113,16 +113,16 @@ BTMINER_DATA_LOC = DataLocations(
class BTMiner(BaseMiner):
"""Base handler for BTMiner based miners."""
_api_cls = BTMinerRPCAPI
api: BTMinerRPCAPI
_rpc_cls = BTMinerRPCAPI
rpc: BTMinerRPCAPI
data_locations = BTMINER_DATA_LOC
supports_shutdown = True
async def _reset_api_pwd_to_admin(self, pwd: str):
async def _reset_rpc_pwd_to_admin(self, pwd: str):
try:
data = await self.api.update_pwd(pwd, "admin")
data = await self.rpc.update_pwd(pwd, "admin")
except APIError:
return False
if data:
@@ -133,7 +133,7 @@ class BTMiner(BaseMiner):
async def fault_light_off(self) -> bool:
try:
data = await self.api.set_led(auto=True)
data = await self.rpc.set_led(auto=True)
except APIError:
return False
if data:
@@ -145,8 +145,8 @@ class BTMiner(BaseMiner):
async def fault_light_on(self) -> bool:
try:
data = await self.api.set_led(auto=False)
await self.api.set_led(
data = await self.rpc.set_led(auto=False)
await self.rpc.set_led(
auto=False, color="green", start=0, period=1, duration=0
)
except APIError:
@@ -160,7 +160,7 @@ class BTMiner(BaseMiner):
async def reboot(self) -> bool:
try:
data = await self.api.reboot()
data = await self.rpc.reboot()
except APIError:
return False
if data.get("Msg"):
@@ -170,7 +170,7 @@ class BTMiner(BaseMiner):
async def restart_backend(self) -> bool:
try:
data = await self.api.restart()
data = await self.rpc.restart()
except APIError:
return False
if data.get("Msg"):
@@ -180,7 +180,7 @@ class BTMiner(BaseMiner):
async def stop_mining(self) -> bool:
try:
data = await self.api.power_off(respbefore=True)
data = await self.rpc.power_off(respbefore=True)
except APIError:
return False
if data.get("Msg"):
@@ -190,7 +190,7 @@ class BTMiner(BaseMiner):
async def resume_mining(self) -> bool:
try:
data = await self.api.power_on()
data = await self.rpc.power_on()
except APIError:
return False
if data.get("Msg"):
@@ -205,16 +205,16 @@ class BTMiner(BaseMiner):
pools_conf = conf["pools"]
try:
await self.api.update_pools(**pools_conf)
await self.rpc.update_pools(**pools_conf)
if conf["mode"] == "normal":
await self.api.set_normal_power()
await self.rpc.set_normal_power()
elif conf["mode"] == "high":
await self.api.set_high_power()
await self.rpc.set_high_power()
elif conf["mode"] == "low":
await self.api.set_low_power()
await self.rpc.set_low_power()
elif conf["mode"] == "power_tuning":
await self.api.adjust_power_limit(conf["power_tuning"]["wattage"])
await self.rpc.adjust_power_limit(conf["power_tuning"]["wattage"])
except APIError:
# cannot update, no API access usually
pass
@@ -224,7 +224,7 @@ class BTMiner(BaseMiner):
summary = None
status = None
try:
data = await self.api.multicommand("pools", "summary", "status")
data = await self.rpc.multicommand("pools", "summary", "status")
pools = data["pools"][0]
summary = data["summary"][0]
status = data["status"][0]
@@ -234,7 +234,7 @@ class BTMiner(BaseMiner):
pass
if pools is not None:
cfg = MinerConfig.from_api(pools)
cfg = MinerConfig.from_rpc(pools)
else:
cfg = MinerConfig()
@@ -271,7 +271,7 @@ class BTMiner(BaseMiner):
async def set_power_limit(self, wattage: int) -> bool:
try:
await self.api.adjust_power_limit(wattage)
await self.rpc.adjust_power_limit(wattage)
except Exception as e:
logging.warning(f"{self} set_power_limit: {e}")
return False
@@ -283,85 +283,85 @@ class BTMiner(BaseMiner):
##################################################
async def _get_mac(
self, api_summary: dict = None, api_get_miner_info: dict = None
self, rpc_summary: dict = None, rpc_get_miner_info: dict = None
) -> Optional[str]:
if api_get_miner_info is None:
if rpc_get_miner_info is None:
try:
api_get_miner_info = await self.api.get_miner_info()
rpc_get_miner_info = await self.rpc.get_miner_info()
except APIError:
pass
if api_get_miner_info is not None:
if rpc_get_miner_info is not None:
try:
mac = api_get_miner_info["Msg"]["mac"]
mac = rpc_get_miner_info["Msg"]["mac"]
return str(mac).upper()
except KeyError:
pass
if api_summary is None:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary is not None:
if rpc_summary is not None:
try:
mac = api_summary["SUMMARY"][0]["MAC"]
mac = rpc_summary["SUMMARY"][0]["MAC"]
return str(mac).upper()
except LookupError:
pass
async def _get_api_ver(self, api_get_version: dict = None) -> Optional[str]:
if api_get_version is None:
async def _get_api_ver(self, rpc_get_version: dict = None) -> Optional[str]:
if rpc_get_version is None:
try:
api_get_version = await self.api.get_version()
rpc_get_version = await self.rpc.get_version()
except APIError:
pass
if api_get_version is not None:
if "Code" in api_get_version.keys():
if api_get_version["Code"] == 131:
if rpc_get_version is not None:
if "Code" in rpc_get_version.keys():
if rpc_get_version["Code"] == 131:
try:
api_ver = api_get_version["Msg"]
if not isinstance(api_ver, str):
api_ver = api_ver["api_ver"]
self.api_ver = api_ver.replace("whatsminer v", "")
rpc_ver = rpc_get_version["Msg"]
if not isinstance(rpc_ver, str):
rpc_ver = rpc_ver["rpc_ver"]
self.rpc_ver = rpc_ver.replace("whatsminer v", "")
except (KeyError, TypeError):
pass
else:
self.api.api_ver = self.api_ver
return self.api_ver
self.rpc.rpc_ver = self.rpc_ver
return self.rpc_ver
return self.api_ver
return self.rpc_ver
async def _get_fw_ver(
self, api_get_version: dict = None, api_summary: dict = None
self, rpc_get_version: dict = None, rpc_summary: dict = None
) -> Optional[str]:
if api_get_version is None:
if rpc_get_version is None:
try:
api_get_version = await self.api.get_version()
rpc_get_version = await self.rpc.get_version()
except APIError:
pass
if api_get_version is not None:
if "Code" in api_get_version.keys():
if api_get_version["Code"] == 131:
if rpc_get_version is not None:
if "Code" in rpc_get_version.keys():
if rpc_get_version["Code"] == 131:
try:
self.fw_ver = api_get_version["Msg"]["fw_ver"]
self.fw_ver = rpc_get_version["Msg"]["fw_ver"]
except (KeyError, TypeError):
pass
else:
return self.fw_ver
if api_summary is None:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary:
if rpc_summary:
try:
self.fw_ver = api_summary["SUMMARY"][0]["Firmware Version"].replace(
self.fw_ver = rpc_summary["SUMMARY"][0]["Firmware Version"].replace(
"'", ""
)
except LookupError:
@@ -369,50 +369,50 @@ class BTMiner(BaseMiner):
return self.fw_ver
async def _get_hostname(self, api_get_miner_info: dict = None) -> Optional[str]:
async def _get_hostname(self, rpc_get_miner_info: dict = None) -> Optional[str]:
hostname = None
if api_get_miner_info is None:
if rpc_get_miner_info is None:
try:
api_get_miner_info = await self.api.get_miner_info()
rpc_get_miner_info = await self.rpc.get_miner_info()
except APIError:
return None # only one way to get this
if api_get_miner_info is not None:
if rpc_get_miner_info is not None:
try:
hostname = api_get_miner_info["Msg"]["hostname"]
hostname = rpc_get_miner_info["Msg"]["hostname"]
except KeyError:
return None
return hostname
async def _get_hashrate(self, api_summary: dict = None) -> Optional[float]:
if api_summary is None:
async def _get_hashrate(self, rpc_summary: dict = None) -> Optional[float]:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary is not None:
if rpc_summary is not None:
try:
return round(float(api_summary["SUMMARY"][0]["MHS 1m"] / 1000000), 2)
return round(float(rpc_summary["SUMMARY"][0]["MHS 1m"] / 1000000), 2)
except LookupError:
pass
async def _get_hashboards(self, api_devs: dict = None) -> List[HashBoard]:
async def _get_hashboards(self, rpc_devs: dict = None) -> List[HashBoard]:
hashboards = [
HashBoard(slot=i, expected_chips=self.expected_chips)
for i in range(self.expected_hashboards)
]
if api_devs is None:
if rpc_devs is None:
try:
api_devs = await self.api.devs()
rpc_devs = await self.rpc.devs()
except APIError:
pass
if api_devs is not None:
if rpc_devs is not None:
try:
for board in api_devs["DEVS"]:
for board in rpc_devs["DEVS"]:
if len(hashboards) < board["ASC"] + 1:
hashboards.append(
HashBoard(
@@ -433,62 +433,62 @@ class BTMiner(BaseMiner):
return hashboards
async def _get_env_temp(self, api_summary: dict = None) -> Optional[float]:
if api_summary is None:
async def _get_env_temp(self, rpc_summary: dict = None) -> Optional[float]:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary is not None:
if rpc_summary is not None:
try:
return api_summary["SUMMARY"][0]["Env Temp"]
return rpc_summary["SUMMARY"][0]["Env Temp"]
except LookupError:
pass
async def _get_wattage(self, api_summary: dict = None) -> Optional[int]:
if api_summary is None:
async def _get_wattage(self, rpc_summary: dict = None) -> Optional[int]:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary is not None:
if rpc_summary is not None:
try:
wattage = api_summary["SUMMARY"][0]["Power"]
wattage = rpc_summary["SUMMARY"][0]["Power"]
return wattage if not wattage == -1 else None
except LookupError:
pass
async def _get_wattage_limit(self, api_summary: dict = None) -> Optional[int]:
if api_summary is None:
async def _get_wattage_limit(self, rpc_summary: dict = None) -> Optional[int]:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary is not None:
if rpc_summary is not None:
try:
return api_summary["SUMMARY"][0]["Power Limit"]
return rpc_summary["SUMMARY"][0]["Power Limit"]
except LookupError:
pass
async def _get_fans(
self, api_summary: dict = None, api_get_psu: dict = None
self, rpc_summary: dict = None, rpc_get_psu: dict = None
) -> List[Fan]:
if api_summary is None:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
fans = [Fan() for _ in range(self.expected_fans)]
if api_summary is not None:
if rpc_summary is not None:
try:
if self.expected_fans > 0:
fans = [
Fan(api_summary["SUMMARY"][0].get("Fan Speed In", 0)),
Fan(api_summary["SUMMARY"][0].get("Fan Speed Out", 0)),
Fan(rpc_summary["SUMMARY"][0].get("Fan Speed In", 0)),
Fan(rpc_summary["SUMMARY"][0].get("Fan Speed Out", 0)),
]
except LookupError:
pass
@@ -496,45 +496,45 @@ class BTMiner(BaseMiner):
return fans
async def _get_fan_psu(
self, api_summary: dict = None, api_get_psu: dict = None
self, rpc_summary: dict = None, rpc_get_psu: dict = None
) -> Optional[int]:
if api_summary is None:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary is not None:
if rpc_summary is not None:
try:
return int(api_summary["SUMMARY"][0]["Power Fanspeed"])
return int(rpc_summary["SUMMARY"][0]["Power Fanspeed"])
except LookupError:
pass
if api_get_psu is None:
if rpc_get_psu is None:
try:
api_get_psu = await self.api.get_psu()
rpc_get_psu = await self.rpc.get_psu()
except APIError:
pass
if api_get_psu is not None:
if rpc_get_psu is not None:
try:
return int(api_get_psu["Msg"]["fan_speed"])
return int(rpc_get_psu["Msg"]["fan_speed"])
except (KeyError, TypeError):
pass
async def _get_errors(
self, api_summary: dict = None, api_get_error_code: dict = None
self, rpc_summary: dict = None, rpc_get_error_code: dict = None
) -> List[MinerErrorData]:
errors = []
if api_get_error_code is None and api_summary is None:
if rpc_get_error_code is None and rpc_summary is None:
try:
api_get_error_code = await self.api.get_error_code()
rpc_get_error_code = await self.rpc.get_error_code()
except APIError:
pass
if api_get_error_code is not None:
if rpc_get_error_code is not None:
try:
for err in api_get_error_code["Msg"]["error_code"]:
for err in rpc_get_error_code["Msg"]["error_code"]:
if isinstance(err, dict):
for code in err:
errors.append(WhatsminerError(error_code=int(code)))
@@ -543,48 +543,48 @@ class BTMiner(BaseMiner):
except KeyError:
pass
if api_summary is None:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary is not None:
if rpc_summary is not None:
try:
for i in range(api_summary["SUMMARY"][0]["Error Code Count"]):
err = api_summary["SUMMARY"][0].get(f"Error Code {i}")
for i in range(rpc_summary["SUMMARY"][0]["Error Code Count"]):
err = rpc_summary["SUMMARY"][0].get(f"Error Code {i}")
if err:
errors.append(WhatsminerError(error_code=err))
except (LookupError, ValueError, TypeError):
pass
return errors
async def _get_expected_hashrate(self, api_summary: dict = None) -> Optional[float]:
if api_summary is None:
async def _get_expected_hashrate(self, rpc_summary: dict = None) -> Optional[float]:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary is not None:
if rpc_summary is not None:
try:
expected_hashrate = api_summary["SUMMARY"][0]["Factory GHS"]
expected_hashrate = rpc_summary["SUMMARY"][0]["Factory GHS"]
if expected_hashrate:
return round(expected_hashrate / 1000, 2)
except LookupError:
pass
async def _get_fault_light(self, api_get_miner_info: dict = None) -> Optional[bool]:
if api_get_miner_info is None:
async def _get_fault_light(self, rpc_get_miner_info: dict = None) -> Optional[bool]:
if rpc_get_miner_info is None:
try:
api_get_miner_info = await self.api.get_miner_info()
rpc_get_miner_info = await self.rpc.get_miner_info()
except APIError:
if not self.light:
self.light = False
if api_get_miner_info is not None:
if rpc_get_miner_info is not None:
try:
self.light = not (api_get_miner_info["Msg"]["ledstat"] == "auto")
self.light = not (rpc_get_miner_info["Msg"]["ledstat"] == "auto")
except KeyError:
pass
@@ -600,46 +600,46 @@ class BTMiner(BaseMiner):
):
if not hostname:
hostname = await self.get_hostname()
await self.api.net_config(
await self.rpc.net_config(
ip=ip, mask=subnet_mask, dns=dns, gate=gateway, host=hostname, dhcp=False
)
async def set_dhcp(self, hostname: str = None):
if hostname:
await self.set_hostname(hostname)
await self.api.net_config()
await self.rpc.net_config()
async def set_hostname(self, hostname: str):
await self.api.set_hostname(hostname)
await self.rpc.set_hostname(hostname)
async def _is_mining(self, api_status: dict = None) -> Optional[bool]:
if api_status is None:
async def _is_mining(self, rpc_status: dict = None) -> Optional[bool]:
if rpc_status is None:
try:
api_status = await self.api.status()
rpc_status = await self.rpc.status()
except APIError:
pass
if api_status is not None:
if rpc_status is not None:
try:
if api_status["Msg"].get("btmineroff"):
if rpc_status["Msg"].get("btmineroff"):
try:
await self.api.devdetails()
await self.rpc.devdetails()
except APIError:
return False
return True
return True if api_status["Msg"]["mineroff"] == "false" else False
return True if rpc_status["Msg"]["mineroff"] == "false" else False
except LookupError:
pass
async def _get_uptime(self, api_summary: dict = None) -> Optional[int]:
if api_summary is None:
async def _get_uptime(self, rpc_summary: dict = None) -> Optional[int]:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary is not None:
if rpc_summary is not None:
try:
return int(api_summary["SUMMARY"][0]["Elapsed"])
return int(rpc_summary["SUMMARY"][0]["Elapsed"])
except LookupError:
pass

View File

@@ -26,31 +26,31 @@ CGMINER_DATA_LOC = DataLocations(
**{
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver",
[RPCAPICommand("api_version", "version")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver",
[RPCAPICommand("api_version", "version")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[RPCAPICommand("api_summary", "summary")],
[RPCAPICommand("rpc_summary", "summary")],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
}
)
@@ -59,79 +59,79 @@ CGMINER_DATA_LOC = DataLocations(
class CGMiner(BaseMiner):
"""Base handler for CGMiner based miners"""
_api_cls = CGMinerRPCAPI
api: CGMinerRPCAPI
_rpc_cls = CGMinerRPCAPI
rpc: CGMinerRPCAPI
data_locations = CGMINER_DATA_LOC
async def get_config(self) -> MinerConfig:
# get pool data
try:
pools = await self.api.pools()
pools = await self.rpc.pools()
except APIError:
return self.config
self.config = MinerConfig.from_api(pools)
self.config = MinerConfig.from_rpc(pools)
return self.config
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def _get_api_ver(self, api_version: dict = None) -> Optional[str]:
if api_version is None:
async def _get_api_ver(self, rpc_version: dict = None) -> Optional[str]:
if rpc_version is None:
try:
api_version = await self.api.version()
rpc_version = await self.rpc.version()
except APIError:
pass
if api_version is not None:
if rpc_version is not None:
try:
self.api_ver = api_version["VERSION"][0]["API"]
self.rpc_ver = rpc_version["VERSION"][0]["API"]
except LookupError:
pass
return self.api_ver
return self.rpc_ver
async def _get_fw_ver(self, api_version: dict = None) -> Optional[str]:
if api_version is None:
async def _get_fw_ver(self, rpc_version: dict = None) -> Optional[str]:
if rpc_version is None:
try:
api_version = await self.api.version()
rpc_version = await self.rpc.version()
except APIError:
pass
if api_version is not None:
if rpc_version is not None:
try:
self.fw_ver = api_version["VERSION"][0]["CGMiner"]
self.fw_ver = rpc_version["VERSION"][0]["CGMiner"]
except LookupError:
pass
return self.fw_ver
async def _get_hashrate(self, api_summary: dict = None) -> Optional[float]:
if api_summary is None:
async def _get_hashrate(self, rpc_summary: dict = None) -> Optional[float]:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary is not None:
if rpc_summary is not None:
try:
return round(
float(float(api_summary["SUMMARY"][0]["GHS 5s"]) / 1000), 2
float(float(rpc_summary["SUMMARY"][0]["GHS 5s"]) / 1000), 2
)
except (LookupError, ValueError, TypeError):
pass
async def _get_uptime(self, api_stats: dict = None) -> Optional[int]:
if api_stats is None:
async def _get_uptime(self, rpc_stats: dict = None) -> Optional[int]:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
if api_stats is not None:
if rpc_stats is not None:
try:
return int(api_stats["STATS"][1]["Elapsed"])
return int(rpc_stats["STATS"][1]["Elapsed"])
except LookupError:
pass

View File

@@ -37,7 +37,7 @@ GOLDSHELL_DATA_LOC = DataLocations(
),
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver",
[RPCAPICommand("api_version", "version")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver",
@@ -45,22 +45,22 @@ GOLDSHELL_DATA_LOC = DataLocations(
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[RPCAPICommand("api_summary", "summary")],
[RPCAPICommand("rpc_summary", "summary")],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[
RPCAPICommand("api_devs", "devs"),
RPCAPICommand("api_devdetails", "devdetails"),
RPCAPICommand("rpc_devs", "devs"),
RPCAPICommand("rpc_devdetails", "devdetails"),
],
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
}
)
@@ -130,11 +130,11 @@ class GoldshellMiner(BFGMiner):
pass
async def _get_hashboards(
self, api_devs: dict = None, api_devdetails: dict = None
self, rpc_devs: dict = None, rpc_devdetails: dict = None
) -> List[HashBoard]:
if api_devs is None:
if rpc_devs is None:
try:
api_devs = await self.api.devs()
rpc_devs = await self.rpc.devs()
except APIError:
pass
@@ -143,9 +143,9 @@ class GoldshellMiner(BFGMiner):
for i in range(self.expected_hashboards)
]
if api_devs is not None:
if api_devs.get("DEVS"):
for board in api_devs["DEVS"]:
if rpc_devs is not None:
if rpc_devs.get("DEVS"):
for board in rpc_devs["DEVS"]:
if board.get("ID") is not None:
try:
b_id = board["ID"]
@@ -157,17 +157,17 @@ class GoldshellMiner(BFGMiner):
except KeyError:
pass
else:
logger.error(self, api_devs)
logger.error(self, rpc_devs)
if api_devdetails is None:
if rpc_devdetails is None:
try:
api_devdetails = await self.api.devdetails()
rpc_devdetails = await self.rpc.devdetails()
except APIError:
pass
if api_devdetails is not None:
if api_devdetails.get("DEVS"):
for board in api_devdetails["DEVS"]:
if rpc_devdetails is not None:
if rpc_devdetails.get("DEVS"):
for board in rpc_devdetails["DEVS"]:
if board.get("ID") is not None:
try:
b_id = board["ID"]
@@ -175,6 +175,6 @@ class GoldshellMiner(BFGMiner):
except KeyError:
pass
else:
logger.error(self, api_devdetails)
logger.error(self, rpc_devdetails)
return hashboards

View File

@@ -41,23 +41,23 @@ INNOSILICON_DATA_LOC = DataLocations(
),
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver",
[RPCAPICommand("api_version", "version")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver",
[RPCAPICommand("api_version", "version")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[
RPCAPICommand("api_summary", "summary"),
RPCAPICommand("rpc_summary", "summary"),
WebAPICommand("web_get_all", "getAll"),
],
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[
RPCAPICommand("api_stats", "stats"),
RPCAPICommand("rpc_stats", "stats"),
WebAPICommand("web_get_all", "getAll"),
],
),
@@ -65,7 +65,7 @@ INNOSILICON_DATA_LOC = DataLocations(
"_get_wattage",
[
WebAPICommand("web_get_all", "getAll"),
RPCAPICommand("api_stats", "stats"),
RPCAPICommand("rpc_stats", "stats"),
],
),
str(DataOptions.WATTAGE_LIMIT): DataFunction(
@@ -88,7 +88,7 @@ INNOSILICON_DATA_LOC = DataLocations(
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
}
)
@@ -168,14 +168,14 @@ class Innosilicon(CGMiner):
pass
async def _get_hashrate(
self, api_summary: dict = None, web_get_all: dict = None
self, rpc_summary: dict = None, web_get_all: dict = None
) -> Optional[float]:
if web_get_all:
web_get_all = web_get_all["all"]
if api_summary is None and web_get_all is None:
if rpc_summary is None and web_get_all is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
@@ -193,14 +193,14 @@ class Innosilicon(CGMiner):
except KeyError:
pass
if api_summary is not None:
if rpc_summary is not None:
try:
return round(float(api_summary["SUMMARY"][0]["MHS 1m"] / 1000000), 2)
return round(float(rpc_summary["SUMMARY"][0]["MHS 1m"] / 1000000), 2)
except (KeyError, IndexError):
pass
async def _get_hashboards(
self, api_stats: dict = None, web_get_all: dict = None
self, rpc_stats: dict = None, web_get_all: dict = None
) -> List[HashBoard]:
if web_get_all:
web_get_all = web_get_all["all"]
@@ -210,9 +210,9 @@ class Innosilicon(CGMiner):
for i in range(self.expected_hashboards)
]
if api_stats is None:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
@@ -224,9 +224,9 @@ class Innosilicon(CGMiner):
else:
web_get_all = web_get_all["all"]
if api_stats is not None:
if api_stats.get("STATS"):
for board in api_stats["STATS"]:
if rpc_stats is not None:
if rpc_stats.get("STATS"):
for board in rpc_stats["STATS"]:
try:
idx = board["Chain ID"]
chips = board["Num active chips"]
@@ -258,7 +258,7 @@ class Innosilicon(CGMiner):
return hashboards
async def _get_wattage(
self, web_get_all: dict = None, api_stats: dict = None
self, web_get_all: dict = None, rpc_stats: dict = None
) -> Optional[int]:
if web_get_all:
web_get_all = web_get_all["all"]
@@ -277,15 +277,15 @@ class Innosilicon(CGMiner):
except KeyError:
pass
if api_stats is None:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
if api_stats is not None:
if api_stats.get("STATS"):
for board in api_stats["STATS"]:
if rpc_stats is not None:
if rpc_stats.get("STATS"):
for board in rpc_stats["STATS"]:
try:
wattage = board["power"]
except KeyError:

View File

@@ -26,30 +26,30 @@ LUXMINER_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction(
"_get_mac",
[RPCAPICommand("api_config", "config")],
[RPCAPICommand("rpc_config", "config")],
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[RPCAPICommand("api_summary", "summary")],
[RPCAPICommand("rpc_summary", "summary")],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.WATTAGE): DataFunction(
"_get_wattage",
[RPCAPICommand("api_power", "power")],
[RPCAPICommand("rpc_power", "power")],
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[RPCAPICommand("api_fans", "fans")],
[RPCAPICommand("rpc_fans", "fans")],
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime", [RPCAPICommand("api_stats", "stats")]
"_get_uptime", [RPCAPICommand("rpc_stats", "stats")]
),
}
)
@@ -58,8 +58,8 @@ LUXMINER_DATA_LOC = DataLocations(
class LUXMiner(BaseMiner):
"""Handler for LuxOS miners"""
_api_cls = LUXMinerRPCAPI
api: LUXMinerRPCAPI
_rpc_cls = LUXMinerRPCAPI
rpc: LUXMinerRPCAPI
firmware = "LuxOS"
@@ -67,14 +67,14 @@ class LUXMiner(BaseMiner):
async def _get_session(self) -> Optional[str]:
try:
data = await self.api.session()
data = await self.rpc.session()
if not data["SESSION"][0]["SessionID"] == "":
return data["SESSION"][0]["SessionID"]
except APIError:
pass
try:
data = await self.api.logon()
data = await self.rpc.logon()
return data["SESSION"][0]["SessionID"]
except (LookupError, APIError):
return
@@ -83,7 +83,7 @@ class LUXMiner(BaseMiner):
try:
session_id = await self._get_session()
if session_id:
await self.api.ledset(session_id, "red", "blink")
await self.rpc.ledset(session_id, "red", "blink")
return True
except (APIError, LookupError):
pass
@@ -93,7 +93,7 @@ class LUXMiner(BaseMiner):
try:
session_id = await self._get_session()
if session_id:
await self.api.ledset(session_id, "red", "off")
await self.rpc.ledset(session_id, "red", "off")
return True
except (APIError, LookupError):
pass
@@ -106,7 +106,7 @@ class LUXMiner(BaseMiner):
try:
session_id = await self._get_session()
if session_id:
await self.api.resetminer(session_id)
await self.rpc.resetminer(session_id)
return True
except (APIError, LookupError):
pass
@@ -116,7 +116,7 @@ class LUXMiner(BaseMiner):
try:
session_id = await self._get_session()
if session_id:
await self.api.curtail(session_id)
await self.rpc.curtail(session_id)
return True
except (APIError, LookupError):
pass
@@ -126,7 +126,7 @@ class LUXMiner(BaseMiner):
try:
session_id = await self._get_session()
if session_id:
await self.api.wakeup(session_id)
await self.rpc.wakeup(session_id)
return True
except (APIError, LookupError):
pass
@@ -135,7 +135,7 @@ class LUXMiner(BaseMiner):
try:
session_id = await self._get_session()
if session_id:
await self.api.rebootdevice(session_id)
await self.rpc.rebootdevice(session_id)
return True
except (APIError, LookupError):
pass
@@ -148,48 +148,48 @@ class LUXMiner(BaseMiner):
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def _get_mac(self, api_config: dict = None) -> Optional[str]:
async def _get_mac(self, rpc_config: dict = None) -> Optional[str]:
mac = None
if api_config is None:
if rpc_config is None:
try:
api_config = await self.api.config()
rpc_config = await self.rpc.config()
except APIError:
return None
if api_config is not None:
if rpc_config is not None:
try:
mac = api_config["CONFIG"][0]["MACAddr"]
mac = rpc_config["CONFIG"][0]["MACAddr"]
except KeyError:
return None
return mac
async def _get_hashrate(self, api_summary: dict = None) -> Optional[float]:
if api_summary is None:
async def _get_hashrate(self, rpc_summary: dict = None) -> Optional[float]:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary is not None:
if rpc_summary is not None:
try:
return round(float(api_summary["SUMMARY"][0]["GHS 5s"] / 1000), 2)
return round(float(rpc_summary["SUMMARY"][0]["GHS 5s"] / 1000), 2)
except (LookupError, ValueError, TypeError):
pass
async def _get_hashboards(self, api_stats: dict = None) -> List[HashBoard]:
async def _get_hashboards(self, rpc_stats: dict = None) -> List[HashBoard]:
hashboards = []
if api_stats is None:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
if api_stats is not None:
if rpc_stats is not None:
try:
board_offset = -1
boards = api_stats["STATS"]
boards = rpc_stats["STATS"]
if len(boards) > 1:
for board_num in range(1, 16, 5):
for _b_num in range(5):
@@ -231,48 +231,48 @@ class LUXMiner(BaseMiner):
return hashboards
async def _get_wattage(self, api_power: dict = None) -> Optional[int]:
if api_power is None:
async def _get_wattage(self, rpc_power: dict = None) -> Optional[int]:
if rpc_power is None:
try:
api_power = await self.api.power()
rpc_power = await self.rpc.power()
except APIError:
pass
if api_power is not None:
if rpc_power is not None:
try:
return api_power["POWER"][0]["Watts"]
return rpc_power["POWER"][0]["Watts"]
except (LookupError, ValueError, TypeError):
pass
async def _get_fans(self, api_fans: dict = None) -> List[Fan]:
if api_fans is None:
async def _get_fans(self, rpc_fans: dict = None) -> List[Fan]:
if rpc_fans is None:
try:
api_fans = await self.api.fans()
rpc_fans = await self.rpc.fans()
except APIError:
pass
fans = []
if api_fans is not None:
if rpc_fans is not None:
for fan in range(self.expected_fans):
try:
fans.append(Fan(api_fans["FANS"][fan]["RPM"]))
fans.append(Fan(rpc_fans["FANS"][fan]["RPM"]))
except (LookupError, ValueError, TypeError):
fans.append(Fan())
return fans
async def _get_expected_hashrate(self, api_stats: dict = None) -> Optional[float]:
if api_stats is None:
async def _get_expected_hashrate(self, rpc_stats: dict = None) -> Optional[float]:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
if api_stats is not None:
if rpc_stats is not None:
try:
expected_rate = api_stats["STATS"][1]["total_rateideal"]
expected_rate = rpc_stats["STATS"][1]["total_rateideal"]
try:
rate_unit = api_stats["STATS"][1]["rate_unit"]
rate_unit = rpc_stats["STATS"][1]["rate_unit"]
except KeyError:
rate_unit = "GH"
if rate_unit == "GH":
@@ -284,15 +284,15 @@ class LUXMiner(BaseMiner):
except LookupError:
pass
async def _get_uptime(self, api_stats: dict = None) -> Optional[int]:
if api_stats is None:
async def _get_uptime(self, rpc_stats: dict = None) -> Optional[int]:
if rpc_stats is None:
try:
api_stats = await self.api.stats()
rpc_stats = await self.rpc.stats()
except APIError:
pass
if api_stats is not None:
if rpc_stats is not None:
try:
return int(api_stats["STATS"][1]["Elapsed"])
return int(rpc_stats["STATS"][1]["Elapsed"])
except LookupError:
pass

View File

@@ -30,7 +30,7 @@ class UnknownMiner(BaseMiner):
) -> None:
super().__init__(ip)
self.ip = ip
self.api = UnknownRPCAPI(ip)
self.rpc = UnknownRPCAPI(ip)
def __repr__(self) -> str:
return f"Unknown: {str(self.ip)}"

View File

@@ -36,7 +36,7 @@ VNISH_DATA_LOC = DataLocations(
),
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver",
[RPCAPICommand("api_version", "version")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver",
@@ -48,15 +48,15 @@ VNISH_DATA_LOC = DataLocations(
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[RPCAPICommand("api_summary", "summary")],
[RPCAPICommand("rpc_summary", "summary")],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.WATTAGE): DataFunction(
"_get_wattage",
@@ -68,11 +68,11 @@ VNISH_DATA_LOC = DataLocations(
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime",
[RPCAPICommand("api_stats", "stats")],
[RPCAPICommand("rpc_stats", "stats")],
),
}
)
@@ -172,18 +172,18 @@ class VNish(BMMiner):
except KeyError:
pass
async def _get_hashrate(self, api_summary: dict = None) -> Optional[float]:
async def _get_hashrate(self, rpc_summary: dict = None) -> Optional[float]:
# get hr from API
if api_summary is None:
if rpc_summary is None:
try:
api_summary = await self.api.summary()
rpc_summary = await self.rpc.summary()
except APIError:
pass
if api_summary is not None:
if rpc_summary is not None:
try:
return round(
float(float(api_summary["SUMMARY"][0]["GHS 5s"]) / 1000), 2
float(float(rpc_summary["SUMMARY"][0]["GHS 5s"]) / 1000), 2
)
except (LookupError, ValueError, TypeError):
pass

View File

@@ -24,15 +24,16 @@ from pyasic.data.error_codes import MinerErrorData
from pyasic.errors import APIError
from pyasic.logger import logger
from pyasic.miners.data import DataLocations, DataOptions, RPCAPICommand, WebAPICommand
from pyasic.rpc.base import BaseMinerRPCAPI
class MinerProtocol(Protocol):
_api_cls: Type = None
_rpc_cls: Type = None
_web_cls: Type = None
_ssh_cls: Type = None
ip: str = None
api: _api_cls = None
rpc: _rpc_cls = None
web: _web_cls = None
ssh: _ssh_cls = None
@@ -73,6 +74,10 @@ class MinerProtocol(Protocol):
model_data.append(f"({self.firmware})")
return " ".join(model_data)
@property
def api(self):
return self.rpc
async def check_light(self) -> bool:
return await self.get_fault_light()
@@ -488,8 +493,8 @@ class BaseMiner(MinerProtocol):
)
# interfaces
if self._api_cls is not None:
self.api = self._api_cls(ip)
if self._rpc_cls is not None:
self.rpc = self._rpc_cls(ip)
if self._web_cls is not None:
self.web = self._web_cls(ip)
if self._ssh_cls is not None: