bug: add test to cross check function arguments, and fix some method implementations and naming.

This commit is contained in:
UpstreamData
2024-01-15 10:16:47 -07:00
parent ef9a026ee8
commit 34006941ad
14 changed files with 541 additions and 629 deletions

View File

@@ -2,9 +2,8 @@
## A10X Models
## A10X
::: pyasic.miners.innosilicon.cgminer.A10X.A10X.CGMinerA10X
::: pyasic.miners.innosilicon.cgminer.A10X.A10X.InnosiliconA10X
handler: python
options:
show_root_heading: false
heading_level: 4

View File

@@ -2,9 +2,8 @@
## T3X Models
## T3H+
::: pyasic.miners.innosilicon.cgminer.T3X.T3H.CGMinerT3HPlus
::: pyasic.miners.innosilicon.cgminer.T3X.T3H.InnosiliconT3HPlus
handler: python
options:
show_root_heading: false
heading_level: 4

View File

@@ -25,10 +25,6 @@ from pyasic.miners.types import T9
class HiveonT9(Hiveon, T9):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver=api_ver)
self.ip = ip
self.pwd = "admin"
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
@@ -46,34 +42,44 @@ class HiveonT9(Hiveon, T9):
pass
async def get_hashboards(self, api_stats: dict = None) -> List[HashBoard]:
hashboards = [
HashBoard(slot=board, expected_chips=self.expected_chips)
for board in range(self.expected_hashboards)
]
if api_stats is None:
try:
api_stats = self.api.stats()
except APIError:
return []
board_map = {
0: [2, 9, 10],
1: [3, 11, 12],
2: [4, 13, 14],
}
hashboards = []
for board in board_map:
hashboard = HashBoard(slot=board, expected_chips=self.expected_chips)
hashrate = 0
chips = 0
for chipset in board_map[board]:
if hashboard.chip_temp == None:
if hashboards[board].chip_temp is None:
try:
hashboard.board_temp = api_stats["STATS"][1][f"temp{chipset}"]
hashboard.chip_temp = api_stats["STATS"][1][f"temp2_{chipset}"]
hashboards[board].temp = api_stats["STATS"][1][f"temp{chipset}"]
hashboards[board].chip_temp = api_stats["STATS"][1][
f"temp2_{chipset}"
]
except (KeyError, IndexError):
pass
else:
hashboard.missing = False
hashboards[board].missing = False
try:
hashrate += api_stats["STATS"][1][f"chain_rate{chipset}"]
chips += api_stats["STATS"][1][f"chain_acn{chipset}"]
except (KeyError, IndexError):
pass
hashboard.hashrate = round(hashrate / 1000, 2)
hashboard.chips = chips
hashboards.append(hashboard)
hashboards[board].hashrate = round(hashrate / 1000, 2)
hashboards[board].chips = chips
return hashboards

View File

@@ -88,7 +88,11 @@ BTMINER_DATA_LOC = DataLocations(
],
),
str(DataOptions.ERRORS): DataFunction(
"get_errors", [RPCAPICommand("api_get_error_code", "get_error_code")]
"get_errors",
[
RPCAPICommand("api_get_error_code", "get_error_code"),
RPCAPICommand("api_summary", "summary"),
],
),
str(DataOptions.FAULT_LIGHT): DataFunction(
"get_fault_light",
@@ -546,7 +550,21 @@ class BTMiner(BaseMiner):
self, api_summary: dict = None, api_get_error_code: dict = None
) -> List[MinerErrorData]:
errors = []
if not api_summary and not api_get_error_code:
if not api_get_error_code and not api_summary:
try:
api_get_error_code = await self.api.get_error_code()
except APIError:
pass
if api_get_error_code:
for err in api_get_error_code["Msg"]["error_code"]:
if isinstance(err, dict):
for code in err:
errors.append(WhatsminerError(error_code=int(code)))
else:
errors.append(WhatsminerError(error_code=int(err)))
if not api_summary:
try:
api_summary = await self.api.summary()
except APIError:
@@ -561,20 +579,6 @@ class BTMiner(BaseMiner):
except (KeyError, IndexError, ValueError, TypeError):
pass
if not api_get_error_code:
try:
api_get_error_code = await self.api.get_error_code()
except APIError:
pass
if api_get_error_code:
for err in api_get_error_code["Msg"]["error_code"]:
if isinstance(err, dict):
for code in err:
errors.append(WhatsminerError(error_code=int(code)))
else:
errors.append(WhatsminerError(error_code=int(err)))
return errors
async def get_expected_hashrate(self, api_summary: dict = None):

