bug: add test to cross check function arguments, and fix some method implementations and naming.

This commit is contained in:
UpstreamData
2024-01-15 10:16:47 -07:00
parent 06cc84f16d
commit aa9ba66f8e
14 changed files with 655 additions and 713 deletions

View File

@@ -2,9 +2,8 @@
## A10X Models
## A10X
::: pyasic.miners.innosilicon.cgminer.A10X.A10X.CGMinerA10X
::: pyasic.miners.innosilicon.cgminer.A10X.A10X.InnosiliconA10X
handler: python
options:
show_root_heading: false
heading_level: 4

View File

@@ -2,9 +2,8 @@
## T3X Models
## T3H+
::: pyasic.miners.innosilicon.cgminer.T3X.T3H.CGMinerT3HPlus
::: pyasic.miners.innosilicon.cgminer.T3X.T3H.InnosiliconT3HPlus
handler: python
options:
show_root_heading: false
heading_level: 4

View File

@@ -25,10 +25,6 @@ from pyasic.miners.types import T9
class HiveonT9(Hiveon, T9):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver=api_ver)
self.ip = ip
self.pwd = "admin"
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
@@ -46,34 +42,44 @@ class HiveonT9(Hiveon, T9):
pass
async def get_hashboards(self, api_stats: dict = None) -> List[HashBoard]:
hashboards = [
HashBoard(slot=board, expected_chips=self.expected_chips)
for board in range(self.expected_hashboards)
]
if api_stats is None:
try:
api_stats = self.api.stats()
except APIError:
return []
board_map = {
0: [2, 9, 10],
1: [3, 11, 12],
2: [4, 13, 14],
}
hashboards = []
for board in board_map:
hashboard = HashBoard(slot=board, expected_chips=self.expected_chips)
hashrate = 0
chips = 0
for chipset in board_map[board]:
if hashboard.chip_temp == None:
if hashboards[board].chip_temp is None:
try:
hashboard.board_temp = api_stats["STATS"][1][f"temp{chipset}"]
hashboard.chip_temp = api_stats["STATS"][1][f"temp2_{chipset}"]
except LookupError:
hashboards[board].temp = api_stats["STATS"][1][f"temp{chipset}"]
hashboards[board].chip_temp = api_stats["STATS"][1][
f"temp2_{chipset}"
]
except (KeyError, IndexError):
pass
else:
hashboard.missing = False
hashboards[board].missing = False
try:
hashrate += api_stats["STATS"][1][f"chain_rate{chipset}"]
chips += api_stats["STATS"][1][f"chain_acn{chipset}"]
except LookupError:
except (KeyError, IndexError):
pass
hashboard.hashrate = round(hashrate / 1000, 2)
hashboard.chips = chips
hashboards.append(hashboard)
hashboards[board].hashrate = round(hashrate / 1000, 2)
hashboards[board].chips = chips
return hashboards
@@ -88,7 +94,7 @@ class HiveonT9(Hiveon, T9):
boards = api_stats.get("STATS")
try:
wattage_raw = boards[1]["chain_power"]
except LookupError:
except (KeyError, IndexError):
pass
else:
# parse wattage position out of raw data
@@ -113,7 +119,7 @@ class HiveonT9(Hiveon, T9):
env_temp = api_stats["STATS"][1][f"temp3_{chipset}"]
if not env_temp == 0:
env_temp_list.append(int(env_temp))
except LookupError:
except (KeyError, IndexError):
pass
if not env_temp_list == []:

View File

