Compare commits

..

3 Commits

Author SHA1 Message Date
Upstream Data
54f0292712 bug: try to fix scan timeout with asyncio.wait_for. 2024-05-13 08:28:12 -06:00
Upstream Data
46c56134f7 docs: update docs generation to fix marathon miners. 2024-04-29 09:14:08 -06:00
Upstream Data
1c1f7f1098 bug: move client into web ping to try to fix scanning failing. 2024-04-26 13:07:08 -06:00
16 changed files with 228 additions and 681 deletions

View File

@@ -27,7 +27,6 @@ from pyasic.misc import merge_dicts
class MinerConfig:
"""Represents the configuration for a miner including pool configuration,
fan mode, temperature settings, mining mode, and power scaling."""
pools: PoolConfig = field(default_factory=PoolConfig.default)
fan_mode: FanModeConfig = field(default_factory=FanModeConfig.default)
temperature: TemperatureConfig = field(default_factory=TemperatureConfig.default)
@@ -111,7 +110,7 @@ class MinerConfig:
}
def as_boser(self, user_suffix: str = None) -> dict:
""" "Generates the configuration in the format suitable for BOSer."""
""""Generates the configuration in the format suitable for BOSer."""
return {
**self.fan_mode.as_boser(),
**self.temperature.as_boser(),
@@ -139,15 +138,6 @@ class MinerConfig:
**self.power_scaling.as_auradine(),
}
def as_mara(self, user_suffix: str = None) -> dict:
return {
**self.fan_mode.as_mara(),
**self.temperature.as_mara(),
**self.mining_mode.as_mara(),
**self.pools.as_mara(user_suffix=user_suffix),
**self.power_scaling.as_mara(),
}
@classmethod
def from_dict(cls, dict_conf: dict) -> "MinerConfig":
"""Constructs a MinerConfig object from a dictionary."""
@@ -238,11 +228,3 @@ class MinerConfig:
fan_mode=FanModeConfig.from_auradine(web_conf["fan"]),
mining_mode=MiningModeConfig.from_auradine(web_conf["mode"]),
)
@classmethod
def from_mara(cls, web_miner_config: dict) -> "MinerConfig":
return cls(
pools=PoolConfig.from_mara(web_miner_config),
fan_mode=FanModeConfig.from_mara(web_miner_config),
mining_mode=MiningModeConfig.from_mara(web_miner_config),
)

View File

@@ -57,9 +57,6 @@ class MinerConfigOption(Enum):
def as_auradine(self) -> dict:
return self.value.as_auradine()
def as_mara(self) -> dict:
return self.value.as_mara()
def __call__(self, *args, **kwargs):
return self.value(*args, **kwargs)
@@ -109,6 +106,3 @@ class MinerConfigValue:
def as_auradine(self) -> dict:
return {}
def as_mara(self) -> dict:
return {}

View File

@@ -71,15 +71,6 @@ class FanModeNormal(MinerConfigValue):
}
}
def as_mara(self) -> dict:
return {
"general-config": {"environment-profile": "AirCooling"},
"advance-config": {
"override-fan-control": False,
"fan-fixed-percent": 0,
},
}
@dataclass
class FanModeManual(MinerConfigValue):
@@ -129,15 +120,6 @@ class FanModeManual(MinerConfigValue):
def as_epic(self) -> dict:
return {"fans": {"Manual": {"speed": self.speed}}}
def as_mara(self) -> dict:
return {
"general-config": {"environment-profile": "AirCooling"},
"advance-config": {
"override-fan-control": True,
"fan-fixed-percent": self.speed,
},
}
@dataclass
class FanModeImmersion(MinerConfigValue):
@@ -158,9 +140,6 @@ class FanModeImmersion(MinerConfigValue):
def as_auradine(self) -> dict:
return {"fan": {"percentage": 0}}
def as_mara(self) -> dict:
return {"general-config": {"environment-profile": "OilImmersionCooling"}}
class FanModeConfig(MinerConfigOption):
normal = FanModeNormal
@@ -276,18 +255,4 @@ class FanModeConfig(MinerConfigOption):
fan_1_target = fan_data["Target"]
return cls.manual(speed=round((fan_1_target / fan_1_max) * 100))
except LookupError:
pass
return cls.default()
@classmethod
def from_mara(cls, web_config: dict):
try:
mode = web_config["general-config"]["environment-profile"]
if mode == "AirCooling":
if web_config["advance-config"]["override-fan-control"]:
return cls.manual(web_config["advance-config"]["fan-fixed-percent"])
return cls.normal()
return cls.immersion()
except LookupError:
pass
return cls.default()