View File

@@ -37,9 +37,7 @@ AVALON_DATA_LOC = DataLocations(
str(DataOptions.FW_VERSION): DataFunction(
"get_fw_ver", [RPCAPICommand("api_version", "version")]
),
str(DataOptions.HOSTNAME): DataFunction(
"get_hostname", [RPCAPICommand("api_version", "version")]
),
str(DataOptions.HOSTNAME): DataFunction("get_hostname"),
str(DataOptions.HASHRATE): DataFunction(
"get_hashrate", [RPCAPICommand("api_devs", "devs")]
),
@@ -194,7 +192,7 @@ class CGMinerAvalon(CGMiner):
except (KeyError, ValueError):
pass
async def get_hostname(self, mac: str = None) -> Optional[str]:
async def get_hostname(self) -> Optional[str]:
return None
# if not mac:
# mac = await self.get_mac()
@@ -367,3 +365,6 @@ class CGMinerAvalon(CGMiner):
async def is_mining(self, *args, **kwargs) -> Optional[bool]:
return None
async def get_uptime(self) -> Optional[int]:
return None

View File

@@ -14,18 +14,73 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from typing import Optional
from typing import List, Optional
from pyasic.data import HashBoard
from pyasic.miners.backends import BMMiner
from pyasic.miners.base import DataFunction, DataLocations, DataOptions, RPCAPICommand
HIVEON_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction("get_mac"),
str(DataOptions.MODEL): DataFunction("get_model"),
str(DataOptions.API_VERSION): DataFunction(
"get_api_ver", [RPCAPICommand("api_version", "version")]
),
str(DataOptions.FW_VERSION): DataFunction(
"get_fw_ver", [RPCAPICommand("api_version", "version")]
),
str(DataOptions.HOSTNAME): DataFunction("get_hostname"),
str(DataOptions.HASHRATE): DataFunction(
"get_hashrate", [RPCAPICommand("api_summary", "summary")]
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"get_expected_hashrate", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.HASHBOARDS): DataFunction(
"get_hashboards", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction(
"get_env_temp", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.WATTAGE): DataFunction(
"get_wattage", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.WATTAGE_LIMIT): DataFunction("get_wattage_limit"),
str(DataOptions.FANS): DataFunction(
"get_fans", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.FAN_PSU): DataFunction("get_fan_psu"),
str(DataOptions.ERRORS): DataFunction("get_errors"),
str(DataOptions.FAULT_LIGHT): DataFunction("get_fault_light"),
str(DataOptions.IS_MINING): DataFunction("is_mining"),
str(DataOptions.UPTIME): DataFunction(
"get_uptime", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.CONFIG): DataFunction("get_config"),
}
)
class Hiveon(BMMiner):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver)
self.pwd = "admin"
# static data
self.api_type = "Hiveon"
# data gathering locations
self.data_locations = HIVEON_DATA_LOC
async def get_model(self) -> Optional[str]:
if self.model is not None:
return self.model + " (Hiveon)"
return "? (Hiveon)"
async def get_hashboards(self, api_stats: dict = None) -> List[HashBoard]:
pass
async def get_wattage(self, api_stats: dict = None) -> Optional[int]:
pass
async def get_env_temp(self, api_stats: dict = None) -> Optional[float]:
pass

View File