@@ -15,7 +15,8 @@
# ------------------------------------------------------------------------------
import logging
from typing import List, Optional
from collections import namedtuple
from typing import List, Optional, Tuple
from pyasic.API.btminer import BTMinerAPI
from pyasic.config import MinerConfig, MiningModeConfig
@@ -28,74 +29,80 @@ from pyasic.miners.base import (
DataLocations,
DataOptions,
RPCAPICommand,
WebAPICommand,
)
BTMINER_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction(
"_get_mac",
"get_mac",
[
RPCAPICommand("api_summary", "summary"),
RPCAPICommand("api_get_miner_info", "get_miner_info"),
],
),
str(DataOptions.MODEL): DataFunction("get_model"),
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver", [RPCAPICommand("api_get_version", "get_version")]
"get_api_ver", [RPCAPICommand("api_get_version", "get_version")]
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver",
"get_fw_ver",
[
RPCAPICommand("api_get_version", "get_version"),
RPCAPICommand("api_summary", "summary"),
],
),
str(DataOptions.HOSTNAME): DataFunction(
"_get_hostname", [RPCAPICommand("api_get_miner_info", "get_miner_info")]
"get_hostname", [RPCAPICommand("api_get_miner_info", "get_miner_info")]
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate", [RPCAPICommand("api_summary", "summary")]
"get_hashrate", [RPCAPICommand("api_summary", "summary")]
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate", [RPCAPICommand("api_summary", "summary")]
"get_expected_hashrate", [RPCAPICommand("api_summary", "summary")]
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards", [RPCAPICommand("api_devs", "devs")]
"get_hashboards", [RPCAPICommand("api_devs", "devs")]
),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction(
"_get_env_temp", [RPCAPICommand("api_summary", "summary")]
"get_env_temp", [RPCAPICommand("api_summary", "summary")]
),
str(DataOptions.WATTAGE): DataFunction(
"_get_wattage", [RPCAPICommand("api_summary", "summary")]
"get_wattage", [RPCAPICommand("api_summary", "summary")]
),
str(DataOptions.WATTAGE_LIMIT): DataFunction(
"_get_wattage_limit", [RPCAPICommand("api_summary", "summary")]
"get_wattage_limit", [RPCAPICommand("api_summary", "summary")]
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
"get_fans",
[
RPCAPICommand("api_summary", "summary"),
RPCAPICommand("api_get_psu", "get_psu"),
],
),
str(DataOptions.FAN_PSU): DataFunction(
"_get_fan_psu",
"get_fan_psu",
[
RPCAPICommand("api_summary", "summary"),
RPCAPICommand("api_get_psu", "get_psu"),
],
),
str(DataOptions.ERRORS): DataFunction(
"_get_errors", [RPCAPICommand("api_get_error_code", "get_error_code")]
"get_errors",
[
RPCAPICommand("api_get_error_code", "get_error_code"),
RPCAPICommand("api_summary", "summary"),
],
),
str(DataOptions.FAULT_LIGHT): DataFunction(
"_get_fault_light",
"get_fault_light",
[RPCAPICommand("api_get_miner_info", "get_miner_info")],
),
str(DataOptions.IS_MINING): DataFunction(
"_is_mining", [RPCAPICommand("api_status", "status")]
"is_mining", [RPCAPICommand("api_status", "status")]
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime", [RPCAPICommand("api_summary", "summary")]
"get_uptime", [RPCAPICommand("api_summary", "summary")]
),
str(DataOptions.CONFIG): DataFunction("get_config"),
}
@@ -236,7 +243,7 @@ class BTMiner(BaseMiner):
else:
cfg = MinerConfig()
is_mining = await self._is_mining(status)
is_mining = await self.is_mining(status)
if not is_mining:
cfg.mining_mode = MiningModeConfig.sleep()
return cfg
@@ -280,7 +287,7 @@ class BTMiner(BaseMiner):
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def _get_mac(
async def get_mac(
self, api_summary: dict = None, api_get_miner_info: dict = None
) -> Optional[str]:
if not api_get_miner_info:
@@ -306,10 +313,24 @@ class BTMiner(BaseMiner):
try:
mac = api_summary["SUMMARY"][0]["MAC"]
return str(mac).upper()
except LookupError:
except (KeyError, IndexError):
pass
async def _get_api_ver(self, api_get_version: dict = None) -> Optional[str]:
async def get_version(
self, api_get_version: dict = None, api_summary: dict = None
) -> Tuple[Optional[str], Optional[str]]:
miner_version = namedtuple("MinerVersion", "api_ver fw_ver")
api_ver = await self.get_api_ver(api_get_version=api_get_version)
fw_ver = await self.get_fw_ver(
api_get_version=api_get_version, api_summary=api_summary
)
return miner_version(api_ver, fw_ver)
async def get_api_ver(self, api_get_version: dict = None) -> Optional[str]:
# Check to see if the version info is already cached
if self.api_ver:
return self.api_ver
if not api_get_version:
try:
api_get_version = await self.api.get_version()
@@ -332,9 +353,13 @@ class BTMiner(BaseMiner):
return self.api_ver
async def _get_fw_ver(
async def get_fw_ver(
self, api_get_version: dict = None, api_summary: dict = None
) -> Optional[str]:
# Check to see if the version info is already cached
if self.fw_ver:
return self.fw_ver
if not api_get_version:
try:
api_get_version = await self.api.get_version()
@@ -362,12 +387,12 @@ class BTMiner(BaseMiner):
self.fw_ver = api_summary["SUMMARY"][0]["Firmware Version"].replace(
"'", ""
)
except LookupError:
except (KeyError, IndexError):
pass
return self.fw_ver
async def _get_hostname(self, api_get_miner_info: dict = None) -> Optional[str]:
async def get_hostname(self, api_get_miner_info: dict = None) -> Optional[str]:
hostname = None
if not api_get_miner_info:
try:
@@ -383,7 +408,7 @@ class BTMiner(BaseMiner):
return hostname
async def _get_hashrate(self, api_summary: dict = None) -> Optional[float]:
async def get_hashrate(self, api_summary: dict = None) -> Optional[float]:
# get hr from API
if not api_summary:
try:
@@ -394,10 +419,10 @@ class BTMiner(BaseMiner):
if api_summary:
try:
return round(float(api_summary["SUMMARY"][0]["MHS 1m"] / 1000000), 2)
except LookupError:
except (KeyError, IndexError):
pass
async def _get_hashboards(self, api_devs: dict = None) -> List[HashBoard]:
async def get_hashboards(self, api_devs: dict = None) -> List[HashBoard]:
hashboards = [
HashBoard(slot=i, expected_chips=self.expected_chips)
for i in range(self.expected_hashboards)
@@ -427,12 +452,12 @@ class BTMiner(BaseMiner):
hashboards[board["ASC"]].chips = board["Effective Chips"]
hashboards[board["ASC"]].serial_number = board["PCB SN"]
hashboards[board["ASC"]].missing = False
except LookupError:
except (KeyError, IndexError):
pass
return hashboards
async def _get_env_temp(self, api_summary: dict = None) -> Optional[float]:
async def get_env_temp(self, api_summary: dict = None) -> Optional[float]:
if not api_summary:
try:
api_summary = await self.api.summary()
@@ -442,10 +467,10 @@ class BTMiner(BaseMiner):
if api_summary:
try:
return api_summary["SUMMARY"][0]["Env Temp"]
except LookupError:
except (KeyError, IndexError):
pass
async def _get_wattage(self, api_summary: dict = None) -> Optional[int]:
async def get_wattage(self, api_summary: dict = None) -> Optional[int]:
if not api_summary:
try:
api_summary = await self.api.summary()
@@ -456,10 +481,10 @@ class BTMiner(BaseMiner):
try:
wattage = api_summary["SUMMARY"][0]["Power"]
return wattage if not wattage == -1 else None
except LookupError:
except (KeyError, IndexError):
pass
async def _get_wattage_limit(self, api_summary: dict = None) -> Optional[int]:
async def get_wattage_limit(self, api_summary: dict = None) -> Optional[int]:
if not api_summary:
try:
api_summary = await self.api.summary()
@@ -469,10 +494,10 @@ class BTMiner(BaseMiner):
if api_summary:
try:
return api_summary["SUMMARY"][0]["Power Limit"]
except LookupError:
except (KeyError, IndexError):
pass
async def _get_fans(
async def get_fans(
self, api_summary: dict = None, api_get_psu: dict = None
) -> List[Fan]:
if not api_summary:
@@ -481,20 +506,20 @@ class BTMiner(BaseMiner):
except APIError:
pass
fans = [Fan() for _ in range(self.expected_fans)]
fans = [Fan() for _ in range(self.fan_count)]
if api_summary:
try:
if self.expected_fans > 0:
if self.fan_count > 0:
fans = [
Fan(api_summary["SUMMARY"][0].get("Fan Speed In", 0)),
Fan(api_summary["SUMMARY"][0].get("Fan Speed Out", 0)),
]
except LookupError:
except (KeyError, IndexError):
pass
return fans
async def _get_fan_psu(
async def get_fan_psu(
self, api_summary: dict = None, api_get_psu: dict = None
) -> Optional[int]:
if not api_summary:
@@ -506,7 +531,7 @@ class BTMiner(BaseMiner):
if api_summary:
try:
return int(api_summary["SUMMARY"][0]["Power Fanspeed"])
except LookupError:
except (KeyError, IndexError):
pass
if not api_get_psu:
@@ -521,26 +546,11 @@ class BTMiner(BaseMiner):
except (KeyError, TypeError):
pass
async def _get_errors(
async def get_errors(
self, api_summary: dict = None, api_get_error_code: dict = None
) -> List[MinerErrorData]:
errors = []
if not api_summary and not api_get_error_code:
try:
api_summary = await self.api.summary()
except APIError:
pass
if api_summary:
try:
for i in range(api_summary["SUMMARY"][0]["Error Code Count"]):
err = api_summary["SUMMARY"][0].get(f"Error Code {i}")
if err:
errors.append(WhatsminerError(error_code=err))
except (LookupError, ValueError, TypeError):
pass
if not api_get_error_code:
if not api_get_error_code and not api_summary:
try:
api_get_error_code = await self.api.get_error_code()
except APIError:
@@ -554,9 +564,24 @@ class BTMiner(BaseMiner):
else:
errors.append(WhatsminerError(error_code=int(err)))
if not api_summary:
try:
api_summary = await self.api.summary()
except APIError:
pass
if api_summary:
try:
for i in range(api_summary["SUMMARY"][0]["Error Code Count"]):
err = api_summary["SUMMARY"][0].get(f"Error Code {i}")
if err:
errors.append(WhatsminerError(error_code=err))
except (KeyError, IndexError, ValueError, TypeError):
pass
return errors
async def _get_expected_hashrate(self, api_summary: dict = None):
async def get_expected_hashrate(self, api_summary: dict = None):
if not api_summary:
try:
api_summary = await self.api.summary()
@@ -568,10 +593,10 @@ class BTMiner(BaseMiner):
expected_hashrate = api_summary["SUMMARY"][0]["Factory GHS"]
if expected_hashrate:
return round(expected_hashrate / 1000, 2)
except LookupError:
except (KeyError, IndexError):
pass
async def _get_fault_light(self, api_get_miner_info: dict = None) -> bool:
async def get_fault_light(self, api_get_miner_info: dict = None) -> bool:
if not api_get_miner_info:
try:
api_get_miner_info = await self.api.get_miner_info()
@@ -609,7 +634,7 @@ class BTMiner(BaseMiner):
async def set_hostname(self, hostname: str):
await self.api.set_hostname(hostname)
async def _is_mining(self, api_status: dict = None) -> Optional[bool]:
async def is_mining(self, api_status: dict = None) -> Optional[bool]:
if not api_status:
try:
api_status = await self.api.status()
@@ -628,7 +653,7 @@ class BTMiner(BaseMiner):
except LookupError:
pass
async def _get_uptime(self, api_summary: dict = None) -> Optional[int]:
async def get_uptime(self, api_summary: dict = None) -> Optional[int]:
if not api_summary:
try:
api_summary = await self.api.summary()

View File

@@ -14,6 +14,7 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
import logging
import re
from typing import List, Optional
@@ -27,43 +28,42 @@ from pyasic.miners.base import DataFunction, DataLocations, DataOptions, RPCAPIC
AVALON_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction(
"_get_mac", [RPCAPICommand("api_version", "version")]
"get_mac", [RPCAPICommand("api_version", "version")]
),
str(DataOptions.MODEL): DataFunction("get_model"),
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver", [RPCAPICommand("api_version", "version")]
"get_api_ver", [RPCAPICommand("api_version", "version")]
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver", [RPCAPICommand("api_version", "version")]
),
str(DataOptions.HOSTNAME): DataFunction(
"_get_hostname", [RPCAPICommand("api_version", "version")]
"get_fw_ver", [RPCAPICommand("api_version", "version")]
),
str(DataOptions.HOSTNAME): DataFunction("get_hostname"),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate", [RPCAPICommand("api_devs", "devs")]
"get_hashrate", [RPCAPICommand("api_devs", "devs")]
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate", [RPCAPICommand("api_stats", "stats")]
"get_expected_hashrate", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards", [RPCAPICommand("api_stats", "stats")]
"get_hashboards", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction(
"_get_env_temp", [RPCAPICommand("api_stats", "stats")]
"get_env_temp", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.WATTAGE): DataFunction("_get_wattage"),
str(DataOptions.WATTAGE): DataFunction("get_wattage"),
str(DataOptions.WATTAGE_LIMIT): DataFunction(
"_get_wattage_limit", [RPCAPICommand("api_stats", "stats")]
"get_wattage_limit", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.FANS): DataFunction(
"_get_fans", [RPCAPICommand("api_stats", "stats")]
"get_fans", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.FAN_PSU): DataFunction("_get_fan_psu"),
str(DataOptions.ERRORS): DataFunction("_get_errors"),
str(DataOptions.FAN_PSU): DataFunction("get_fan_psu"),
str(DataOptions.ERRORS): DataFunction("get_errors"),
str(DataOptions.FAULT_LIGHT): DataFunction(
"_get_fault_light", [RPCAPICommand("api_stats", "stats")]
"get_fault_light", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.IS_MINING): DataFunction("_is_mining"),
str(DataOptions.UPTIME): DataFunction("_get_uptime"),
str(DataOptions.IS_MINING): DataFunction("is_mining"),
str(DataOptions.UPTIME): DataFunction("get_uptime"),
str(DataOptions.CONFIG): DataFunction("get_config"),
}
)
@@ -174,7 +174,7 @@ class CGMinerAvalon(CGMiner):
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def _get_mac(self, api_version: dict = None) -> Optional[str]:
async def get_mac(self, api_version: dict = None) -> Optional[str]:
if not api_version:
try:
api_version = await self.api.version()
@@ -192,7 +192,7 @@ class CGMinerAvalon(CGMiner):
except (KeyError, ValueError):
pass
async def _get_hostname(self, mac: str = None) -> Optional[str]:
async def get_hostname(self) -> Optional[str]:
return None
# if not mac:
# mac = await self.get_mac()
@@ -200,7 +200,7 @@ class CGMinerAvalon(CGMiner):
# if mac:
# return f"Avalon{mac.replace(':', '')[-6:]}"
async def _get_hashrate(self, api_devs: dict = None) -> Optional[float]:
async def get_hashrate(self, api_devs: dict = None) -> Optional[float]:
if not api_devs:
try:
api_devs = await self.api.devs()
@@ -210,10 +210,10 @@ class CGMinerAvalon(CGMiner):
if api_devs:
try:
return round(float(api_devs["DEVS"][0]["MHS 1m"] / 1000000), 2)
except (LookupError, ValueError, TypeError):
except (KeyError, IndexError, ValueError, TypeError):
pass
async def _get_hashboards(self, api_stats: dict = None) -> List[HashBoard]:
async def get_hashboards(self, api_stats: dict = None) -> List[HashBoard]:
hashboards = [
HashBoard(slot=i, expected_chips=self.expected_chips)
for i in range(self.expected_hashboards)
@@ -229,7 +229,7 @@ class CGMinerAvalon(CGMiner):
try:
unparsed_stats = api_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
except (LookupError, ValueError, TypeError):
except (IndexError, KeyError, ValueError, TypeError):
return hashboards
for board in range(self.expected_hashboards):
@@ -261,7 +261,7 @@ class CGMinerAvalon(CGMiner):
return hashboards
async def _get_expected_hashrate(self, api_stats: dict = None) -> Optional[float]:
async def get_expected_hashrate(self, api_stats: dict = None) -> Optional[float]:
if not api_stats:
try:
api_stats = await self.api.stats()
@@ -273,10 +273,10 @@ class CGMinerAvalon(CGMiner):
unparsed_stats = api_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
return round(float(parsed_stats["GHSmm"]) / 1000, 2)
except (LookupError, ValueError, TypeError):
except (IndexError, KeyError, ValueError, TypeError):
pass
async def _get_env_temp(self, api_stats: dict = None) -> Optional[float]:
async def get_env_temp(self, api_stats: dict = None) -> Optional[float]:
if not api_stats:
try:
api_stats = await self.api.stats()
@@ -288,13 +288,13 @@ class CGMinerAvalon(CGMiner):
unparsed_stats = api_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
return float(parsed_stats["Temp"])
except (LookupError, ValueError, TypeError):
except (IndexError, KeyError, ValueError, TypeError):
pass
async def _get_wattage(self) -> Optional[int]:
async def get_wattage(self) -> Optional[int]:
return None
async def _get_wattage_limit(self, api_stats: dict = None) -> Optional[int]:
async def get_wattage_limit(self, api_stats: dict = None) -> Optional[int]:
if not api_stats:
try:
api_stats = await self.api.stats()
@@ -306,17 +306,17 @@ class CGMinerAvalon(CGMiner):
unparsed_stats = api_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
return int(parsed_stats["MPO"])
except (LookupError, ValueError, TypeError):
except (IndexError, KeyError, ValueError, TypeError):
pass
async def _get_fans(self, api_stats: dict = None) -> List[Fan]:
async def get_fans(self, api_stats: dict = None) -> List[Fan]:
if not api_stats:
try:
api_stats = await self.api.stats()
except APIError:
pass
fans_data = [Fan() for _ in range(self.expected_fans)]
fans_data = [Fan() for _ in range(self.fan_count)]
if api_stats:
try:
unparsed_stats = api_stats["STATS"][0]["MM ID0"]
@@ -324,17 +324,17 @@ class CGMinerAvalon(CGMiner):
except LookupError:
return fans_data
for fan in range(self.expected_fans):
for fan in range(self.fan_count):
try:
fans_data[fan].speed = int(parsed_stats[f"Fan{fan + 1}"])
except (LookupError, ValueError, TypeError):
except (IndexError, KeyError, ValueError, TypeError):
pass
return fans_data
async def _get_errors(self) -> List[MinerErrorData]:
async def get_errors(self) -> List[MinerErrorData]:
return []
async def _get_fault_light(self, api_stats: dict = None) -> bool: # noqa
async def get_fault_light(self, api_stats: dict = None) -> bool: # noqa
if self.light:
return self.light
if not api_stats:
@@ -349,7 +349,7 @@ class CGMinerAvalon(CGMiner):
parsed_stats = self.parse_stats(unparsed_stats)
led = int(parsed_stats["Led"])
return True if led == 1 else False
except (LookupError, ValueError, TypeError):
except (IndexError, KeyError, ValueError, TypeError):
pass
try:
@@ -363,5 +363,8 @@ class CGMinerAvalon(CGMiner):
pass
return False
async def _is_mining(self, *args, **kwargs) -> Optional[bool]:
async def is_mining(self, *args, **kwargs) -> Optional[bool]:
return None
async def get_uptime(self) -> Optional[int]:
return None

View File

@@ -14,12 +14,73 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from typing import List, Optional
from pyasic.data import HashBoard
from pyasic.miners.backends import BMMiner
from pyasic.miners.base import DataFunction, DataLocations, DataOptions, RPCAPICommand
HIVEON_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction("get_mac"),
str(DataOptions.MODEL): DataFunction("get_model"),
str(DataOptions.API_VERSION): DataFunction(
"get_api_ver", [RPCAPICommand("api_version", "version")]
),
str(DataOptions.FW_VERSION): DataFunction(
"get_fw_ver", [RPCAPICommand("api_version", "version")]
),
str(DataOptions.HOSTNAME): DataFunction("get_hostname"),
str(DataOptions.HASHRATE): DataFunction(
"get_hashrate", [RPCAPICommand("api_summary", "summary")]
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"get_expected_hashrate", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.HASHBOARDS): DataFunction(
"get_hashboards", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction(
"get_env_temp", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.WATTAGE): DataFunction(
"get_wattage", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.WATTAGE_LIMIT): DataFunction("get_wattage_limit"),
str(DataOptions.FANS): DataFunction(
"get_fans", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.FAN_PSU): DataFunction("get_fan_psu"),
str(DataOptions.ERRORS): DataFunction("get_errors"),
str(DataOptions.FAULT_LIGHT): DataFunction("get_fault_light"),
str(DataOptions.IS_MINING): DataFunction("is_mining"),
str(DataOptions.UPTIME): DataFunction(
"get_uptime", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.CONFIG): DataFunction("get_config"),
}
)
class Hiveon(BMMiner):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver)
self.pwd = "admin"
# static data
self.api_type = "Hiveon"
self.fw_str = "Hive"
# data gathering locations
self.data_locations = HIVEON_DATA_LOC
async def get_model(self) -> Optional[str]:
if self.model is not None:
return self.model + " (Hiveon)"
return "? (Hiveon)"
async def get_hashboards(self, api_stats: dict = None) -> List[HashBoard]:
pass
async def get_wattage(self, api_stats: dict = None) -> Optional[int]:
pass
async def get_env_temp(self, api_stats: dict = None) -> Optional[float]:
pass

View File

@@ -0,0 +1,404 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from typing import List, Optional
from pyasic.config import MinerConfig
from pyasic.data import Fan, HashBoard
from pyasic.data.error_codes import MinerErrorData
from pyasic.data.error_codes.innosilicon import InnosiliconError
from pyasic.errors import APIError
from pyasic.miners.backends import CGMiner
from pyasic.miners.base import (
DataFunction,
DataLocations,
DataOptions,
RPCAPICommand,
WebAPICommand,
)
from pyasic.web.innosilicon import InnosiliconWebAPI
INNOSILICON_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction(
"get_mac",
[
WebAPICommand("web_get_all", "getAll"),
WebAPICommand("web_overview", "overview"),
],
),
str(DataOptions.MODEL): DataFunction("get_model"),
str(DataOptions.API_VERSION): DataFunction(
"get_api_ver", [RPCAPICommand("api_version", "version")]
),
str(DataOptions.FW_VERSION): DataFunction(
"get_fw_ver", [RPCAPICommand("api_version", "version")]
),
str(DataOptions.HOSTNAME): DataFunction("get_hostname"),
str(DataOptions.HASHRATE): DataFunction(
"get_hashrate",
[
RPCAPICommand("api_summary", "summary"),
WebAPICommand("web_get_all", "getAll"),
],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"get_expected_hashrate",
),
str(DataOptions.HASHBOARDS): DataFunction(
"get_hashboards",
[
RPCAPICommand("api_stats", "stats"),
WebAPICommand("web_get_all", "getAll"),
],
),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction("get_env_temp"),
str(DataOptions.WATTAGE): DataFunction(
"get_wattage",
[
WebAPICommand("web_get_all", "getAll"),
RPCAPICommand("api_stats", "stats"),
],
),
str(DataOptions.WATTAGE_LIMIT): DataFunction(
"get_wattage_limit",
[
WebAPICommand("web_get_all", "getAll"),
],
),
str(DataOptions.FANS): DataFunction(
"get_fans",
[
WebAPICommand("web_get_all", "getAll"),
],
),
str(DataOptions.FAN_PSU): DataFunction("get_fan_psu"),
str(DataOptions.ERRORS): DataFunction(
"get_errors",
[
WebAPICommand("web_get_error_detail", "getErrorDetail"),
],
),
str(DataOptions.FAULT_LIGHT): DataFunction("get_fault_light"),
str(DataOptions.IS_MINING): DataFunction("is_mining"),
str(DataOptions.UPTIME): DataFunction(
"get_uptime", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.CONFIG): DataFunction("get_config"),
}
)
class Innosilicon(CGMiner):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver=api_ver)
# interfaces
self.web = InnosiliconWebAPI(ip)
# static data
# data gathering locations
self.data_locations = INNOSILICON_DATA_LOC
# autotuning/shutdown support
self.supports_shutdown = True
# data storage
self.api_ver = api_ver
async def fault_light_on(self) -> bool:
return False
async def fault_light_off(self) -> bool:
return False
async def get_config(self) -> MinerConfig:
# get pool data
try:
pools = await self.web.pools()
except APIError:
return self.config
self.config = MinerConfig.from_inno(pools)
return self.config
async def reboot(self) -> bool:
try:
data = await self.web.reboot()
except APIError:
pass
else:
return data["success"]
async def restart_cgminer(self) -> bool:
try:
data = await self.web.restart_cgminer()
except APIError:
pass
else:
return data["success"]
async def restart_backend(self) -> bool:
return await self.restart_cgminer()
async def stop_mining(self) -> bool:
return False
# data = await self.web.poweroff()
# try:
# return data["success"]
# except KeyError:
# return False
async def resume_mining(self) -> bool:
return False
# data = await self.web.restart_cgminer()
# print(data)
# try:
# return data["success"]
# except KeyError:
# return False
async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
self.config = config
await self.web.update_pools(config.as_inno(user_suffix=user_suffix))
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def get_mac(
self, web_get_all: dict = None, web_overview: dict = None
) -> Optional[str]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all and not web_overview:
try:
web_overview = await self.web.overview()
except APIError:
pass
if web_get_all:
try:
mac = web_get_all["mac"]
return mac.upper()
except KeyError:
pass
if web_overview:
try:
mac = web_overview["version"]["ethaddr"]
return mac.upper()
except KeyError:
pass
async def get_hashrate(
self, api_summary: dict = None, web_get_all: dict = None
) -> Optional[float]:
if web_get_all:
web_get_all = web_get_all["all"]
if not api_summary and not web_get_all:
try:
api_summary = await self.api.summary()
except APIError:
pass
if web_get_all:
try:
if "Hash Rate H" in web_get_all["total_hash"].keys():
return round(
float(web_get_all["total_hash"]["Hash Rate H"] / 1000000000000),
2,
)
elif "Hash Rate" in web_get_all["total_hash"].keys():
return round(
float(web_get_all["total_hash"]["Hash Rate"] / 1000000), 5
)
except KeyError:
pass
if api_summary:
try:
return round(float(api_summary["SUMMARY"][0]["MHS 1m"] / 1000000), 2)
except (KeyError, IndexError):
pass
async def get_hashboards(
self, api_stats: dict = None, web_get_all: dict = None
) -> List[HashBoard]:
if web_get_all:
web_get_all = web_get_all["all"]
hashboards = [
HashBoard(slot=i, expected_chips=self.expected_chips)
for i in range(self.expected_hashboards)
]
if not api_stats:
try:
api_stats = await self.api.stats()
except APIError:
pass
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if api_stats:
if api_stats.get("STATS"):
for board in api_stats["STATS"]:
try:
idx = board["Chain ID"]
chips = board["Num active chips"]
except KeyError:
pass
else:
hashboards[idx].chips = chips
hashboards[idx].missing = False
if web_get_all:
if web_get_all.get("chain"):
for board in web_get_all["chain"]:
idx = board.get("ASC")
if idx is not None:
temp = board.get("Temp min")
if temp:
hashboards[idx].temp = round(temp)
hashrate = board.get("Hash Rate H")
if hashrate:
hashboards[idx].hashrate = round(
hashrate / 1000000000000, 2
)
chip_temp = board.get("Temp max")
if chip_temp:
hashboards[idx].chip_temp = round(chip_temp)
return hashboards
async def get_wattage(
self, web_get_all: dict = None, api_stats: dict = None
) -> Optional[int]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if web_get_all:
try:
return web_get_all["power"]
except KeyError:
pass
if not api_stats:
try:
api_stats = await self.api.stats()
except APIError:
pass
if api_stats:
if api_stats.get("STATS"):
for board in api_stats["STATS"]:
try:
wattage = board["power"]
except KeyError:
pass
else:
wattage = int(wattage)
return wattage
async def get_fans(self, web_get_all: dict = None) -> List[Fan]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
fans = [Fan() for _ in range(self.fan_count)]
if web_get_all:
try:
spd = web_get_all["fansSpeed"]
except KeyError:
pass
else:
round((int(spd) * 6000) / 100)
for i in range(self.fan_count):
fans[i].speed = spd
return fans
async def get_errors(
self, web_get_error_detail: dict = None
) -> List[MinerErrorData]:
errors = []
if not web_get_error_detail:
try:
web_get_error_detail = await self.web.get_error_detail()
except APIError:
pass
if web_get_error_detail:
try:
# only 1 error?
# TODO: check if this should be a loop, can't remember.
err = web_get_error_detail["code"]
except KeyError:
pass
else:
err = int(err)
if not err == 0:
errors.append(InnosiliconError(error_code=err))
return errors
async def get_wattage_limit(self, web_get_all: dict = None) -> Optional[int]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if web_get_all:
try:
level = web_get_all["running_mode"]["level"]
except KeyError:
pass
else:
# this is very possibly not correct.
level = int(level)
limit = 1250 + (250 * level)
return limit
async def get_expected_hashrate(self) -> Optional[float]:
pass

