bug: add test to cross check function arguments, and fix some method implementations and naming.

This commit is contained in:
UpstreamData
2024-01-15 10:16:47 -07:00
parent 3c3c34c54b
commit 8fb357544b
14 changed files with 655 additions and 713 deletions

View File

@@ -2,9 +2,8 @@
## A10X Models ## A10X Models
## A10X ## A10X
::: pyasic.miners.innosilicon.cgminer.A10X.A10X.CGMinerA10X ::: pyasic.miners.innosilicon.cgminer.A10X.A10X.InnosiliconA10X
handler: python handler: python
options: options:
show_root_heading: false show_root_heading: false
heading_level: 4 heading_level: 4

View File

@@ -2,9 +2,8 @@
## T3X Models ## T3X Models
## T3H+ ## T3H+
::: pyasic.miners.innosilicon.cgminer.T3X.T3H.CGMinerT3HPlus ::: pyasic.miners.innosilicon.cgminer.T3X.T3H.InnosiliconT3HPlus
handler: python handler: python
options: options:
show_root_heading: false show_root_heading: false
heading_level: 4 heading_level: 4

View File

@@ -25,10 +25,6 @@ from pyasic.miners.types import T9
class HiveonT9(Hiveon, T9): class HiveonT9(Hiveon, T9):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver=api_ver)
self.ip = ip
self.pwd = "admin"
################################################## ##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ### ### DATA GATHERING FUNCTIONS (get_{some_data}) ###
@@ -46,34 +42,44 @@ class HiveonT9(Hiveon, T9):
pass pass
async def get_hashboards(self, api_stats: dict = None) -> List[HashBoard]: async def get_hashboards(self, api_stats: dict = None) -> List[HashBoard]:
hashboards = [
HashBoard(slot=board, expected_chips=self.expected_chips)
for board in range(self.expected_hashboards)
]
if api_stats is None:
try:
api_stats = self.api.stats()
except APIError:
return []
board_map = { board_map = {
0: [2, 9, 10], 0: [2, 9, 10],
1: [3, 11, 12], 1: [3, 11, 12],
2: [4, 13, 14], 2: [4, 13, 14],
} }
hashboards = []
for board in board_map: for board in board_map:
hashboard = HashBoard(slot=board, expected_chips=self.expected_chips)
hashrate = 0 hashrate = 0
chips = 0 chips = 0
for chipset in board_map[board]: for chipset in board_map[board]:
if hashboard.chip_temp == None: if hashboards[board].chip_temp is None:
try: try:
hashboard.board_temp = api_stats["STATS"][1][f"temp{chipset}"] hashboards[board].temp = api_stats["STATS"][1][f"temp{chipset}"]
hashboard.chip_temp = api_stats["STATS"][1][f"temp2_{chipset}"] hashboards[board].chip_temp = api_stats["STATS"][1][
except LookupError: f"temp2_{chipset}"
]
except (KeyError, IndexError):
pass pass
else: else:
hashboard.missing = False hashboards[board].missing = False
try: try:
hashrate += api_stats["STATS"][1][f"chain_rate{chipset}"] hashrate += api_stats["STATS"][1][f"chain_rate{chipset}"]
chips += api_stats["STATS"][1][f"chain_acn{chipset}"] chips += api_stats["STATS"][1][f"chain_acn{chipset}"]
except LookupError: except (KeyError, IndexError):
pass pass
hashboard.hashrate = round(hashrate / 1000, 2) hashboards[board].hashrate = round(hashrate / 1000, 2)
hashboard.chips = chips hashboards[board].chips = chips
hashboards.append(hashboard)
return hashboards return hashboards
@@ -88,7 +94,7 @@ class HiveonT9(Hiveon, T9):
boards = api_stats.get("STATS") boards = api_stats.get("STATS")
try: try:
wattage_raw = boards[1]["chain_power"] wattage_raw = boards[1]["chain_power"]
except LookupError: except (KeyError, IndexError):
pass pass
else: else:
# parse wattage position out of raw data # parse wattage position out of raw data
@@ -113,7 +119,7 @@ class HiveonT9(Hiveon, T9):
env_temp = api_stats["STATS"][1][f"temp3_{chipset}"] env_temp = api_stats["STATS"][1][f"temp3_{chipset}"]
if not env_temp == 0: if not env_temp == 0:
env_temp_list.append(int(env_temp)) env_temp_list.append(int(env_temp))
except LookupError: except (KeyError, IndexError):
pass pass
if not env_temp_list == []: if not env_temp_list == []:

View File