@@ -0,0 +1,404 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from typing import List, Optional
from pyasic.config import MinerConfig
from pyasic.data import Fan, HashBoard
from pyasic.data.error_codes import MinerErrorData
from pyasic.data.error_codes.innosilicon import InnosiliconError
from pyasic.errors import APIError
from pyasic.miners.backends import CGMiner
from pyasic.miners.base import (
DataFunction,
DataLocations,
DataOptions,
RPCAPICommand,
WebAPICommand,
)
from pyasic.web.innosilicon import InnosiliconWebAPI
INNOSILICON_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction(
"get_mac",
[
WebAPICommand("web_get_all", "getAll"),
WebAPICommand("web_overview", "overview"),
],
),
str(DataOptions.MODEL): DataFunction("get_model"),
str(DataOptions.API_VERSION): DataFunction(
"get_api_ver", [RPCAPICommand("api_version", "version")]
),
str(DataOptions.FW_VERSION): DataFunction(
"get_fw_ver", [RPCAPICommand("api_version", "version")]
),
str(DataOptions.HOSTNAME): DataFunction("get_hostname"),
str(DataOptions.HASHRATE): DataFunction(
"get_hashrate",
[
RPCAPICommand("api_summary", "summary"),
WebAPICommand("web_get_all", "getAll"),
],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"get_expected_hashrate",
),
str(DataOptions.HASHBOARDS): DataFunction(
"get_hashboards",
[
RPCAPICommand("api_stats", "stats"),
WebAPICommand("web_get_all", "getAll"),
],
),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction("get_env_temp"),
str(DataOptions.WATTAGE): DataFunction(
"get_wattage",
[
WebAPICommand("web_get_all", "getAll"),
RPCAPICommand("api_stats", "stats"),
],
),
str(DataOptions.WATTAGE_LIMIT): DataFunction(
"get_wattage_limit",
[
WebAPICommand("web_get_all", "getAll"),
],
),
str(DataOptions.FANS): DataFunction(
"get_fans",
[
WebAPICommand("web_get_all", "getAll"),
],
),
str(DataOptions.FAN_PSU): DataFunction("get_fan_psu"),
str(DataOptions.ERRORS): DataFunction(
"get_errors",
[
WebAPICommand("web_get_error_detail", "getErrorDetail"),
],
),
str(DataOptions.FAULT_LIGHT): DataFunction("get_fault_light"),
str(DataOptions.IS_MINING): DataFunction("is_mining"),
str(DataOptions.UPTIME): DataFunction(
"get_uptime", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.CONFIG): DataFunction("get_config"),
}
)
class Innosilicon(CGMiner):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver=api_ver)
# interfaces
self.web = InnosiliconWebAPI(ip)
# static data
# data gathering locations
self.data_locations = INNOSILICON_DATA_LOC
# autotuning/shutdown support
self.supports_shutdown = True
# data storage
self.api_ver = api_ver
async def fault_light_on(self) -> bool:
return False
async def fault_light_off(self) -> bool:
return False
async def get_config(self) -> MinerConfig:
# get pool data
try:
pools = await self.web.pools()
except APIError:
return self.config
self.config = MinerConfig.from_inno(pools)
return self.config
async def reboot(self) -> bool:
try:
data = await self.web.reboot()
except APIError:
pass
else:
return data["success"]
async def restart_cgminer(self) -> bool:
try:
data = await self.web.restart_cgminer()
except APIError:
pass
else:
return data["success"]
async def restart_backend(self) -> bool:
return await self.restart_cgminer()
async def stop_mining(self) -> bool:
return False
# data = await self.web.poweroff()
# try:
# return data["success"]
# except KeyError:
# return False
async def resume_mining(self) -> bool:
return False
# data = await self.web.restart_cgminer()
# print(data)
# try:
# return data["success"]
# except KeyError:
# return False
async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
self.config = config
await self.web.update_pools(config.as_inno(user_suffix=user_suffix))
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def get_mac(
self, web_get_all: dict = None, web_overview: dict = None
) -> Optional[str]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all and not web_overview:
try:
web_overview = await self.web.overview()
except APIError:
pass
if web_get_all:
try:
mac = web_get_all["mac"]
return mac.upper()
except KeyError:
pass
if web_overview:
try:
mac = web_overview["version"]["ethaddr"]
return mac.upper()
except KeyError:
pass
async def get_hashrate(
self, api_summary: dict = None, web_get_all: dict = None
) -> Optional[float]:
if web_get_all:
web_get_all = web_get_all["all"]
if not api_summary and not web_get_all:
try:
api_summary = await self.api.summary()
except APIError:
pass
if web_get_all:
try:
if "Hash Rate H" in web_get_all["total_hash"].keys():
return round(
float(web_get_all["total_hash"]["Hash Rate H"] / 1000000000000),
2,
)
elif "Hash Rate" in web_get_all["total_hash"].keys():
return round(
float(web_get_all["total_hash"]["Hash Rate"] / 1000000), 5
)
except KeyError:
pass
if api_summary:
try:
return round(float(api_summary["SUMMARY"][0]["MHS 1m"] / 1000000), 2)
except (KeyError, IndexError):
pass
async def get_hashboards(
self, api_stats: dict = None, web_get_all: dict = None
) -> List[HashBoard]:
if web_get_all:
web_get_all = web_get_all["all"]
hashboards = [
HashBoard(slot=i, expected_chips=self.expected_chips)
for i in range(self.expected_hashboards)
]
if not api_stats:
try:
api_stats = await self.api.stats()
except APIError:
pass
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if api_stats:
if api_stats.get("STATS"):
for board in api_stats["STATS"]:
try:
idx = board["Chain ID"]
chips = board["Num active chips"]
except KeyError:
pass
else:
hashboards[idx].chips = chips
hashboards[idx].missing = False
if web_get_all:
if web_get_all.get("chain"):
for board in web_get_all["chain"]:
idx = board.get("ASC")
if idx is not None:
temp = board.get("Temp min")
if temp:
hashboards[idx].temp = round(temp)
hashrate = board.get("Hash Rate H")
if hashrate:
hashboards[idx].hashrate = round(
hashrate / 1000000000000, 2
)
chip_temp = board.get("Temp max")
if chip_temp:
hashboards[idx].chip_temp = round(chip_temp)
return hashboards
async def get_wattage(
self, web_get_all: dict = None, api_stats: dict = None
) -> Optional[int]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if web_get_all:
try:
return web_get_all["power"]
except KeyError:
pass
if not api_stats:
try:
api_stats = await self.api.stats()
except APIError:
pass
if api_stats:
if api_stats.get("STATS"):
for board in api_stats["STATS"]:
try:
wattage = board["power"]
except KeyError:
pass
else:
wattage = int(wattage)
return wattage
async def get_fans(self, web_get_all: dict = None) -> List[Fan]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
fans = [Fan() for _ in range(self.fan_count)]
if web_get_all:
try:
spd = web_get_all["fansSpeed"]
except KeyError:
pass
else:
round((int(spd) * 6000) / 100)
for i in range(self.fan_count):
fans[i].speed = spd
return fans
async def get_errors(
self, web_get_error_detail: dict = None
) -> List[MinerErrorData]:
errors = []
if not web_get_error_detail:
try:
web_get_error_detail = await self.web.get_error_detail()
except APIError:
pass
if web_get_error_detail:
try:
# only 1 error?
# TODO: check if this should be a loop, can't remember.
err = web_get_error_detail["code"]
except KeyError:
pass
else:
err = int(err)
if not err == 0:
errors.append(InnosiliconError(error_code=err))
return errors
async def get_wattage_limit(self, web_get_all: dict = None) -> Optional[int]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if web_get_all:
try:
level = web_get_all["running_mode"]["level"]
except KeyError:
pass
else:
# this is very possibly not correct.
level = int(level)
limit = 1250 + (250 * level)
return limit
async def get_expected_hashrate(self) -> Optional[float]:
pass