View File

@@ -13,320 +13,10 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import logging
from typing import List, Optional
from pyasic.config import MinerConfig
from pyasic.data import Fan, HashBoard
from pyasic.data.error_codes import InnosiliconError, MinerErrorData
from pyasic.errors import APIError
from pyasic.miners.backends import CGMiner
from pyasic.miners.backends.innosilicon import Innosilicon
from pyasic.miners.types import A10X
from pyasic.web.inno import InnosiliconWebAPI
class CGMinerA10X(CGMiner, A10X):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver=api_ver)
self.ip = ip
self.web = InnosiliconWebAPI(ip)
async def fault_light_on(self) -> bool:
return False
async def fault_light_off(self) -> bool:
return False
async def get_config(self, web_pools: dict = None) -> MinerConfig:
if not web_pools:
try:
web_pools = await self.web.pools()
except APIError as e:
logging.warning(e)
if web_pools:
if "pools" in web_pools.keys():
cfg = MinerConfig().from_raw(web_pools)
self.config = cfg
return self.config
async def reboot(self) -> bool:
try:
data = await self.web.reboot()
except APIError:
class InnosiliconA10X(Innosilicon, A10X):
pass
else:
return data["success"]
async def restart_cgminer(self) -> bool:
try:
data = await self.web.restart_cgminer()
except APIError:
pass
else:
return data["success"]
async def restart_backend(self) -> bool:
return await self.restart_cgminer()
async def stop_mining(self) -> bool:
return False
# data = await self.web.poweroff()
# try:
# return data["success"]
# except KeyError:
# return False
async def resume_mining(self) -> bool:
return False
# data = await self.web.restart_cgminer()
# print(data)
# try:
# return data["success"]
# except KeyError:
# return False
async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
pass
# doesnt work for some reason
# self.config = config
# await self.web.update_pools(config.as_inno(user_suffix=user_suffix))
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def get_mac(
self, web_get_all: dict = None, web_overview: dict = None
) -> Optional[str]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all and not web_overview:
try:
web_overview = await self.web.overview()
except APIError:
pass
if web_get_all:
try:
mac = web_get_all["mac"]
return mac.upper()
except KeyError:
pass
if web_overview:
try:
mac = web_overview["version"]["ethaddr"]
return mac.upper()
except KeyError:
pass
async def get_hashrate(
self, api_summary: dict = None, web_get_all: dict = None
) -> Optional[float]:
if web_get_all:
web_get_all = web_get_all["all"]
if not api_summary and not web_get_all:
try:
api_summary = await self.api.summary()
except APIError:
pass
if web_get_all:
try:
return round(float(web_get_all["total_hash"]["Hash Rate"] / 1000000), 5)
except KeyError:
pass
if api_summary:
try:
return round(
float(api_summary["SUMMARY"][0]["MHS 1m"] / 1000000000000), 5
)
except LookupError:
pass
async def get_hashboards(
self, api_stats: dict = None, web_get_all: dict = None
) -> List[HashBoard]:
hashboards = [
HashBoard(slot=i, expected_chips=self.expected_chips)
for i in range(self.expected_hashboards)
]
if web_get_all:
web_get_all = web_get_all["all"]
if not api_stats:
try:
api_stats = await self.api.stats()
except APIError:
pass
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if api_stats:
if api_stats.get("STATS"):
for board in api_stats["STATS"]:
try:
idx = board["Chain ID"]
chips = board["Num active chips"]
except KeyError:
pass
else:
hashboards[idx].chips = chips
hashboards[idx].missing = False
if web_get_all:
if web_get_all.get("chain"):
for board in web_get_all["chain"]:
idx = board.get("ASC")
if idx is not None:
temp = board.get("Temp min")
if temp:
hashboards[idx].temp = round(temp)
hashrate = board.get("Hash Rate H")
if hashrate:
hashboards[idx].hashrate = round(
hashrate / 1000000000000, 5
)
chip_temp = board.get("Temp max")
if chip_temp:
hashboards[idx].chip_temp = round(chip_temp)
return hashboards
async def get_wattage(
self, web_get_all: dict = None, api_stats: dict = None
) -> Optional[int]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if web_get_all:
try:
return web_get_all["power"]
except KeyError:
pass
if not api_stats:
try:
api_stats = await self.api.stats()
except APIError:
pass
if api_stats:
if api_stats.get("STATS"):
for board in api_stats["STATS"]:
try:
wattage = board["power"]
except KeyError:
pass
else:
wattage = int(wattage)
return wattage
async def get_fans(self, web_get_all: dict = None) -> List[Fan]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
fans = [Fan() for _ in range(self.expected_fans)]
if web_get_all:
try:
spd = web_get_all["fansSpeed"]
except KeyError:
pass
else:
round((int(spd) * 6000) / 100)
for i in range(self.expected_fans):
fans[i].speed = spd
return fans
async def get_errors(
self, web_get_error_detail: dict = None
) -> List[MinerErrorData]: # noqa: named this way for automatic functionality
errors = []
if not web_get_error_detail:
try:
web_get_error_detail = await self.web.get_error_detail()
except APIError:
pass
if web_get_error_detail:
try:
# only 1 error?
# TODO: check if this should be a loop, can't remember.
err = web_get_error_detail["code"]
except KeyError:
pass
else:
err = int(err)
if not err == 0:
errors.append(InnosiliconError(error_code=err))
return errors
async def get_fw_ver(self, api_version: dict = None) -> Optional[str]:
if self.fw_ver:
return self.fw_ver
if not api_version:
try:
api_version = await self.api.version()
except APIError:
pass
if api_version:
try:
self.fw_ver = api_version["VERSION"][0]["CGMiner"].split("-")[-1:][0]
except LookupError:
pass
return self.fw_ver
async def get_wattage_limit(self, web_get_all: dict = None) -> Optional[int]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if web_get_all:
try:
level = web_get_all["running_mode"]["level"]
except KeyError:
pass
else:
# this is very possibly not correct.
level = int(level)
limit = 1250 + (250 * level)
return limit