View File

@@ -56,13 +56,6 @@ class MiningModeNormal(MinerConfigValue):
def as_goldshell(self) -> dict:
return {"settings": {"level": 0}}
def as_mara(self) -> dict:
return {
"mode": {
"work-mode-selector": "Stock",
}
}
@dataclass
class MiningModeSleep(MinerConfigValue):
@@ -89,13 +82,6 @@ class MiningModeSleep(MinerConfigValue):
def as_goldshell(self) -> dict:
return {"settings": {"level": 3}}
def as_mara(self) -> dict:
return {
"mode": {
"work-mode-selector": "Sleep",
}
}
@dataclass
class MiningModeLPM(MinerConfigValue):
@@ -233,17 +219,6 @@ class MiningModePowerTune(MinerConfigValue):
def as_auradine(self) -> dict:
return {"mode": {"mode": "custom", "tune": "power", "power": self.power}}
def as_mara(self) -> dict:
return {
"mode": {
"work-mode-selector": "Auto",
"concorde": {
"mode-select": "PowerTarget",
"power-target": self.power,
},
}
}
@dataclass
class MiningModeHashrateTune(MinerConfigValue):
@@ -294,17 +269,6 @@ class MiningModeHashrateTune(MinerConfigValue):
def as_epic(self) -> dict:
return {"ptune": {"algo": self.algo.as_epic(), "target": self.hashrate}}
def as_mara(self) -> dict:
return {
"mode": {
"work-mode-selector": "Auto",
"concorde": {
"mode-select": "Hashrate",
"hash-target": self.hashrate,
},
}
}
@dataclass
class ManualBoardSettings(MinerConfigValue):
@@ -356,17 +320,6 @@ class MiningModeManual(MinerConfigValue):
}
return cls(global_freq=freq, global_volt=voltage, boards=boards)
def as_mara(self) -> dict:
return {
"mode": {
"work-mode-selector": "Fixed",
"fixed": {
"frequency": str(self.global_freq),
"voltage": self.global_volt,
},
}
}
class MiningModeConfig(MinerConfigOption):
normal = MiningModeNormal
@@ -515,28 +468,3 @@ class MiningModeConfig(MinerConfigOption):
return cls.power_tuning(mode_data["Power"])
except LookupError:
return cls.default()
@classmethod
def from_mara(cls, web_config: dict):
try:
mode = web_config["mode"]["work-mode-selector"]
if mode == "Fixed":
fixed_conf = web_config["mode"]["fixed"]
return cls.manual(
global_freq=int(fixed_conf["frequency"]),
global_volt=fixed_conf["voltage"],
)
elif mode == "Stock":
return cls.normal()
elif mode == "Sleep":
return cls.sleep()
elif mode == "Auto":
auto_conf = web_config["mode"]["concorde"]
auto_mode = auto_conf["mode-select"]
if auto_mode == "Hashrate":
return cls.hashrate_tuning(hashrate=auto_conf["hash-target"])
elif auto_mode == "PowerTarget":
return cls.power_tuning(power=auto_conf["power-target"])
except LookupError:
pass
return cls.default()

View File