View File

@@ -13,320 +13,10 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import logging
from typing import List, Optional
from pyasic.config import MinerConfig
from pyasic.data import Fan, HashBoard
from pyasic.data.error_codes import InnosiliconError, MinerErrorData
from pyasic.errors import APIError
from pyasic.miners.backends import CGMiner
from pyasic.miners.backends.innosilicon import Innosilicon
from pyasic.miners.types import A10X
from pyasic.web.inno import InnosiliconWebAPI
class CGMinerA10X(CGMiner, A10X):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver=api_ver)
self.ip = ip
self.web = InnosiliconWebAPI(ip)
async def fault_light_on(self) -> bool:
return False
async def fault_light_off(self) -> bool:
return False
async def get_config(self, web_pools: dict = None) -> MinerConfig:
if not web_pools:
try:
web_pools = await self.web.pools()
except APIError as e:
logging.warning(e)
if web_pools:
if "pools" in web_pools.keys():
cfg = MinerConfig().from_raw(web_pools)
self.config = cfg
return self.config
async def reboot(self) -> bool:
try:
data = await self.web.reboot()
except APIError:
pass
else:
return data["success"]
async def restart_cgminer(self) -> bool:
try:
data = await self.web.restart_cgminer()
except APIError:
pass
else:
return data["success"]
async def restart_backend(self) -> bool:
return await self.restart_cgminer()
async def stop_mining(self) -> bool:
return False
# data = await self.web.poweroff()
# try:
# return data["success"]
# except KeyError:
# return False
async def resume_mining(self) -> bool:
return False
# data = await self.web.restart_cgminer()
# print(data)
# try:
# return data["success"]
# except KeyError:
# return False
async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
pass
# doesnt work for some reason
# self.config = config
# await self.web.update_pools(config.as_inno(user_suffix=user_suffix))
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def get_mac(
self, web_get_all: dict = None, web_overview: dict = None
) -> Optional[str]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all and not web_overview:
try:
web_overview = await self.web.overview()
except APIError:
pass
if web_get_all:
try:
mac = web_get_all["mac"]
return mac.upper()
except KeyError:
pass
if web_overview:
try:
mac = web_overview["version"]["ethaddr"]
return mac.upper()
except KeyError:
pass
async def get_hashrate(
self, api_summary: dict = None, web_get_all: dict = None
) -> Optional[float]:
if web_get_all:
web_get_all = web_get_all["all"]
if not api_summary and not web_get_all:
try:
api_summary = await self.api.summary()
except APIError:
pass
if web_get_all:
try:
return round(float(web_get_all["total_hash"]["Hash Rate"] / 1000000), 5)
except KeyError:
pass
if api_summary:
try:
return round(
float(api_summary["SUMMARY"][0]["MHS 1m"] / 1000000000000), 5
)
except (KeyError, IndexError):
pass
async def get_hashboards(
self, api_stats: dict = None, web_get_all: dict = None
) -> List[HashBoard]:
hashboards = [
HashBoard(slot=i, expected_chips=self.expected_chips)
for i in range(self.expected_hashboards)
]
if web_get_all:
web_get_all = web_get_all["all"]
if not api_stats:
try:
api_stats = await self.api.stats()
except APIError:
pass
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if api_stats:
if api_stats.get("STATS"):
for board in api_stats["STATS"]:
try:
idx = board["Chain ID"]
chips = board["Num active chips"]
except KeyError:
pass
else:
hashboards[idx].chips = chips
hashboards[idx].missing = False
if web_get_all:
if web_get_all.get("chain"):
for board in web_get_all["chain"]:
idx = board.get("ASC")
if idx is not None:
temp = board.get("Temp min")
if temp:
hashboards[idx].temp = round(temp)
hashrate = board.get("Hash Rate H")
if hashrate:
hashboards[idx].hashrate = round(
hashrate / 1000000000000, 5
)
chip_temp = board.get("Temp max")
if chip_temp:
hashboards[idx].chip_temp = round(chip_temp)
return hashboards
async def get_wattage(
self, web_get_all: dict = None, api_stats: dict = None
) -> Optional[int]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if web_get_all:
try:
return web_get_all["power"]
except KeyError:
pass
if not api_stats:
try:
api_stats = await self.api.stats()
except APIError:
pass
if api_stats:
if api_stats.get("STATS"):
for board in api_stats["STATS"]:
try:
wattage = board["power"]
except KeyError:
pass
else:
wattage = int(wattage)
return wattage
async def get_fans(self, web_get_all: dict = None) -> List[Fan]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
fans = [Fan() for _ in range(self.fan_count)]
if web_get_all:
try:
spd = web_get_all["fansSpeed"]
except KeyError:
pass
else:
round((int(spd) * 6000) / 100)
for i in range(self.fan_count):
fans[i].speed = spd
return fans
async def get_errors(
self, web_get_error_detail: dict = None
) -> List[MinerErrorData]: # noqa: named this way for automatic functionality
errors = []
if not web_get_error_detail:
try:
web_get_error_detail = await self.web.get_error_detail()
except APIError:
pass
if web_get_error_detail:
try:
# only 1 error?
# TODO: check if this should be a loop, can't remember.
err = web_get_error_detail["code"]
except KeyError:
pass
else:
err = int(err)
if not err == 0:
errors.append(InnosiliconError(error_code=err))
return errors
async def get_fw_ver(self, api_version: dict = None) -> Optional[str]:
if self.fw_ver:
return self.fw_ver
if not api_version:
try:
api_version = await self.api.version()
except APIError:
pass
if api_version:
try:
self.fw_ver = api_version["VERSION"][0]["CGMiner"].split("-")[-1:][0]
except (KeyError, IndexError):
pass
return self.fw_ver
async def get_wattage_limit(self, web_get_all: dict = None) -> Optional[int]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if web_get_all:
try:
level = web_get_all["running_mode"]["level"]
except KeyError:
pass
else:
# this is very possibly not correct.
level = int(level)
limit = 1250 + (250 * level)
return limit
class InnosiliconA10X(Innosilicon, A10X):
pass