@@ -15,7 +15,8 @@
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
import logging import logging
from typing import List, Optional from collections import namedtuple
from typing import List, Optional, Tuple
from pyasic.API.btminer import BTMinerAPI from pyasic.API.btminer import BTMinerAPI
from pyasic.config import MinerConfig, MiningModeConfig from pyasic.config import MinerConfig, MiningModeConfig
@@ -28,74 +29,80 @@ from pyasic.miners.base import (
DataLocations, DataLocations,
DataOptions, DataOptions,
RPCAPICommand, RPCAPICommand,
WebAPICommand,
) )
BTMINER_DATA_LOC = DataLocations( BTMINER_DATA_LOC = DataLocations(
**{ **{
str(DataOptions.MAC): DataFunction( str(DataOptions.MAC): DataFunction(
"_get_mac", "get_mac",
[ [
RPCAPICommand("api_summary", "summary"), RPCAPICommand("api_summary", "summary"),
RPCAPICommand("api_get_miner_info", "get_miner_info"), RPCAPICommand("api_get_miner_info", "get_miner_info"),
], ],
), ),
str(DataOptions.MODEL): DataFunction("get_model"),
str(DataOptions.API_VERSION): DataFunction( str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver", [RPCAPICommand("api_get_version", "get_version")] "get_api_ver", [RPCAPICommand("api_get_version", "get_version")]
), ),
str(DataOptions.FW_VERSION): DataFunction( str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver", "get_fw_ver",
[ [
RPCAPICommand("api_get_version", "get_version"), RPCAPICommand("api_get_version", "get_version"),
RPCAPICommand("api_summary", "summary"), RPCAPICommand("api_summary", "summary"),
], ],
), ),
str(DataOptions.HOSTNAME): DataFunction( str(DataOptions.HOSTNAME): DataFunction(
"_get_hostname", [RPCAPICommand("api_get_miner_info", "get_miner_info")] "get_hostname", [RPCAPICommand("api_get_miner_info", "get_miner_info")]
), ),
str(DataOptions.HASHRATE): DataFunction( str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate", [RPCAPICommand("api_summary", "summary")] "get_hashrate", [RPCAPICommand("api_summary", "summary")]
), ),
str(DataOptions.EXPECTED_HASHRATE): DataFunction( str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate", [RPCAPICommand("api_summary", "summary")] "get_expected_hashrate", [RPCAPICommand("api_summary", "summary")]
), ),
str(DataOptions.HASHBOARDS): DataFunction( str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards", [RPCAPICommand("api_devs", "devs")] "get_hashboards", [RPCAPICommand("api_devs", "devs")]
), ),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction( str(DataOptions.ENVIRONMENT_TEMP): DataFunction(
"_get_env_temp", [RPCAPICommand("api_summary", "summary")] "get_env_temp", [RPCAPICommand("api_summary", "summary")]
), ),
str(DataOptions.WATTAGE): DataFunction( str(DataOptions.WATTAGE): DataFunction(
"_get_wattage", [RPCAPICommand("api_summary", "summary")] "get_wattage", [RPCAPICommand("api_summary", "summary")]
), ),
str(DataOptions.WATTAGE_LIMIT): DataFunction( str(DataOptions.WATTAGE_LIMIT): DataFunction(
"_get_wattage_limit", [RPCAPICommand("api_summary", "summary")] "get_wattage_limit", [RPCAPICommand("api_summary", "summary")]
), ),
str(DataOptions.FANS): DataFunction( str(DataOptions.FANS): DataFunction(
"_get_fans", "get_fans",
[ [
RPCAPICommand("api_summary", "summary"), RPCAPICommand("api_summary", "summary"),
RPCAPICommand("api_get_psu", "get_psu"), RPCAPICommand("api_get_psu", "get_psu"),
], ],
), ),
str(DataOptions.FAN_PSU): DataFunction( str(DataOptions.FAN_PSU): DataFunction(
"_get_fan_psu", "get_fan_psu",
[ [
RPCAPICommand("api_summary", "summary"), RPCAPICommand("api_summary", "summary"),
RPCAPICommand("api_get_psu", "get_psu"), RPCAPICommand("api_get_psu", "get_psu"),
], ],
), ),
str(DataOptions.ERRORS): DataFunction( str(DataOptions.ERRORS): DataFunction(
"_get_errors", [RPCAPICommand("api_get_error_code", "get_error_code")] "get_errors",
[
RPCAPICommand("api_get_error_code", "get_error_code"),
RPCAPICommand("api_summary", "summary"),
],
), ),
str(DataOptions.FAULT_LIGHT): DataFunction( str(DataOptions.FAULT_LIGHT): DataFunction(
"_get_fault_light", "get_fault_light",
[RPCAPICommand("api_get_miner_info", "get_miner_info")], [RPCAPICommand("api_get_miner_info", "get_miner_info")],
), ),
str(DataOptions.IS_MINING): DataFunction( str(DataOptions.IS_MINING): DataFunction(
"_is_mining", [RPCAPICommand("api_status", "status")] "is_mining", [RPCAPICommand("api_status", "status")]
), ),
str(DataOptions.UPTIME): DataFunction( str(DataOptions.UPTIME): DataFunction(
"_get_uptime", [RPCAPICommand("api_summary", "summary")] "get_uptime", [RPCAPICommand("api_summary", "summary")]
), ),
str(DataOptions.CONFIG): DataFunction("get_config"), str(DataOptions.CONFIG): DataFunction("get_config"),
} }
@@ -236,7 +243,7 @@ class BTMiner(BaseMiner):
else: else:
cfg = MinerConfig() cfg = MinerConfig()
is_mining = await self._is_mining(status) is_mining = await self.is_mining(status)
if not is_mining: if not is_mining:
cfg.mining_mode = MiningModeConfig.sleep() cfg.mining_mode = MiningModeConfig.sleep()
return cfg return cfg
@@ -280,7 +287,7 @@ class BTMiner(BaseMiner):
### DATA GATHERING FUNCTIONS (get_{some_data}) ### ### DATA GATHERING FUNCTIONS (get_{some_data}) ###
################################################## ##################################################
async def _get_mac( async def get_mac(
self, api_summary: dict = None, api_get_miner_info: dict = None self, api_summary: dict = None, api_get_miner_info: dict = None
) -> Optional[str]: ) -> Optional[str]:
if not api_get_miner_info: if not api_get_miner_info:
@@ -306,10 +313,24 @@ class BTMiner(BaseMiner):
try: try:
mac = api_summary["SUMMARY"][0]["MAC"] mac = api_summary["SUMMARY"][0]["MAC"]
return str(mac).upper() return str(mac).upper()
except LookupError: except (KeyError, IndexError):
pass pass
async def _get_api_ver(self, api_get_version: dict = None) -> Optional[str]: async def get_version(
self, api_get_version: dict = None, api_summary: dict = None
) -> Tuple[Optional[str], Optional[str]]:
miner_version = namedtuple("MinerVersion", "api_ver fw_ver")
api_ver = await self.get_api_ver(api_get_version=api_get_version)
fw_ver = await self.get_fw_ver(
api_get_version=api_get_version, api_summary=api_summary
)
return miner_version(api_ver, fw_ver)
async def get_api_ver(self, api_get_version: dict = None) -> Optional[str]:
# Check to see if the version info is already cached
if self.api_ver:
return self.api_ver
if not api_get_version: if not api_get_version:
try: try:
api_get_version = await self.api.get_version() api_get_version = await self.api.get_version()
@@ -332,9 +353,13 @@ class BTMiner(BaseMiner):
return self.api_ver return self.api_ver
async def _get_fw_ver( async def get_fw_ver(
self, api_get_version: dict = None, api_summary: dict = None self, api_get_version: dict = None, api_summary: dict = None
) -> Optional[str]: ) -> Optional[str]:
# Check to see if the version info is already cached
if self.fw_ver:
return self.fw_ver
if not api_get_version: if not api_get_version:
try: try:
api_get_version = await self.api.get_version() api_get_version = await self.api.get_version()
@@ -362,12 +387,12 @@ class BTMiner(BaseMiner):
self.fw_ver = api_summary["SUMMARY"][0]["Firmware Version"].replace( self.fw_ver = api_summary["SUMMARY"][0]["Firmware Version"].replace(
"'", "" "'", ""
) )
except LookupError: except (KeyError, IndexError):
pass pass
return self.fw_ver return self.fw_ver
async def _get_hostname(self, api_get_miner_info: dict = None) -> Optional[str]: async def get_hostname(self, api_get_miner_info: dict = None) -> Optional[str]:
hostname = None hostname = None
if not api_get_miner_info: if not api_get_miner_info:
try: try:
@@ -383,7 +408,7 @@ class BTMiner(BaseMiner):
return hostname return hostname
async def _get_hashrate(self, api_summary: dict = None) -> Optional[float]: async def get_hashrate(self, api_summary: dict = None) -> Optional[float]:
# get hr from API # get hr from API
if not api_summary: if not api_summary:
try: try:
@@ -394,10 +419,10 @@ class BTMiner(BaseMiner):
if api_summary: if api_summary:
try: try:
return round(float(api_summary["SUMMARY"][0]["MHS 1m"] / 1000000), 2) return round(float(api_summary["SUMMARY"][0]["MHS 1m"] / 1000000), 2)
except LookupError: except (KeyError, IndexError):
pass pass
async def _get_hashboards(self, api_devs: dict = None) -> List[HashBoard]: async def get_hashboards(self, api_devs: dict = None) -> List[HashBoard]:
hashboards = [ hashboards = [
HashBoard(slot=i, expected_chips=self.expected_chips) HashBoard(slot=i, expected_chips=self.expected_chips)
for i in range(self.expected_hashboards) for i in range(self.expected_hashboards)
@@ -427,12 +452,12 @@ class BTMiner(BaseMiner):
hashboards[board["ASC"]].chips = board["Effective Chips"] hashboards[board["ASC"]].chips = board["Effective Chips"]
hashboards[board["ASC"]].serial_number = board["PCB SN"] hashboards[board["ASC"]].serial_number = board["PCB SN"]
hashboards[board["ASC"]].missing = False hashboards[board["ASC"]].missing = False
except LookupError: except (KeyError, IndexError):
pass pass
return hashboards return hashboards
async def _get_env_temp(self, api_summary: dict = None) -> Optional[float]: async def get_env_temp(self, api_summary: dict = None) -> Optional[float]:
if not api_summary: if not api_summary:
try: try:
api_summary = await self.api.summary() api_summary = await self.api.summary()
@@ -442,10 +467,10 @@ class BTMiner(BaseMiner):
if api_summary: if api_summary:
try: try:
return api_summary["SUMMARY"][0]["Env Temp"] return api_summary["SUMMARY"][0]["Env Temp"]
except LookupError: except (KeyError, IndexError):
pass pass
async def _get_wattage(self, api_summary: dict = None) -> Optional[int]: async def get_wattage(self, api_summary: dict = None) -> Optional[int]:
if not api_summary: if not api_summary:
try: try:
api_summary = await self.api.summary() api_summary = await self.api.summary()
@@ -456,10 +481,10 @@ class BTMiner(BaseMiner):
try: try:
wattage = api_summary["SUMMARY"][0]["Power"] wattage = api_summary["SUMMARY"][0]["Power"]
return wattage if not wattage == -1 else None return wattage if not wattage == -1 else None
except LookupError: except (KeyError, IndexError):
pass pass
async def _get_wattage_limit(self, api_summary: dict = None) -> Optional[int]: async def get_wattage_limit(self, api_summary: dict = None) -> Optional[int]:
if not api_summary: if not api_summary:
try: try:
api_summary = await self.api.summary() api_summary = await self.api.summary()
@@ -469,10 +494,10 @@ class BTMiner(BaseMiner):
if api_summary: if api_summary:
try: try:
return api_summary["SUMMARY"][0]["Power Limit"] return api_summary["SUMMARY"][0]["Power Limit"]
except LookupError: except (KeyError, IndexError):
pass pass
async def _get_fans( async def get_fans(
self, api_summary: dict = None, api_get_psu: dict = None self, api_summary: dict = None, api_get_psu: dict = None
) -> List[Fan]: ) -> List[Fan]:
if not api_summary: if not api_summary:
@@ -481,20 +506,20 @@ class BTMiner(BaseMiner):
except APIError: except APIError:
pass pass
fans = [Fan() for _ in range(self.expected_fans)] fans = [Fan() for _ in range(self.fan_count)]
if api_summary: if api_summary:
try: try:
if self.expected_fans > 0: if self.fan_count > 0:
fans = [ fans = [
Fan(api_summary["SUMMARY"][0].get("Fan Speed In", 0)), Fan(api_summary["SUMMARY"][0].get("Fan Speed In", 0)),
Fan(api_summary["SUMMARY"][0].get("Fan Speed Out", 0)), Fan(api_summary["SUMMARY"][0].get("Fan Speed Out", 0)),
] ]
except LookupError: except (KeyError, IndexError):
pass pass
return fans return fans
async def _get_fan_psu( async def get_fan_psu(
self, api_summary: dict = None, api_get_psu: dict = None self, api_summary: dict = None, api_get_psu: dict = None
) -> Optional[int]: ) -> Optional[int]:
if not api_summary: if not api_summary:
@@ -506,7 +531,7 @@ class BTMiner(BaseMiner):
if api_summary: if api_summary:
try: try:
return int(api_summary["SUMMARY"][0]["Power Fanspeed"]) return int(api_summary["SUMMARY"][0]["Power Fanspeed"])
except LookupError: except (KeyError, IndexError):
pass pass
if not api_get_psu: if not api_get_psu:
@@ -521,26 +546,11 @@ class BTMiner(BaseMiner):
except (KeyError, TypeError): except (KeyError, TypeError):
pass pass
async def _get_errors( async def get_errors(
self, api_summary: dict = None, api_get_error_code: dict = None self, api_summary: dict = None, api_get_error_code: dict = None
) -> List[MinerErrorData]: ) -> List[MinerErrorData]:
errors = [] errors = []
if not api_summary and not api_get_error_code: if not api_get_error_code and not api_summary:
try:
api_summary = await self.api.summary()
except APIError:
pass
if api_summary:
try:
for i in range(api_summary["SUMMARY"][0]["Error Code Count"]):
err = api_summary["SUMMARY"][0].get(f"Error Code {i}")
if err:
errors.append(WhatsminerError(error_code=err))
except (LookupError, ValueError, TypeError):
pass
if not api_get_error_code:
try: try:
api_get_error_code = await self.api.get_error_code() api_get_error_code = await self.api.get_error_code()
except APIError: except APIError:
@@ -554,9 +564,24 @@ class BTMiner(BaseMiner):
else: else:
errors.append(WhatsminerError(error_code=int(err))) errors.append(WhatsminerError(error_code=int(err)))
if not api_summary:
try:
api_summary = await self.api.summary()
except APIError:
pass
if api_summary:
try:
for i in range(api_summary["SUMMARY"][0]["Error Code Count"]):
err = api_summary["SUMMARY"][0].get(f"Error Code {i}")
if err:
errors.append(WhatsminerError(error_code=err))
except (KeyError, IndexError, ValueError, TypeError):
pass
return errors return errors
async def _get_expected_hashrate(self, api_summary: dict = None): async def get_expected_hashrate(self, api_summary: dict = None):
if not api_summary: if not api_summary:
try: try:
api_summary = await self.api.summary() api_summary = await self.api.summary()
@@ -568,10 +593,10 @@ class BTMiner(BaseMiner):
expected_hashrate = api_summary["SUMMARY"][0]["Factory GHS"] expected_hashrate = api_summary["SUMMARY"][0]["Factory GHS"]
if expected_hashrate: if expected_hashrate:
return round(expected_hashrate / 1000, 2) return round(expected_hashrate / 1000, 2)
except LookupError: except (KeyError, IndexError):
pass pass
async def _get_fault_light(self, api_get_miner_info: dict = None) -> bool: async def get_fault_light(self, api_get_miner_info: dict = None) -> bool:
if not api_get_miner_info: if not api_get_miner_info:
try: try:
api_get_miner_info = await self.api.get_miner_info() api_get_miner_info = await self.api.get_miner_info()
@@ -609,7 +634,7 @@ class BTMiner(BaseMiner):
async def set_hostname(self, hostname: str): async def set_hostname(self, hostname: str):
await self.api.set_hostname(hostname) await self.api.set_hostname(hostname)
async def _is_mining(self, api_status: dict = None) -> Optional[bool]: async def is_mining(self, api_status: dict = None) -> Optional[bool]:
if not api_status: if not api_status:
try: try:
api_status = await self.api.status() api_status = await self.api.status()
@@ -628,7 +653,7 @@ class BTMiner(BaseMiner):
except LookupError: except LookupError:
pass pass
async def _get_uptime(self, api_summary: dict = None) -> Optional[int]: async def get_uptime(self, api_summary: dict = None) -> Optional[int]:
if not api_summary: if not api_summary:
try: try:
api_summary = await self.api.summary() api_summary = await self.api.summary()

View File

@@ -14,6 +14,7 @@
# limitations under the License. - # limitations under the License. -
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
import logging
import re import re
from typing import List, Optional from typing import List, Optional
@@ -27,43 +28,42 @@ from pyasic.miners.base import DataFunction, DataLocations, DataOptions, RPCAPIC
AVALON_DATA_LOC = DataLocations( AVALON_DATA_LOC = DataLocations(
**{ **{
str(DataOptions.MAC): DataFunction( str(DataOptions.MAC): DataFunction(
"_get_mac", [RPCAPICommand("api_version", "version")] "get_mac", [RPCAPICommand("api_version", "version")]
), ),
str(DataOptions.MODEL): DataFunction("get_model"),
str(DataOptions.API_VERSION): DataFunction( str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver", [RPCAPICommand("api_version", "version")] "get_api_ver", [RPCAPICommand("api_version", "version")]
), ),
str(DataOptions.FW_VERSION): DataFunction( str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver", [RPCAPICommand("api_version", "version")] "get_fw_ver", [RPCAPICommand("api_version", "version")]
),
str(DataOptions.HOSTNAME): DataFunction(
"_get_hostname", [RPCAPICommand("api_version", "version")]
), ),
str(DataOptions.HOSTNAME): DataFunction("get_hostname"),
str(DataOptions.HASHRATE): DataFunction( str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate", [RPCAPICommand("api_devs", "devs")] "get_hashrate", [RPCAPICommand("api_devs", "devs")]
), ),
str(DataOptions.EXPECTED_HASHRATE): DataFunction( str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate", [RPCAPICommand("api_stats", "stats")] "get_expected_hashrate", [RPCAPICommand("api_stats", "stats")]
), ),
str(DataOptions.HASHBOARDS): DataFunction( str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards", [RPCAPICommand("api_stats", "stats")] "get_hashboards", [RPCAPICommand("api_stats", "stats")]
), ),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction( str(DataOptions.ENVIRONMENT_TEMP): DataFunction(
"_get_env_temp", [RPCAPICommand("api_stats", "stats")] "get_env_temp", [RPCAPICommand("api_stats", "stats")]
), ),
str(DataOptions.WATTAGE): DataFunction("_get_wattage"), str(DataOptions.WATTAGE): DataFunction("get_wattage"),
str(DataOptions.WATTAGE_LIMIT): DataFunction( str(DataOptions.WATTAGE_LIMIT): DataFunction(
"_get_wattage_limit", [RPCAPICommand("api_stats", "stats")] "get_wattage_limit", [RPCAPICommand("api_stats", "stats")]
), ),
str(DataOptions.FANS): DataFunction( str(DataOptions.FANS): DataFunction(
"_get_fans", [RPCAPICommand("api_stats", "stats")] "get_fans", [RPCAPICommand("api_stats", "stats")]
), ),
str(DataOptions.FAN_PSU): DataFunction("_get_fan_psu"), str(DataOptions.FAN_PSU): DataFunction("get_fan_psu"),
str(DataOptions.ERRORS): DataFunction("_get_errors"), str(DataOptions.ERRORS): DataFunction("get_errors"),
str(DataOptions.FAULT_LIGHT): DataFunction( str(DataOptions.FAULT_LIGHT): DataFunction(
"_get_fault_light", [RPCAPICommand("api_stats", "stats")] "get_fault_light", [RPCAPICommand("api_stats", "stats")]
), ),
str(DataOptions.IS_MINING): DataFunction("_is_mining"), str(DataOptions.IS_MINING): DataFunction("is_mining"),
str(DataOptions.UPTIME): DataFunction("_get_uptime"), str(DataOptions.UPTIME): DataFunction("get_uptime"),
str(DataOptions.CONFIG): DataFunction("get_config"), str(DataOptions.CONFIG): DataFunction("get_config"),
} }
) )
@@ -174,7 +174,7 @@ class CGMinerAvalon(CGMiner):
### DATA GATHERING FUNCTIONS (get_{some_data}) ### ### DATA GATHERING FUNCTIONS (get_{some_data}) ###
################################################## ##################################################
async def _get_mac(self, api_version: dict = None) -> Optional[str]: async def get_mac(self, api_version: dict = None) -> Optional[str]:
if not api_version: if not api_version:
try: try:
api_version = await self.api.version() api_version = await self.api.version()
@@ -192,7 +192,7 @@ class CGMinerAvalon(CGMiner):
except (KeyError, ValueError): except (KeyError, ValueError):
pass pass
async def _get_hostname(self, mac: str = None) -> Optional[str]: async def get_hostname(self) -> Optional[str]:
return None return None
# if not mac: # if not mac:
# mac = await self.get_mac() # mac = await self.get_mac()
@@ -200,7 +200,7 @@ class CGMinerAvalon(CGMiner):
# if mac: # if mac:
# return f"Avalon{mac.replace(':', '')[-6:]}" # return f"Avalon{mac.replace(':', '')[-6:]}"
async def _get_hashrate(self, api_devs: dict = None) -> Optional[float]: async def get_hashrate(self, api_devs: dict = None) -> Optional[float]:
if not api_devs: if not api_devs:
try: try:
api_devs = await self.api.devs() api_devs = await self.api.devs()
@@ -210,10 +210,10 @@ class CGMinerAvalon(CGMiner):
if api_devs: if api_devs:
try: try:
return round(float(api_devs["DEVS"][0]["MHS 1m"] / 1000000), 2) return round(float(api_devs["DEVS"][0]["MHS 1m"] / 1000000), 2)
except (LookupError, ValueError, TypeError): except (KeyError, IndexError, ValueError, TypeError):
pass pass
async def _get_hashboards(self, api_stats: dict = None) -> List[HashBoard]: async def get_hashboards(self, api_stats: dict = None) -> List[HashBoard]:
hashboards = [ hashboards = [
HashBoard(slot=i, expected_chips=self.expected_chips) HashBoard(slot=i, expected_chips=self.expected_chips)
for i in range(self.expected_hashboards) for i in range(self.expected_hashboards)
@@ -229,7 +229,7 @@ class CGMinerAvalon(CGMiner):
try: try:
unparsed_stats = api_stats["STATS"][0]["MM ID0"] unparsed_stats = api_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats) parsed_stats = self.parse_stats(unparsed_stats)
except (LookupError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
return hashboards return hashboards
for board in range(self.expected_hashboards): for board in range(self.expected_hashboards):
@@ -261,7 +261,7 @@ class CGMinerAvalon(CGMiner):
return hashboards return hashboards
async def _get_expected_hashrate(self, api_stats: dict = None) -> Optional[float]: async def get_expected_hashrate(self, api_stats: dict = None) -> Optional[float]:
if not api_stats: if not api_stats:
try: try:
api_stats = await self.api.stats() api_stats = await self.api.stats()
@@ -273,10 +273,10 @@ class CGMinerAvalon(CGMiner):
unparsed_stats = api_stats["STATS"][0]["MM ID0"] unparsed_stats = api_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats) parsed_stats = self.parse_stats(unparsed_stats)
return round(float(parsed_stats["GHSmm"]) / 1000, 2) return round(float(parsed_stats["GHSmm"]) / 1000, 2)
except (LookupError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
pass pass
async def _get_env_temp(self, api_stats: dict = None) -> Optional[float]: async def get_env_temp(self, api_stats: dict = None) -> Optional[float]:
if not api_stats: if not api_stats:
try: try:
api_stats = await self.api.stats() api_stats = await self.api.stats()
@@ -288,13 +288,13 @@ class CGMinerAvalon(CGMiner):
unparsed_stats = api_stats["STATS"][0]["MM ID0"] unparsed_stats = api_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats) parsed_stats = self.parse_stats(unparsed_stats)
return float(parsed_stats["Temp"]) return float(parsed_stats["Temp"])
except (LookupError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
pass pass
async def _get_wattage(self) -> Optional[int]: async def get_wattage(self) -> Optional[int]:
return None return None
async def _get_wattage_limit(self, api_stats: dict = None) -> Optional[int]: async def get_wattage_limit(self, api_stats: dict = None) -> Optional[int]:
if not api_stats: if not api_stats:
try: try:
api_stats = await self.api.stats() api_stats = await self.api.stats()
@@ -306,17 +306,17 @@ class CGMinerAvalon(CGMiner):
unparsed_stats = api_stats["STATS"][0]["MM ID0"] unparsed_stats = api_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats) parsed_stats = self.parse_stats(unparsed_stats)
return int(parsed_stats["MPO"]) return int(parsed_stats["MPO"])
except (LookupError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
pass pass
async def _get_fans(self, api_stats: dict = None) -> List[Fan]: async def get_fans(self, api_stats: dict = None) -> List[Fan]:
if not api_stats: if not api_stats:
try: try:
api_stats = await self.api.stats() api_stats = await self.api.stats()
except APIError: except APIError:
pass pass
fans_data = [Fan() for _ in range(self.expected_fans)] fans_data = [Fan() for _ in range(self.fan_count)]
if api_stats: if api_stats:
try: try:
unparsed_stats = api_stats["STATS"][0]["MM ID0"] unparsed_stats = api_stats["STATS"][0]["MM ID0"]
@@ -324,17 +324,17 @@ class CGMinerAvalon(CGMiner):
except LookupError: except LookupError:
return fans_data return fans_data
for fan in range(self.expected_fans): for fan in range(self.fan_count):
try: try:
fans_data[fan].speed = int(parsed_stats[f"Fan{fan + 1}"]) fans_data[fan].speed = int(parsed_stats[f"Fan{fan + 1}"])
except (LookupError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
pass pass
return fans_data return fans_data
async def _get_errors(self) -> List[MinerErrorData]: async def get_errors(self) -> List[MinerErrorData]:
return [] return []
async def _get_fault_light(self, api_stats: dict = None) -> bool: # noqa async def get_fault_light(self, api_stats: dict = None) -> bool: # noqa
if self.light: if self.light:
return self.light return self.light
if not api_stats: if not api_stats:
@@ -349,7 +349,7 @@ class CGMinerAvalon(CGMiner):
parsed_stats = self.parse_stats(unparsed_stats) parsed_stats = self.parse_stats(unparsed_stats)
led = int(parsed_stats["Led"]) led = int(parsed_stats["Led"])
return True if led == 1 else False return True if led == 1 else False
except (LookupError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
pass pass
try: try:
@@ -363,5 +363,8 @@ class CGMinerAvalon(CGMiner):
pass pass
return False return False
async def _is_mining(self, *args, **kwargs) -> Optional[bool]: async def is_mining(self, *args, **kwargs) -> Optional[bool]:
return None
async def get_uptime(self) -> Optional[int]:
return None return None

View File

@@ -14,12 +14,73 @@
# limitations under the License. - # limitations under the License. -
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
from typing import List, Optional
from pyasic.data import HashBoard
from pyasic.miners.backends import BMMiner from pyasic.miners.backends import BMMiner
from pyasic.miners.base import DataFunction, DataLocations, DataOptions, RPCAPICommand
HIVEON_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction("get_mac"),
str(DataOptions.MODEL): DataFunction("get_model"),
str(DataOptions.API_VERSION): DataFunction(
"get_api_ver", [RPCAPICommand("api_version", "version")]
),
str(DataOptions.FW_VERSION): DataFunction(
"get_fw_ver", [RPCAPICommand("api_version", "version")]
),
str(DataOptions.HOSTNAME): DataFunction("get_hostname"),
str(DataOptions.HASHRATE): DataFunction(
"get_hashrate", [RPCAPICommand("api_summary", "summary")]
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"get_expected_hashrate", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.HASHBOARDS): DataFunction(
"get_hashboards", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction(
"get_env_temp", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.WATTAGE): DataFunction(
"get_wattage", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.WATTAGE_LIMIT): DataFunction("get_wattage_limit"),
str(DataOptions.FANS): DataFunction(
"get_fans", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.FAN_PSU): DataFunction("get_fan_psu"),
str(DataOptions.ERRORS): DataFunction("get_errors"),
str(DataOptions.FAULT_LIGHT): DataFunction("get_fault_light"),
str(DataOptions.IS_MINING): DataFunction("is_mining"),
str(DataOptions.UPTIME): DataFunction(
"get_uptime", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.CONFIG): DataFunction("get_config"),
}
)
class Hiveon(BMMiner): class Hiveon(BMMiner):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None: def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver) super().__init__(ip, api_ver)
self.pwd = "admin"
# static data # static data
self.api_type = "Hiveon" self.api_type = "Hiveon"
self.fw_str = "Hive" # data gathering locations
self.data_locations = HIVEON_DATA_LOC
async def get_model(self) -> Optional[str]:
if self.model is not None:
return self.model + " (Hiveon)"
return "? (Hiveon)"
async def get_hashboards(self, api_stats: dict = None) -> List[HashBoard]:
pass
async def get_wattage(self, api_stats: dict = None) -> Optional[int]:
pass
async def get_env_temp(self, api_stats: dict = None) -> Optional[float]:
pass

View File

@@ -0,0 +1,404 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from typing import List, Optional
from pyasic.config import MinerConfig
from pyasic.data import Fan, HashBoard
from pyasic.data.error_codes import MinerErrorData
from pyasic.data.error_codes.innosilicon import InnosiliconError
from pyasic.errors import APIError
from pyasic.miners.backends import CGMiner
from pyasic.miners.base import (
DataFunction,
DataLocations,
DataOptions,
RPCAPICommand,
WebAPICommand,
)
from pyasic.web.innosilicon import InnosiliconWebAPI
INNOSILICON_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction(
"get_mac",
[
WebAPICommand("web_get_all", "getAll"),
WebAPICommand("web_overview", "overview"),
],
),
str(DataOptions.MODEL): DataFunction("get_model"),
str(DataOptions.API_VERSION): DataFunction(
"get_api_ver", [RPCAPICommand("api_version", "version")]
),
str(DataOptions.FW_VERSION): DataFunction(
"get_fw_ver", [RPCAPICommand("api_version", "version")]
),
str(DataOptions.HOSTNAME): DataFunction("get_hostname"),
str(DataOptions.HASHRATE): DataFunction(
"get_hashrate",
[
RPCAPICommand("api_summary", "summary"),
WebAPICommand("web_get_all", "getAll"),
],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"get_expected_hashrate",
),
str(DataOptions.HASHBOARDS): DataFunction(
"get_hashboards",
[
RPCAPICommand("api_stats", "stats"),
WebAPICommand("web_get_all", "getAll"),
],
),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction("get_env_temp"),
str(DataOptions.WATTAGE): DataFunction(
"get_wattage",
[
WebAPICommand("web_get_all", "getAll"),
RPCAPICommand("api_stats", "stats"),
],
),
str(DataOptions.WATTAGE_LIMIT): DataFunction(
"get_wattage_limit",
[
WebAPICommand("web_get_all", "getAll"),
],
),
str(DataOptions.FANS): DataFunction(
"get_fans",
[
WebAPICommand("web_get_all", "getAll"),
],
),
str(DataOptions.FAN_PSU): DataFunction("get_fan_psu"),
str(DataOptions.ERRORS): DataFunction(
"get_errors",
[
WebAPICommand("web_get_error_detail", "getErrorDetail"),
],
),
str(DataOptions.FAULT_LIGHT): DataFunction("get_fault_light"),
str(DataOptions.IS_MINING): DataFunction("is_mining"),
str(DataOptions.UPTIME): DataFunction(
"get_uptime", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.CONFIG): DataFunction("get_config"),
}
)
class Innosilicon(CGMiner):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver=api_ver)
# interfaces
self.web = InnosiliconWebAPI(ip)
# static data
# data gathering locations
self.data_locations = INNOSILICON_DATA_LOC
# autotuning/shutdown support
self.supports_shutdown = True
# data storage
self.api_ver = api_ver
async def fault_light_on(self) -> bool:
return False
async def fault_light_off(self) -> bool:
return False
async def get_config(self) -> MinerConfig:
# get pool data
try:
pools = await self.web.pools()
except APIError:
return self.config
self.config = MinerConfig.from_inno(pools)
return self.config
async def reboot(self) -> bool:
try:
data = await self.web.reboot()
except APIError:
pass
else:
return data["success"]
async def restart_cgminer(self) -> bool:
try:
data = await self.web.restart_cgminer()
except APIError:
pass
else:
return data["success"]
async def restart_backend(self) -> bool:
return await self.restart_cgminer()
async def stop_mining(self) -> bool:
return False
# data = await self.web.poweroff()
# try:
# return data["success"]
# except KeyError:
# return False
async def resume_mining(self) -> bool:
return False
# data = await self.web.restart_cgminer()
# print(data)
# try:
# return data["success"]
# except KeyError:
# return False
async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
self.config = config
await self.web.update_pools(config.as_inno(user_suffix=user_suffix))
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def get_mac(
self, web_get_all: dict = None, web_overview: dict = None
) -> Optional[str]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all and not web_overview:
try:
web_overview = await self.web.overview()
except APIError:
pass
if web_get_all:
try:
mac = web_get_all["mac"]
return mac.upper()
except KeyError:
pass
if web_overview:
try:
mac = web_overview["version"]["ethaddr"]
return mac.upper()
except KeyError:
pass
async def get_hashrate(
self, api_summary: dict = None, web_get_all: dict = None
) -> Optional[float]:
if web_get_all:
web_get_all = web_get_all["all"]
if not api_summary and not web_get_all:
try:
api_summary = await self.api.summary()
except APIError:
pass
if web_get_all:
try:
if "Hash Rate H" in web_get_all["total_hash"].keys():
return round(
float(web_get_all["total_hash"]["Hash Rate H"] / 1000000000000),
2,
)
elif "Hash Rate" in web_get_all["total_hash"].keys():
return round(
float(web_get_all["total_hash"]["Hash Rate"] / 1000000), 5
)
except KeyError:
pass
if api_summary:
try:
return round(float(api_summary["SUMMARY"][0]["MHS 1m"] / 1000000), 2)
except (KeyError, IndexError):
pass
async def get_hashboards(
self, api_stats: dict = None, web_get_all: dict = None
) -> List[HashBoard]:
if web_get_all:
web_get_all = web_get_all["all"]
hashboards = [
HashBoard(slot=i, expected_chips=self.expected_chips)
for i in range(self.expected_hashboards)
]
if not api_stats:
try:
api_stats = await self.api.stats()
except APIError:
pass
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if api_stats:
if api_stats.get("STATS"):
for board in api_stats["STATS"]:
try:
idx = board["Chain ID"]
chips = board["Num active chips"]
except KeyError:
pass
else:
hashboards[idx].chips = chips
hashboards[idx].missing = False
if web_get_all:
if web_get_all.get("chain"):
for board in web_get_all["chain"]:
idx = board.get("ASC")
if idx is not None:
temp = board.get("Temp min")
if temp:
hashboards[idx].temp = round(temp)
hashrate = board.get("Hash Rate H")
if hashrate:
hashboards[idx].hashrate = round(
hashrate / 1000000000000, 2
)
chip_temp = board.get("Temp max")
if chip_temp:
hashboards[idx].chip_temp = round(chip_temp)
return hashboards
async def get_wattage(
self, web_get_all: dict = None, api_stats: dict = None
) -> Optional[int]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if web_get_all:
try:
return web_get_all["power"]
except KeyError:
pass
if not api_stats:
try:
api_stats = await self.api.stats()
except APIError:
pass
if api_stats:
if api_stats.get("STATS"):
for board in api_stats["STATS"]:
try:
wattage = board["power"]
except KeyError:
pass
else:
wattage = int(wattage)
return wattage
async def get_fans(self, web_get_all: dict = None) -> List[Fan]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
fans = [Fan() for _ in range(self.fan_count)]
if web_get_all:
try:
spd = web_get_all["fansSpeed"]
except KeyError:
pass
else:
round((int(spd) * 6000) / 100)
for i in range(self.fan_count):
fans[i].speed = spd
return fans
async def get_errors(
self, web_get_error_detail: dict = None
) -> List[MinerErrorData]:
errors = []
if not web_get_error_detail:
try:
web_get_error_detail = await self.web.get_error_detail()
except APIError:
pass
if web_get_error_detail:
try:
# only 1 error?
# TODO: check if this should be a loop, can't remember.
err = web_get_error_detail["code"]
except KeyError:
pass
else:
err = int(err)
if not err == 0:
errors.append(InnosiliconError(error_code=err))
return errors
async def get_wattage_limit(self, web_get_all: dict = None) -> Optional[int]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if web_get_all:
try:
level = web_get_all["running_mode"]["level"]
except KeyError:
pass
else:
# this is very possibly not correct.
level = int(level)
limit = 1250 + (250 * level)
return limit
async def get_expected_hashrate(self) -> Optional[float]:
pass