@@ -118,15 +118,6 @@ class Pool(MinerConfigValue):
}
return {"pool": self.url, "login": self.user, "password": self.password}
def as_mara(self, user_suffix: str = None) -> dict:
if user_suffix is not None:
return {
"url": self.url,
"user": f"{self.user}{user_suffix}",
"pass": self.password,
}
return {"url": self.url, "user": self.user, "pass": self.password}
@classmethod
def from_dict(cls, dict_conf: dict | None) -> "Pool":
return cls(
@@ -186,14 +177,6 @@ class Pool(MinerConfigValue):
password=grpc_pool["password"],
)
@classmethod
def from_mara(cls, web_pool: dict) -> "Pool":
return cls(
url=web_pool["url"],
user=web_pool["user"],
password=web_pool["pass"],
)
@dataclass
class PoolGroup(MinerConfigValue):
@@ -281,12 +264,9 @@ class PoolGroup(MinerConfigValue):
def as_auradine(self, user_suffix: str = None) -> list:
return [p.as_auradine(user_suffix=user_suffix) for p in self.pools]
def as_epic(self, user_suffix: str = None) -> list:
def as_epic(self, user_suffix: str = None) -> dict:
return [p.as_epic(user_suffix=user_suffix) for p in self.pools]
def as_mara(self, user_suffix: str = None) -> list:
return [p.as_mara(user_suffix=user_suffix) for p in self.pools]
@classmethod
def from_dict(cls, dict_conf: dict | None) -> "PoolGroup":
cls_conf = {}
@@ -356,10 +336,6 @@ class PoolGroup(MinerConfigValue):
except LookupError:
return cls()
@classmethod
def from_mara(cls, web_config_pools: dict) -> "PoolGroup":
return cls(pools=[Pool.from_mara(pool_conf) for pool_conf in web_config_pools])
@dataclass
class PoolConfig(MinerConfigValue):
@@ -451,11 +427,6 @@ class PoolConfig(MinerConfigValue):
}
}
def as_mara(self, user_suffix: str = None) -> dict:
if len(self.groups) > 0:
return {"pools": self.groups[0].as_mara(user_suffix=user_suffix)}
return {"pools": []}
@classmethod
def from_api(cls, api_pools: dict) -> "PoolConfig":
try:
@@ -510,7 +481,3 @@ class PoolConfig(MinerConfigValue):
)
except LookupError:
return cls()
@classmethod
def from_mara(cls, web_config: dict) -> "PoolConfig":
return cls(groups=[PoolGroup.from_mara(web_config["pools"])])

View File

@@ -642,8 +642,6 @@ class BOSer(BaseMiner):
_web_cls = BOSerWebAPI
web: BOSerWebAPI
firmware = "BOS+"
data_locations = BOSER_DATA_LOC
supports_autotuning = True

View File