View File

@@ -14,4 +14,4 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from .A10X import CGMinerA10X
from .A10X import InnosiliconA10X

View File

@@ -13,281 +13,10 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import logging
from typing import List, Optional
from pyasic.config import MinerConfig
from pyasic.data import Fan, HashBoard
from pyasic.data.error_codes import InnosiliconError, MinerErrorData
from pyasic.errors import APIError
from pyasic.miners.backends import CGMiner
from pyasic.miners.backends.innosilicon import Innosilicon
from pyasic.miners.types import T3HPlus
from pyasic.web.inno import InnosiliconWebAPI
class CGMinerT3HPlus(CGMiner, T3HPlus):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip, api_ver=api_ver)
self.ip = ip
self.web = InnosiliconWebAPI(ip)
async def fault_light_on(self) -> bool:
return False
async def fault_light_off(self) -> bool:
return False
async def get_config(self, api_pools: dict = None) -> MinerConfig:
# get pool data
try:
pools = await self.api.pools()
except APIError:
return self.config
self.config = MinerConfig.from_api(pools)
return self.config
async def reboot(self) -> bool:
try:
data = await self.web.reboot()
except APIError:
pass
else:
return data["success"]
async def restart_cgminer(self) -> bool:
try:
data = await self.web.restart_cgminer()
except APIError:
pass
else:
return data["success"]
async def restart_backend(self) -> bool:
return await self.restart_cgminer()
async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
self.config = config
await self.web.update_pools(config.as_inno(user_suffix=user_suffix))
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def get_mac(
self, web_get_all: dict = None, web_overview: dict = None
) -> Optional[str]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all and not web_overview:
try:
web_overview = await self.web.overview()
except APIError:
pass
if web_get_all:
try:
mac = web_get_all["mac"]
return mac.upper()
except KeyError:
pass
if web_overview:
try:
mac = web_overview["version"]["ethaddr"]
return mac.upper()
except KeyError:
pass
async def get_hashrate(
self, api_summary: dict = None, web_get_all: dict = None
) -> Optional[float]:
if web_get_all:
web_get_all = web_get_all["all"]
if not api_summary and not web_get_all:
try:
api_summary = await self.api.summary()
except APIError:
pass
if web_get_all:
try:
return round(
float(web_get_all["total_hash"]["Hash Rate H"] / 1000000000000), 2
)
except KeyError:
pass
if api_summary:
try:
return round(float(api_summary["SUMMARY"][0]["MHS 1m"] / 1000000), 2)
except (KeyError, IndexError):
pass
async def get_hashboards(
self, api_stats: dict = None, web_get_all: dict = None
) -> List[HashBoard]:
if web_get_all:
web_get_all = web_get_all["all"]
hashboards = [
HashBoard(slot=i, expected_chips=self.expected_chips)
for i in range(self.expected_hashboards)
]
if not api_stats:
try:
api_stats = await self.api.stats()
except APIError:
pass
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if api_stats:
if api_stats.get("STATS"):
for board in api_stats["STATS"]:
try:
idx = board["Chain ID"]
chips = board["Num active chips"]
except KeyError:
pass
else:
hashboards[idx].chips = chips
hashboards[idx].missing = False
if web_get_all:
if web_get_all.get("chain"):
for board in web_get_all["chain"]:
idx = board.get("ASC")
if idx is not None:
temp = board.get("Temp min")
if temp:
hashboards[idx].temp = round(temp)
hashrate = board.get("Hash Rate H")
if hashrate:
hashboards[idx].hashrate = round(
hashrate / 1000000000000, 2
)
chip_temp = board.get("Temp max")
if chip_temp:
hashboards[idx].chip_temp = round(chip_temp)
return hashboards
async def get_wattage(
self, web_get_all: dict = None, api_stats: dict = None
) -> Optional[int]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if web_get_all:
try:
return web_get_all["power"]
except KeyError:
pass
if not api_stats:
try:
api_stats = await self.api.stats()
except APIError:
pass
if api_stats:
if api_stats.get("STATS"):
for board in api_stats["STATS"]:
try:
wattage = board["power"]
except KeyError:
pass
else:
wattage = int(wattage)
return wattage
async def get_fans(self, web_get_all: dict = None) -> List[Fan]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
fans = [Fan() for _ in range(self.fan_count)]
if web_get_all:
try:
spd = web_get_all["fansSpeed"]
except KeyError:
pass
else:
round((int(spd) * 6000) / 100)
for i in range(self.fan_count):
fans[i].speed = spd
return fans
async def get_errors(
self, web_get_error_detail: dict = None
) -> List[MinerErrorData]: # noqa: named this way for automatic functionality
errors = []
if not web_get_error_detail:
try:
web_get_error_detail = await self.web.get_error_detail()
except APIError:
pass
if web_get_error_detail:
try:
# only 1 error?
# TODO: check if this should be a loop, can't remember.
err = web_get_error_detail["code"]
except KeyError:
pass
else:
err = int(err)
if not err == 0:
errors.append(InnosiliconError(error_code=err))
return errors
async def get_wattage_limit(self, web_get_all: dict = None) -> Optional[int]:
if web_get_all:
web_get_all = web_get_all["all"]
if not web_get_all:
try:
web_get_all = await self.web.get_all()
except APIError:
pass
else:
web_get_all = web_get_all["all"]
if web_get_all:
try:
level = web_get_all["running_mode"]["level"]
except KeyError:
pass
else:
# this is very possibly not correct.
level = int(level)
limit = 1250 + (250 * level)
return limit
class InnosiliconT3HPlus(Innosilicon, T3HPlus):
pass