View File

@@ -14,4 +14,4 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from .A10X import CGMinerA10X
from .A10X import InnosiliconA10X

View File

@@ -13,280 +13,10 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from typing import List, Optional
from pyasic.config import MinerConfig
from pyasic.data import Fan, HashBoard
from pyasic.data.error_codes import InnosiliconError, MinerErrorData
from pyasic.errors import APIError
from pyasic.miners.backends import CGMiner
from pyasic.miners.backends.innosilicon import Innosilicon
from pyasic.miners.types import T3HPlus
from pyasic.web.inno import InnosiliconWebAPI
class CGMinerT3HPlus(CGMiner, T3HPlus):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver=api_ver)
self.ip = ip
self.web = InnosiliconWebAPI(ip)
async def fault_light_on(self) -> bool:
return False
async def fault_light_off(self) -> bool:
return False
async def get_config(self, api_pools: dict = None) -> MinerConfig:
# get pool data
try:
pools = await self.api.pools()
except APIError:
return self.config
self.config = MinerConfig.from_api(pools)
return self.config
async def reboot(self) -> bool:
try:
data = await self.web.reboot()
except APIError:
class InnosiliconT3HPlus(Innosilicon, T3HPlus):
pass
else:
return data["success"]
async def restart_cgminer(self) -> bool:
try:
data = await self.web.restart_cgminer()
except APIError:
pass
else:
return data["success"]
async def restart_backend(self) -> bool:
return await self.restart_cgminer()
async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
self.config = config
await self.web.update_pools(config.as_inno(user_suffix=user_suffix))
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def get_mac(
self, web_get_all: dict = None, web_overview: dict = None
) -> Optional[str]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all and not web_overview:
try:
web_overview = await self.web.overview()
except APIError:
pass
if web_get_all:
try:
mac = web_get_all["mac"]
return mac.upper()
except KeyError:
pass
if web_overview:
try:
mac = web_overview["version"]["ethaddr"]
return mac.upper()
except KeyError:
pass
async def get_hashrate(
self, api_summary: dict = None, web_get_all: dict = None
) -> Optional[float]:
if web_get_all:
web_get_all = web_get_all["all"]
if not api_summary and not web_get_all:
try:
api_summary = await self.api.summary()
except APIError:
pass
if web_get_all:
try:
return round(
float(web_get_all["total_hash"]["Hash Rate H"] / 1000000000000), 2
)
except KeyError:
pass
if api_summary:
try:
return round(float(api_summary["SUMMARY"][0]["MHS 1m"] / 1000000), 2)
except LookupError:
pass
async def get_hashboards(
self, api_stats: dict = None, web_get_all: dict = None
) -> List[HashBoard]:
if web_get_all:
web_get_all = web_get_all["all"]
hashboards = [
HashBoard(slot=i, expected_chips=self.expected_chips)
for i in range(self.expected_hashboards)
]
if not api_stats:
try:
api_stats = await self.api.stats()
except APIError:
pass
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if api_stats:
if api_stats.get("STATS"):
for board in api_stats["STATS"]:
try:
idx = board["Chain ID"]
chips = board["Num active chips"]
except KeyError:
pass
else:
hashboards[idx].chips = chips
hashboards[idx].missing = False
if web_get_all:
if web_get_all.get("chain"):
for board in web_get_all["chain"]:
idx = board.get("ASC")
if idx is not None:
temp = board.get("Temp min")
if temp:
hashboards[idx].temp = round(temp)
hashrate = board.get("Hash Rate H")
if hashrate:
hashboards[idx].hashrate = round(
hashrate / 1000000000000, 2
)
chip_temp = board.get("Temp max")
if chip_temp:
hashboards[idx].chip_temp = round(chip_temp)
return hashboards
async def get_wattage(
self, web_get_all: dict = None, api_stats: dict = None
) -> Optional[int]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if web_get_all:
try:
return web_get_all["power"]
except KeyError:
pass
if not api_stats:
try:
api_stats = await self.api.stats()
except APIError:
pass
if api_stats:
if api_stats.get("STATS"):
for board in api_stats["STATS"]:
try:
wattage = board["power"]
except KeyError:
pass
else:
wattage = int(wattage)
return wattage
async def get_fans(self, web_get_all: dict = None) -> List[Fan]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
fans = [Fan() for _ in range(self.fan_count)]
if web_get_all:
try:
spd = web_get_all["fansSpeed"]
except KeyError:
pass
else:
round((int(spd) * 6000) / 100)
for i in range(self.fan_count):
fans[i].speed = spd
return fans
async def get_errors(
self, web_get_error_detail: dict = None
) -> List[MinerErrorData]: # noqa: named this way for automatic functionality
errors = []
if not web_get_error_detail:
try:
web_get_error_detail = await self.web.get_error_detail()
except APIError:
pass
if web_get_error_detail:
try:
# only 1 error?
# TODO: check if this should be a loop, can't remember.
err = web_get_error_detail["code"]
except KeyError:
pass
else:
err = int(err)
if not err == 0:
errors.append(InnosiliconError(error_code=err))
return errors
async def get_wattage_limit(self, web_get_all: dict = None) -> Optional[int]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if web_get_all:
try:
level = web_get_all["running_mode"]["level"]
except KeyError:
pass
else:
# this is very possibly not correct.
level = int(level)
limit = 1250 + (250 * level)
return limit