@@ -1,69 +1,71 @@
from typing import List, Optional
from typing import Optional
from pyasic import MinerConfig
from pyasic.config import MiningModeConfig
from pyasic.data import Fan, HashBoard
from pyasic.errors import APIError
from pyasic.miners.base import BaseMiner
from pyasic.miners.data import DataFunction, DataLocations, DataOptions, WebAPICommand
from pyasic.misc import merge_dicts
from pyasic.miners.backends import AntminerModern
from pyasic.miners.data import (
DataFunction,
DataLocations,
DataOptions,
RPCAPICommand,
WebAPICommand,
)
from pyasic.web.marathon import MaraWebAPI
MARA_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction(
"_get_mac",
[WebAPICommand("web_overview", "overview")],
[WebAPICommand("web_get_system_info", "get_system_info")],
),
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver",
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver",
[WebAPICommand("web_overview", "overview")],
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.HOSTNAME): DataFunction(
"_get_hostname",
[WebAPICommand("web_network_config", "network_config")],
[WebAPICommand("web_get_system_info", "get_system_info")],
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[WebAPICommand("web_brief", "brief")],
[RPCAPICommand("rpc_summary", "summary")],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
[WebAPICommand("web_brief", "brief")],
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[WebAPICommand("web_hashboards", "hashboards")],
),
str(DataOptions.WATTAGE): DataFunction(
"_get_wattage",
[WebAPICommand("web_brief", "brief")],
),
str(DataOptions.WATTAGE_LIMIT): DataFunction(
"_get_wattage_limit",
[WebAPICommand("web_miner_config", "miner_config")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[WebAPICommand("web_fans", "fans")],
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.ERRORS): DataFunction(
"_get_errors",
[WebAPICommand("web_summary", "summary")],
),
str(DataOptions.FAULT_LIGHT): DataFunction(
"_get_fault_light",
[WebAPICommand("web_locate_miner", "locate_miner")],
[WebAPICommand("web_get_blink_status", "get_blink_status")],
),
str(DataOptions.IS_MINING): DataFunction(
"_is_mining",
[WebAPICommand("web_brief", "brief")],
[WebAPICommand("web_get_conf", "get_miner_conf")],
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime",
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.WATTAGE): DataFunction(
"_get_wattage",
[WebAPICommand("web_brief", "brief")],
),
}
)
class MaraMiner(BaseMiner):
class MaraMiner(AntminerModern):
_web_cls = MaraWebAPI
web: MaraWebAPI
@@ -71,52 +73,6 @@ class MaraMiner(BaseMiner):
firmware = "MaraFW"
async def fault_light_off(self) -> bool:
res = await self.web.set_locate_miner(blinking=False)
return res.get("blinking") is False
async def fault_light_on(self) -> bool:
res = await self.web.set_locate_miner(blinking=True)
return res.get("blinking") is True
async def get_config(self) -> MinerConfig:
data = await self.web.get_miner_config()
if data:
self.config = MinerConfig.from_mara(data)
return self.config
async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
data = await self.web.get_miner_config()
cfg_data = config.as_mara(user_suffix=user_suffix)
merged_cfg = merge_dicts(data, cfg_data)
await self.web.set_miner_config(**merged_cfg)
async def set_power_limit(self, wattage: int) -> bool:
cfg = await self.get_config()
cfg.mining_mode = MiningModeConfig.power_tuning(wattage)
await self.send_config(cfg)
return True
async def stop_mining(self) -> bool:
data = await self.web.get_miner_config()
data["mode"]["work-mode-selector"] = "Sleep"
await self.web.set_miner_config(**data)
return True
async def resume_mining(self) -> bool:
data = await self.web.get_miner_config()
data["mode"]["work-mode-selector"] = "Auto"
await self.web.set_miner_config(**data)
return True
async def reboot(self) -> bool:
await self.web.reboot()
return True
async def restart_backend(self) -> bool:
await self.web.reload()
return True
async def _get_wattage(self, web_brief: dict = None) -> Optional[int]:
if web_brief is None:
try:
@@ -126,173 +82,6 @@ class MaraMiner(BaseMiner):
if web_brief is not None:
try:
return round(web_brief["power_consumption_estimated"])
except LookupError:
pass
async def _is_mining(self, web_brief: dict = None) -> Optional[bool]:
if web_brief is None:
try:
web_brief = await self.web.brief()
except APIError:
pass
if web_brief is not None:
try:
return web_brief["status"] == "Mining"
except LookupError:
pass
async def _get_uptime(self, web_brief: dict = None) -> Optional[int]:
if web_brief is None:
try:
web_brief = await self.web.brief()
except APIError:
pass
if web_brief is not None:
try:
return web_brief["elapsed"]
except LookupError:
pass
async def _get_hashboards(self, web_hashboards: dict = None) -> List[HashBoard]:
hashboards = [
HashBoard(slot=i, expected_chips=self.expected_chips)
for i in range(self.expected_hashboards)
]
if web_hashboards is None:
try:
web_hashboards = await self.web.hashboards()
except APIError:
pass
if web_hashboards is not None:
try:
for hb in web_hashboards["hashboards"]:
idx = hb["index"]
hashboards[idx].hashrate = round(hb["hashrate_average"] / 1000, 2)
hashboards[idx].temp = round(
sum(hb["temperature_pcb"]) / len(hb["temperature_pcb"]), 2
)
hashboards[idx].chip_temp = round(
sum(hb["temperature_chip"]) / len(hb["temperature_chip"]), 2
)
hashboards[idx].chips = hb["asic_num"]
hashboards[idx].serial_number = hb["serial_number"]
hashboards[idx].missing = False
except LookupError:
pass
return hashboards
async def _get_mac(self, web_overview: dict = None) -> Optional[str]:
if web_overview is None:
try:
web_overview = await self.web.overview()
except APIError:
pass
if web_overview is not None:
try:
return web_overview["mac"].upper()
except LookupError:
pass
async def _get_fw_ver(self, web_overview: dict = None) -> Optional[str]:
if web_overview is None:
try:
web_overview = await self.web.overview()
except APIError:
pass
if web_overview is not None:
try:
return web_overview["version_firmware"]
except LookupError:
pass
async def _get_hostname(self, web_network_config: dict = None) -> Optional[str]:
if web_network_config is None:
try:
web_network_config = await self.web.get_network_config()
except APIError:
pass
if web_network_config is not None:
try:
return web_network_config["hostname"]
except LookupError:
pass
async def _get_hashrate(self, web_brief: dict = None) -> Optional[float]:
if web_brief is None:
try:
web_brief = await self.web.brief()
except APIError:
pass
if web_brief is not None:
try:
return round(web_brief["hashrate_realtime"], 2)
except LookupError:
pass
async def _get_fans(self, web_fans: dict = None) -> List[Fan]:
if web_fans is None:
try:
web_fans = await self.web.fans()
except APIError:
pass
if web_fans is not None:
fans = []
for n in range(self.expected_fans):
try:
fans.append(Fan(web_fans["fans"][n]["current_speed"]))
except (IndexError, KeyError):
pass
return fans
return [Fan() for _ in range(self.expected_fans)]
async def _get_fault_light(self, web_locate_miner: dict = None) -> bool:
if web_locate_miner is None:
try:
web_locate_miner = await self.web.get_locate_miner()
except APIError:
pass
if web_locate_miner is not None:
try:
return web_locate_miner["blinking"]
except LookupError:
pass
return False
async def _get_expected_hashrate(self, web_brief: dict = None) -> Optional[float]:
if web_brief is None:
try:
web_brief = await self.web.brief()
except APIError:
pass
if web_brief is not None:
try:
return round(web_brief["hashrate_ideal"] / 1000, 2)
except LookupError:
pass
async def _get_wattage_limit(
self, web_miner_config: dict = None
) -> Optional[float]:
if web_miner_config is None:
try:
web_miner_config = await self.web.get_miner_config()
except APIError:
pass
if web_miner_config is not None:
try:
return web_miner_config["mode"]["concorde"]["power-target"]
return web_brief["power_consumption_estimated"]
except LookupError:
pass

