Update data locations to be typed with dataclasses and enums. (#82)

* feature: swap AntminerModern to new data location style.

* bug: fix a bunch of missed instances of `nominal_` naming.

* feature: add support for S19 Pro Hydro.

* version: bump version number.

* dependencies: bump httpx version

* version: bump version number.

* feature: implement data locations for all remaining miners.

* refactor: remove some unused docstrings.

* feature: swap AntminerModern to new data location style.

* feature: implement data locations for all remaining miners.

* refactor: remove some unused docstrings.

* bug: fix misnamed data locations, and update base miner get_data to use new data locations.

* bug: fix include/exclude implementation on get_data.

* bug: swap ePIC to BaseMiner subclass.

* feature: add DataOptions to __all__

* tests: update data tests with new data locations method.

* bug: remove bad command from bosminer commands.

* dependencies: update dependencies.

* bug: fix some typing issues with python 3.8, and remove useless semaphore and scan threads.

* bug: fix KeyError when pools rpc command returns broken data.
This commit is contained in:
UpstreamData
2024-01-04 13:03:45 -07:00
committed by GitHub
parent 936474ed3b
commit 6e7442f90d
24 changed files with 861 additions and 984 deletions

View File

@@ -26,30 +26,52 @@ from pyasic.config import MinerConfig
from pyasic.data import Fan, HashBoard
from pyasic.data.error_codes import BraiinsOSError, MinerErrorData
from pyasic.errors import APIError
from pyasic.miners.base import BaseMiner
from pyasic.miners.base import (
BaseMiner,
DataFunction,
DataLocations,
DataOptions,
RPCAPICommand,
WebAPICommand,
)
from pyasic.web.bosminer import BOSMinerWebAPI
LUXMINER_DATA_LOC = {
"mac": {"cmd": "get_mac", "kwargs": {"api_config": {"api": "config"}}},
"model": {"cmd": "get_model", "kwargs": {}},
"api_ver": {"cmd": "get_api_ver", "kwargs": {}},
"fw_ver": {"cmd": "get_fw_ver", "kwargs": {}},
"hostname": {"cmd": "get_hostname", "kwargs": {}},
"hashrate": {"cmd": "get_hashrate", "kwargs": {}},
"expected_hashrate": {"cmd": "get_expected_hashrate", "kwargs": {}},
"hashboards": {"cmd": "get_hashboards", "kwargs": {}},
"wattage": {"cmd": "get_wattage", "kwargs": {}},
"wattage_limit": {"cmd": "get_wattage_limit", "kwargs": {}},
"fans": {"cmd": "get_fans", "kwargs": {}},
"fan_psu": {"cmd": "get_fan_psu", "kwargs": {}},
"env_temp": {"cmd": "get_env_temp", "kwargs": {}},
"errors": {"cmd": "get_errors", "kwargs": {}},
"fault_light": {"cmd": "get_fault_light", "kwargs": {}},
"pools": {"cmd": "get_pools", "kwargs": {}},
"is_mining": {"cmd": "is_mining", "kwargs": {}},
"uptime": {"cmd": "get_uptime", "kwargs": {"api_stats": {"api": "stats"}}},
"config": {"cmd": "get_config", "kwargs": {}},
}
LUXMINER_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction(
"get_mac", [RPCAPICommand("api_config", "config")]
),
str(DataOptions.MODEL): DataFunction("get_model"),
str(DataOptions.API_VERSION): DataFunction("get_api_ver"),
str(DataOptions.FW_VERSION): DataFunction("get_fw_ver"),
str(DataOptions.HOSTNAME): DataFunction("get_hostname"),
str(DataOptions.HASHRATE): DataFunction(
"get_hashrate", [RPCAPICommand("api_summary", "summary")]
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"get_expected_hashrate", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.HASHBOARDS): DataFunction(
"get_hashboards", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction("get_env_temp"),
str(DataOptions.WATTAGE): DataFunction(
"get_wattage", [RPCAPICommand("api_power", "power")]
),
str(DataOptions.WATTAGE_LIMIT): DataFunction("get_wattage_limit"),
str(DataOptions.FANS): DataFunction(
"get_fans", [RPCAPICommand("api_fans", "fans")]
),
str(DataOptions.FAN_PSU): DataFunction("get_fan_psu"),
str(DataOptions.ERRORS): DataFunction("get_errors"),
str(DataOptions.FAULT_LIGHT): DataFunction("get_fault_light"),
str(DataOptions.IS_MINING): DataFunction("is_mining"),
str(DataOptions.UPTIME): DataFunction(
"get_uptime", [RPCAPICommand("api_stats", "stats")]
),
str(DataOptions.CONFIG): DataFunction("get_config"),
}
)
class LUXMiner(BaseMiner):
@@ -85,7 +107,6 @@ class LUXMiner(BaseMiner):
return
async def fault_light_on(self) -> bool:
"""Sends command to turn on fault light on the miner."""
try:
session_id = await self._get_session()
if session_id:
@@ -96,7 +117,6 @@ class LUXMiner(BaseMiner):
return False
async def fault_light_off(self) -> bool:
"""Sends command to turn off fault light on the miner."""
try:
session_id = await self._get_session()
if session_id:
@@ -107,11 +127,9 @@ class LUXMiner(BaseMiner):
return False
async def restart_backend(self) -> bool:
"""Restart luxminer hashing process. Wraps [`restart_luxminer`][pyasic.miners.backends.luxminer.LUXMiner.restart_luxminer] to standardize."""
return await self.restart_luxminer()
async def restart_luxminer(self) -> bool:
"""Restart luxminer hashing process."""
try:
session_id = await self._get_session()
if session_id:
@@ -141,7 +159,6 @@ class LUXMiner(BaseMiner):
pass
async def reboot(self) -> bool:
"""Reboots power to the physical miner."""
try:
session_id = await self._get_session()
if session_id:
@@ -303,56 +320,6 @@ class LUXMiner(BaseMiner):
async def get_fan_psu(self) -> Optional[int]:
return None
async def get_pools(self, api_pools: dict = None) -> List[dict]:
if not api_pools:
try:
api_pools = await self.api.pools()
except APIError:
pass
if api_pools:
seen = []
groups = [{"quota": "0"}]
if api_pools.get("POOLS"):
for i, pool in enumerate(api_pools["POOLS"]):
if len(seen) == 0:
seen.append(pool["User"])
if not pool["User"] in seen:
# need to use get_config, as this will never read perfectly as there are some bad edge cases
groups = []
cfg = await self.get_config()
if cfg:
for group in cfg.pool_groups:
pools = {"quota": group.quota}
for _i, _pool in enumerate(group.pools):
pools[f"pool_{_i + 1}_url"] = _pool.url.replace(
"stratum+tcp://", ""
).replace("stratum2+tcp://", "")
pools[f"pool_{_i + 1}_user"] = _pool.username
groups.append(pools)
return groups
else:
groups[0][f"pool_{i + 1}_url"] = (
pool["URL"]
.replace("stratum+tcp://", "")
.replace("stratum2+tcp://", "")
)
groups[0][f"pool_{i + 1}_user"] = pool["User"]
else:
groups = []
cfg = await self.get_config()
if cfg:
for group in cfg.pool_groups:
pools = {"quota": group.quota}
for _i, _pool in enumerate(group.pools):
pools[f"pool_{_i + 1}_url"] = _pool.url.replace(
"stratum+tcp://", ""
).replace("stratum2+tcp://", "")
pools[f"pool_{_i + 1}_user"] = _pool.username
groups.append(pools)
return groups
return groups
async def get_errors(self) -> List[MinerErrorData]:
pass