View File

@@ -13,320 +13,10 @@
# See the License for the specific language governing permissions and - # See the License for the specific language governing permissions and -
# limitations under the License. - # limitations under the License. -
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
import logging
from typing import List, Optional
from pyasic.config import MinerConfig from pyasic.miners.backends.innosilicon import Innosilicon
from pyasic.data import Fan, HashBoard
from pyasic.data.error_codes import InnosiliconError, MinerErrorData
from pyasic.errors import APIError
from pyasic.miners.backends import CGMiner
from pyasic.miners.types import A10X from pyasic.miners.types import A10X
from pyasic.web.inno import InnosiliconWebAPI
class CGMinerA10X(CGMiner, A10X): class InnosiliconA10X(Innosilicon, A10X):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None: pass
super().__init__(ip, api_ver=api_ver)
self.ip = ip
self.web = InnosiliconWebAPI(ip)
async def fault_light_on(self) -> bool:
return False
async def fault_light_off(self) -> bool:
return False
async def get_config(self, web_pools: dict = None) -> MinerConfig:
if not web_pools:
try:
web_pools = await self.web.pools()
except APIError as e:
logging.warning(e)
if web_pools:
if "pools" in web_pools.keys():
cfg = MinerConfig().from_raw(web_pools)
self.config = cfg
return self.config
async def reboot(self) -> bool:
try:
data = await self.web.reboot()
except APIError:
pass
else:
return data["success"]
async def restart_cgminer(self) -> bool:
try:
data = await self.web.restart_cgminer()
except APIError:
pass
else:
return data["success"]
async def restart_backend(self) -> bool:
return await self.restart_cgminer()
async def stop_mining(self) -> bool:
return False
# data = await self.web.poweroff()
# try:
# return data["success"]
# except KeyError:
# return False
async def resume_mining(self) -> bool:
return False
# data = await self.web.restart_cgminer()
# print(data)
# try:
# return data["success"]
# except KeyError:
# return False
async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
pass
# doesnt work for some reason
# self.config = config
# await self.web.update_pools(config.as_inno(user_suffix=user_suffix))
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def get_mac(
self, web_get_all: dict = None, web_overview: dict = None
) -> Optional[str]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all and not web_overview:
try:
web_overview = await self.web.overview()
except APIError:
pass
if web_get_all:
try:
mac = web_get_all["mac"]
return mac.upper()
except KeyError:
pass
if web_overview:
try:
mac = web_overview["version"]["ethaddr"]
return mac.upper()
except KeyError:
pass
async def get_hashrate(
self, api_summary: dict = None, web_get_all: dict = None
) -> Optional[float]:
if web_get_all:
web_get_all = web_get_all["all"]
if not api_summary and not web_get_all:
try:
api_summary = await self.api.summary()
except APIError:
pass
if web_get_all:
try:
return round(float(web_get_all["total_hash"]["Hash Rate"] / 1000000), 5)
except KeyError:
pass
if api_summary:
try:
return round(
float(api_summary["SUMMARY"][0]["MHS 1m"] / 1000000000000), 5
)
except LookupError:
pass
async def get_hashboards(
self, api_stats: dict = None, web_get_all: dict = None
) -> List[HashBoard]:
hashboards = [
HashBoard(slot=i, expected_chips=self.expected_chips)
for i in range(self.expected_hashboards)
]
if web_get_all:
web_get_all = web_get_all["all"]
if not api_stats:
try:
api_stats = await self.api.stats()
except APIError:
pass
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if api_stats:
if api_stats.get("STATS"):
for board in api_stats["STATS"]:
try:
idx = board["Chain ID"]
chips = board["Num active chips"]
except KeyError:
pass
else:
hashboards[idx].chips = chips
hashboards[idx].missing = False
if web_get_all:
if web_get_all.get("chain"):
for board in web_get_all["chain"]:
idx = board.get("ASC")
if idx is not None:
temp = board.get("Temp min")
if temp:
hashboards[idx].temp = round(temp)
hashrate = board.get("Hash Rate H")
if hashrate:
hashboards[idx].hashrate = round(
hashrate / 1000000000000, 5
)
chip_temp = board.get("Temp max")
if chip_temp:
hashboards[idx].chip_temp = round(chip_temp)
return hashboards
async def get_wattage(
self, web_get_all: dict = None, api_stats: dict = None
) -> Optional[int]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if web_get_all:
try:
return web_get_all["power"]
except KeyError:
pass
if not api_stats:
try:
api_stats = await self.api.stats()
except APIError:
pass
if api_stats:
if api_stats.get("STATS"):
for board in api_stats["STATS"]:
try:
wattage = board["power"]
except KeyError:
pass
else:
wattage = int(wattage)
return wattage
async def get_fans(self, web_get_all: dict = None) -> List[Fan]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
fans = [Fan() for _ in range(self.expected_fans)]
if web_get_all:
try:
spd = web_get_all["fansSpeed"]
except KeyError:
pass
else:
round((int(spd) * 6000) / 100)
for i in range(self.expected_fans):
fans[i].speed = spd
return fans
async def get_errors(
self, web_get_error_detail: dict = None
) -> List[MinerErrorData]: # noqa: named this way for automatic functionality
errors = []
if not web_get_error_detail:
try:
web_get_error_detail = await self.web.get_error_detail()
except APIError:
pass
if web_get_error_detail:
try:
# only 1 error?
# TODO: check if this should be a loop, can't remember.
err = web_get_error_detail["code"]
except KeyError:
pass
else:
err = int(err)
if not err == 0:
errors.append(InnosiliconError(error_code=err))
return errors
async def get_fw_ver(self, api_version: dict = None) -> Optional[str]:
if self.fw_ver:
return self.fw_ver
if not api_version:
try:
api_version = await self.api.version()
except APIError:
pass
if api_version:
try:
self.fw_ver = api_version["VERSION"][0]["CGMiner"].split("-")[-1:][0]
except LookupError:
pass
return self.fw_ver
async def get_wattage_limit(self, web_get_all: dict = None) -> Optional[int]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if web_get_all:
try:
level = web_get_all["running_mode"]["level"]
except KeyError:
pass
else:
# this is very possibly not correct.
level = int(level)
limit = 1250 + (250 * level)
return limit