View File

@@ -542,32 +542,25 @@ class MinerFactory:
async def _get_miner_web(self, ip: str) -> MinerTypes | None:
urls = [f"http://{ip}/", f"https://{ip}/"]
async with httpx.AsyncClient(
transport=settings.transport(verify=False)
) as session:
tasks = [asyncio.create_task(self._web_ping(session, url)) for url in urls]
tasks = [asyncio.create_task(self._web_ping(url)) for url in urls]
text, resp = await concurrent_get_first_result(
tasks,
lambda x: x[0] is not None
and self._parse_web_type(x[0], x[1]) is not None,
lambda x: x[0] is not None and self._parse_web_type(x[0], x[1]) is not None,
)
if text is not None:
mtype = self._parse_web_type(text, resp)
if mtype == MinerTypes.ANTMINER:
# could still be mara
auth = httpx.DigestAuth("root", "root")
res = await self.send_web_command(ip, "/kaonsu/v1/brief", auth=auth)
if res is not None:
mtype = MinerTypes.MARATHON
return mtype
return self._parse_web_type(text, resp)
@staticmethod
async def _web_ping(
session: httpx.AsyncClient, url: str
) -> tuple[str | None, httpx.Response | None]:
async def _web_ping(url: str) -> tuple[str | None, httpx.Response | None]:
try:
resp = await session.get(url, follow_redirects=True)
async with httpx.AsyncClient(
transport=settings.transport(verify=False)
) as c:
resp = await asyncio.wait_for(
c.get(url, follow_redirects=True),
settings.get("api_function_timeout", 5),
)
return resp.text, resp
except (
httpx.HTTPError,

View File

@@ -21,33 +21,22 @@ class MinerListenerProtocol(asyncio.Protocol):
def __init__(self):
self.responses = {}
self.transport = None
self.new_miner = None
async def get_new_miner(self):
try:
while self.new_miner is None:
await asyncio.sleep(0)
return self.new_miner
finally:
self.new_miner = None
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, _addr):
if data == b"OK\x00\x00\x00\x00\x00\x00\x00\x00":
return
@staticmethod
def datagram_received(data, _addr):
m = data.decode()
if "," in m:
ip, mac = m.split(",")
if "/" in ip:
ip = ip.replace("[", "").split("/")[0]
else:
d = m[:-1].split("MAC")
ip = d[0][3:]
mac = d[1][1:]
self.new_miner = {"IP": ip, "MAC": mac.upper()}
new_miner = {"IP": ip, "MAC": mac.upper()}
MinerListener().new_miner = new_miner
def connection_lost(self, _):
pass
@@ -56,32 +45,32 @@ class MinerListenerProtocol(asyncio.Protocol):
class MinerListener:
def __init__(self, bind_addr: str = "0.0.0.0"):
self.found_miners = []
self.stop = asyncio.Event()
self.new_miner = None
self.stop = False
self.bind_addr = bind_addr
async def listen(self):
self.stop = False
loop = asyncio.get_running_loop()
transport_14235, protocol_14235 = await loop.create_datagram_endpoint(
transport_14235, _ = await loop.create_datagram_endpoint(
MinerListenerProtocol, local_addr=(self.bind_addr, 14235)
)
transport_8888, protocol_8888 = await loop.create_datagram_endpoint(
transport_8888, _ = await loop.create_datagram_endpoint(
MinerListenerProtocol, local_addr=(self.bind_addr, 8888)
)
try:
while not self.stop.is_set():
tasks = [
asyncio.create_task(protocol_14235.get_new_miner()),
asyncio.create_task(protocol_8888.get_new_miner()),
]
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for t in tasks:
if t.done():
yield t.result()
finally:
while True:
if self.new_miner:
yield self.new_miner
self.found_miners.append(self.new_miner)
self.new_miner = None
if self.stop:
transport_14235.close()
transport_8888.close()
break
await asyncio.sleep(0)
async def cancel(self):
self.stop = True

View File

@@ -46,10 +46,9 @@ _settings = { # defaults
ssl_cxt = httpx.create_ssl_context()
# this function configures socket options like SO_LINGER and returns an AsyncHTTPTransport instance to perform asynchronous HTTP requests
# using those options.
# SO_LINGER controls what happens when you close a socket with unsent data - it allows specifying linger time for the data to be sent.
#this function configures socket options like SO_LINGER and returns an AsyncHTTPTransport instance to perform asynchronous HTTP requests
#using those options.
#SO_LINGER controls what happens when you close a socket with unsent data - it allows specifying linger time for the data to be sent.
def transport(verify: Union[str, bool, SSLContext] = ssl_cxt):
l_onoff = 1
l_linger = get("so_linger_time", 1000)

View File

@@ -9,7 +9,6 @@ class AntminerModernSSH(BaseSSH):
Args:
ip (str): The IP address of the Antminer device.
"""
def __init__(self, ip: str):
super().__init__(ip)
self.pwd = settings.get("default_antminer_ssh_password", "root")

View File

@@ -1,56 +1,30 @@
from __future__ import annotations
import asyncio
import json
from typing import Any
import httpx
from pyasic import settings
from pyasic.web.base import BaseWebAPI
from pyasic.web.antminer import AntminerModernWebAPI
class MaraWebAPI(BaseWebAPI):
class MaraWebAPI(AntminerModernWebAPI):
def __init__(self, ip: str) -> None:
self.am_commands = [
"get_miner_conf",
"set_miner_conf",
"blink",
"reboot",
"get_system_info",
"get_network_info",
"summary",
"get_blink_status",
"set_network_conf",
]
super().__init__(ip)
async def multicommand(
self, *commands: str, ignore_errors: bool = False, allow_warning: bool = True
) -> dict:
async with httpx.AsyncClient(transport=settings.transport()) as client:
tasks = [
asyncio.create_task(self._handle_multicommand(client, command))
for command in commands
]
all_data = await asyncio.gather(*tasks)
data = {}
for item in all_data:
data.update(item)
data["multicommand"] = True
return data
async def _handle_multicommand(
self, client: httpx.AsyncClient, command: str
) -> dict:
auth = httpx.DigestAuth(self.username, self.pwd)
try:
url = f"http://{self.ip}:{self.port}/kaonsu/v1/{command}"
ret = await client.get(url, auth=auth)
except httpx.HTTPError:
pass
else:
if ret.status_code == 200:
try:
json_data = ret.json()
return {command: json_data}
except json.decoder.JSONDecodeError:
pass
return {command: {}}
async def send_command(
async def _send_mara_command(
self,
command: str | bytes,
ignore_errors: bool = False,
@@ -82,51 +56,64 @@ class MaraWebAPI(BaseWebAPI):
except json.decoder.JSONDecodeError:
pass
async def brief(self):
return await self.send_command("brief")
async def _send_am_command(
self,
command: str | bytes,
ignore_errors: bool = False,
allow_warning: bool = True,
privileged: bool = False,
**parameters: Any,
):
url = f"http://{self.ip}:{self.port}/cgi-bin/{command}.cgi"
auth = httpx.DigestAuth(self.username, self.pwd)
try:
async with httpx.AsyncClient(
transport=settings.transport(),
) as client:
if parameters:
data = await client.post(
url,
auth=auth,
timeout=settings.get("api_function_timeout", 3),
json=parameters,
)
else:
data = await client.get(url, auth=auth)
except httpx.HTTPError:
pass
else:
if data.status_code == 200:
try:
return data.json()
except json.decoder.JSONDecodeError:
pass
async def ping(self):
return await self.send_command("ping")
async def get_locate_miner(self):
return await self.send_command("locate_miner")
async def set_locate_miner(self, blinking: bool):
return await self.send_command("locate_miner", blinking=blinking)
async def reboot(self):
return await self.send_command("maintenance", type="reboot")
async def reset(self):
return await self.send_command("maintenance", type="reset")
async def reload(self):
return await self.send_command("maintenance", type="reload")
async def set_password(self, new_pwd: str):
return await self.send_command(
"maintenance",
type="passwd",
params={"curPwd": self.pwd, "confirmPwd": self.pwd, "newPwd": new_pwd},
async def send_command(
self,
command: str | bytes,
ignore_errors: bool = False,
allow_warning: bool = True,
privileged: bool = False,
**parameters: Any,
) -> dict:
if command in self.am_commands:
return await self._send_am_command(
command,
ignore_errors=ignore_errors,
allow_warning=allow_warning,
privileged=privileged,
**parameters,
)
return await self._send_mara_command(
command,
ignore_errors=ignore_errors,
allow_warning=allow_warning,
privileged=privileged,
**parameters,
)
async def get_network_config(self):
return await self.send_command("network_config")
async def set_network_config(self, **params):
return await self.send_command("network_config", **params)
async def get_miner_config(self):
return await self.send_command("miner_config")
async def set_miner_config(self, **params):
return await self.send_command("miner_config", **params)
async def fans(self):
return await self.send_command("fans")
async def log(self):
return await self.send_command("log")
async def brief(self):
return await self.send_command("brief")
async def overview(self):
return await self.send_command("overview")
@@ -134,14 +121,11 @@ class MaraWebAPI(BaseWebAPI):
async def connections(self):
return await self.send_command("connections")
async def controlboard_info(self):
return await self.send_command("controlboard_info")
async def event_chart(self):
return await self.send_command("event_chart")
async def hashboards(self):
return await self.send_command("hashboards")
async def pools(self):
return await self.send_command("pools")
async def mara_pools(self):
return await self._send_mara_command("pools")

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "pyasic"
version = "0.55.0"
version = "0.54.20"
description = "A simplified and standardized interface for Bitcoin ASICs."
authors = ["UpstreamData <brett@upstreamdata.ca>"]
repository = "https://github.com/UpstreamData/pyasic"

View File

@@ -33,43 +33,3 @@ class TestFanConfig(unittest.TestCase):
conf = fan_mode()
am_conf = conf.as_am_modern()
self.assertEqual(conf, FanModeConfig.from_am_modern(am_conf))
def test_epic_deserialize_and_serialize(self):
for fan_mode in FanModeConfig:
with self.subTest(
msg=f"Test serialization and deserialization of epic fan config",
fan_mode=fan_mode,
):
conf = fan_mode()
epic_conf = conf.as_epic()
self.assertEqual(conf, FanModeConfig.from_epic(epic_conf))
def test_vnish_deserialize_and_serialize(self):
for fan_mode in FanModeConfig:
with self.subTest(
msg=f"Test serialization and deserialization of vnish fan config",
fan_mode=fan_mode,
):
conf = fan_mode()
vnish_conf = conf.as_vnish()
self.assertEqual(conf, FanModeConfig.from_vnish(vnish_conf))
def test_auradine_deserialize_and_serialize(self):
for fan_mode in FanModeConfig:
with self.subTest(
msg=f"Test serialization and deserialization of auradine fan config",
fan_mode=fan_mode,
):
conf = fan_mode()
aur_conf = conf.as_auradine()
self.assertEqual(conf, FanModeConfig.from_auradine(aur_conf))
def test_boser_deserialize_and_serialize(self):
for fan_mode in FanModeConfig:
with self.subTest(
msg=f"Test serialization and deserialization of boser fan config",
fan_mode=fan_mode,
):
conf = fan_mode()
boser_conf = conf.as_boser()
self.assertEqual(conf, FanModeConfig.from_boser(boser_conf))