View File

@@ -14,4 +14,4 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from .T3H import CGMinerT3HPlus
from .T3H import InnosiliconT3HPlus

View File

@@ -326,8 +326,8 @@ MINER_CLASSES = {
},
MinerTypes.INNOSILICON: {
None: CGMiner,
"T3H+": CGMinerT3HPlus,
"A10X": CGMinerA10X,
"T3H+": InnosiliconT3HPlus,
"A10X": InnosiliconA10X,
},
MinerTypes.GOLDSHELL: {
None: BFGMiner,

View File

@@ -77,6 +77,31 @@ class MinersTest(unittest.TestCase):
)
self.assertEqual(miner_keys, keys)
def test_data_locations_match_signatures_command(self):
warnings.filterwarnings("ignore")
for miner_model in MINER_CLASSES.keys():
for miner_api in MINER_CLASSES[miner_model].keys():
miner = MINER_CLASSES[miner_model][miner_api]("127.0.0.1")
for data_point in asdict(miner.data_locations).values():
with self.subTest(
msg=f"Test {data_point['cmd']} signature matches with model={miner_model}, api={miner_api}",
miner_model=miner_model,
miner_api=miner_api,
):
func = getattr(miner, data_point["cmd"])
signature = inspect.signature(func)
parameters = signature.parameters
param_names = list(parameters.keys())
for arg in ["kwargs", "args"]:
try:
param_names.remove(arg)
except ValueError:
pass
self.assertEqual(
set(param_names),
set([k["name"] for k in data_point["kwargs"]]),
)
if __name__ == "__main__":
unittest.main()