View File

@@ -14,4 +14,4 @@
# limitations under the License. - # limitations under the License. -
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
from .A10X import CGMinerA10X from .A10X import InnosiliconA10X

View File

@@ -13,280 +13,10 @@
# See the License for the specific language governing permissions and - # See the License for the specific language governing permissions and -
# limitations under the License. - # limitations under the License. -
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
from typing import List, Optional
from pyasic.config import MinerConfig from pyasic.miners.backends.innosilicon import Innosilicon
from pyasic.data import Fan, HashBoard
from pyasic.data.error_codes import InnosiliconError, MinerErrorData
from pyasic.errors import APIError
from pyasic.miners.backends import CGMiner
from pyasic.miners.types import T3HPlus from pyasic.miners.types import T3HPlus
from pyasic.web.inno import InnosiliconWebAPI
class CGMinerT3HPlus(CGMiner, T3HPlus): class InnosiliconT3HPlus(Innosilicon, T3HPlus):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None: pass
super().__init__(ip, api_ver=api_ver)
self.ip = ip
self.web = InnosiliconWebAPI(ip)
async def fault_light_on(self) -> bool:
return False
async def fault_light_off(self) -> bool:
return False
async def get_config(self, api_pools: dict = None) -> MinerConfig:
# get pool data
try:
pools = await self.api.pools()
except APIError:
return self.config
self.config = MinerConfig.from_api(pools)
return self.config
async def reboot(self) -> bool:
try:
data = await self.web.reboot()
except APIError:
pass
else:
return data["success"]
async def restart_cgminer(self) -> bool:
try:
data = await self.web.restart_cgminer()
except APIError:
pass
else:
return data["success"]
async def restart_backend(self) -> bool:
return await self.restart_cgminer()
async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
self.config = config
await self.web.update_pools(config.as_inno(user_suffix=user_suffix))
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def get_mac(
self, web_get_all: dict = None, web_overview: dict = None
) -> Optional[str]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all and not web_overview:
try:
web_overview = await self.web.overview()
except APIError:
pass
if web_get_all:
try:
mac = web_get_all["mac"]
return mac.upper()
except KeyError:
pass
if web_overview:
try:
mac = web_overview["version"]["ethaddr"]
return mac.upper()
except KeyError:
pass
async def get_hashrate(
self, api_summary: dict = None, web_get_all: dict = None
) -> Optional[float]:
if web_get_all:
web_get_all = web_get_all["all"]
if not api_summary and not web_get_all:
try:
api_summary = await self.api.summary()
except APIError:
pass
if web_get_all:
try:
return round(
float(web_get_all["total_hash"]["Hash Rate H"] / 1000000000000), 2
)
except KeyError:
pass
if api_summary:
try:
return round(float(api_summary["SUMMARY"][0]["MHS 1m"] / 1000000), 2)
except LookupError:
pass
async def get_hashboards(
self, api_stats: dict = None, web_get_all: dict = None
) -> List[HashBoard]:
if web_get_all:
web_get_all = web_get_all["all"]
hashboards = [
HashBoard(slot=i, expected_chips=self.expected_chips)
for i in range(self.expected_hashboards)
]
if not api_stats:
try:
api_stats = await self.api.stats()
except APIError:
pass
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if api_stats:
if api_stats.get("STATS"):
for board in api_stats["STATS"]:
try:
idx = board["Chain ID"]
chips = board["Num active chips"]
except KeyError:
pass
else:
hashboards[idx].chips = chips
hashboards[idx].missing = False
if web_get_all:
if web_get_all.get("chain"):
for board in web_get_all["chain"]:
idx = board.get("ASC")
if idx is not None:
temp = board.get("Temp min")
if temp:
hashboards[idx].temp = round(temp)
hashrate = board.get("Hash Rate H")
if hashrate:
hashboards[idx].hashrate = round(
hashrate / 1000000000000, 2
)
chip_temp = board.get("Temp max")
if chip_temp:
hashboards[idx].chip_temp = round(chip_temp)
return hashboards
async def get_wattage(
self, web_get_all: dict = None, api_stats: dict = None
) -> Optional[int]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if web_get_all:
try:
return web_get_all["power"]
except KeyError:
pass
if not api_stats:
try:
api_stats = await self.api.stats()
except APIError:
pass
if api_stats:
if api_stats.get("STATS"):
for board in api_stats["STATS"]:
try:
wattage = board["power"]
except KeyError:
pass
else:
wattage = int(wattage)
return wattage
async def get_fans(self, web_get_all: dict = None) -> List[Fan]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
fans = [Fan() for _ in range(self.fan_count)]
if web_get_all:
try:
spd = web_get_all["fansSpeed"]
except KeyError:
pass
else:
round((int(spd) * 6000) / 100)
for i in range(self.fan_count):
fans[i].speed = spd
return fans
async def get_errors(
self, web_get_error_detail: dict = None
) -> List[MinerErrorData]: # noqa: named this way for automatic functionality
errors = []
if not web_get_error_detail:
try:
web_get_error_detail = await self.web.get_error_detail()
except APIError:
pass
if web_get_error_detail:
try:
# only 1 error?
# TODO: check if this should be a loop, can't remember.
err = web_get_error_detail["code"]
except KeyError:
pass
else:
err = int(err)
if not err == 0:
errors.append(InnosiliconError(error_code=err))
return errors
async def get_wattage_limit(self, web_get_all: dict = None) -> Optional[int]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if web_get_all:
try:
level = web_get_all["running_mode"]["level"]
except KeyError:
pass
else:
# this is very possibly not correct.
level = int(level)
limit = 1250 + (250 * level)
return limit