View File

@@ -14,4 +14,4 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from .T3H import CGMinerT3HPlus
from .T3H import InnosiliconT3HPlus

View File

@@ -326,8 +326,8 @@ MINER_CLASSES = {
},
MinerTypes.INNOSILICON: {
None: CGMiner,
"T3H+": CGMinerT3HPlus,
"A10X": CGMinerA10X,
"T3H+": InnosiliconT3HPlus,
"A10X": InnosiliconA10X,
},
MinerTypes.GOLDSHELL: {
None: BFGMiner,

View File

@@ -72,6 +72,31 @@ class MinersTest(unittest.TestCase):
)
self.assertEqual(miner_keys, keys)
def test_data_locations_match_signatures_command(self):
warnings.filterwarnings("ignore")
for miner_model in MINER_CLASSES.keys():
for miner_api in MINER_CLASSES[miner_model].keys():
miner = MINER_CLASSES[miner_model][miner_api]("127.0.0.1")
for data_point in asdict(miner.data_locations).values():
with self.subTest(
msg=f"Test {data_point['cmd']} signature matches with model={miner_model}, api={miner_api}",
miner_model=miner_model,
miner_api=miner_api,
):
func = getattr(miner, data_point["cmd"])
signature = inspect.signature(func)
parameters = signature.parameters
param_names = list(parameters.keys())
for arg in ["kwargs", "args"]:
try:
param_names.remove(arg)
except ValueError:
pass
self.assertEqual(
set(param_names),
set([k["name"] for k in data_point["kwargs"]]),
)
if __name__ == "__main__":
unittest.main()