View File

@@ -14,4 +14,4 @@
# limitations under the License. - # limitations under the License. -
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
from .T3H import CGMinerT3HPlus from .T3H import InnosiliconT3HPlus

View File

@@ -326,8 +326,8 @@ MINER_CLASSES = {
}, },
MinerTypes.INNOSILICON: { MinerTypes.INNOSILICON: {
None: CGMiner, None: CGMiner,
"T3H+": CGMinerT3HPlus, "T3H+": InnosiliconT3HPlus,
"A10X": CGMinerA10X, "A10X": InnosiliconA10X,
}, },
MinerTypes.GOLDSHELL: { MinerTypes.GOLDSHELL: {
None: BFGMiner, None: BFGMiner,

View File

@@ -72,6 +72,31 @@ class MinersTest(unittest.TestCase):
) )
self.assertEqual(miner_keys, keys) self.assertEqual(miner_keys, keys)
def test_data_locations_match_signatures_command(self):
warnings.filterwarnings("ignore")
for miner_model in MINER_CLASSES.keys():
for miner_api in MINER_CLASSES[miner_model].keys():
miner = MINER_CLASSES[miner_model][miner_api]("127.0.0.1")
for data_point in asdict(miner.data_locations).values():
with self.subTest(
msg=f"Test {data_point['cmd']} signature matches with model={miner_model}, api={miner_api}",
miner_model=miner_model,
miner_api=miner_api,
):
func = getattr(miner, data_point["cmd"])
signature = inspect.signature(func)
parameters = signature.parameters
param_names = list(parameters.keys())
for arg in ["kwargs", "args"]:
try:
param_names.remove(arg)
except ValueError:
pass
self.assertEqual(
set(param_names),
set([k["name"] for k in data_point["kwargs"]]),
)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()