feat: Add _get_pools method for Bmminer. (#233)

* backends: Add _get_pools for Bmminer

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
This commit is contained in:
Temi
2024-11-13 18:18:07 +01:00
committed by GitHub
parent 309356243b
commit 76a870c2ed

View File

@@ -18,6 +18,7 @@ from typing import List, Optional
from pyasic.config import MinerConfig
from pyasic.data import AlgoHashRate, Fan, HashBoard, HashUnit
from pyasic.data.pools import PoolMetrics, PoolUrl
from pyasic.errors import APIError
from pyasic.miners.data import DataFunction, DataLocations, DataOptions, RPCAPICommand
from pyasic.miners.device.firmware import StockFirmware
@@ -53,6 +54,10 @@ BMMINER_DATA_LOC = DataLocations(
"_get_uptime",
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.POOLS): DataFunction(
"_get_pools",
[RPCAPICommand("rpc_pools", "pools")],
),
}
)
@@ -258,3 +263,33 @@ class BMMiner(StockFirmware):
return int(rpc_stats["STATS"][1]["Elapsed"])
except LookupError:
pass
async def _get_pools(self, rpc_pools: dict = None) -> List[PoolMetrics]:
if rpc_pools is None:
try:
rpc_pools = await self.rpc.pools()
except APIError:
pass
pools_data = []
if rpc_pools is not None:
try:
pools = rpc_pools.get("POOLS", [])
for pool_info in pools:
url = pool_info.get("URL")
pool_url = PoolUrl.from_str(url) if url else None
pool_data = PoolMetrics(
accepted=pool_info.get("Accepted"),
rejected=pool_info.get("Rejected"),
get_failures=pool_info.get("Get Failures"),
remote_failures=pool_info.get("Remote Failures"),
active=pool_info.get("Stratum Active"),
alive=pool_info.get("Status") == "Alive",
url=pool_url,
user=pool_info.get("User"),
index=pool_info.get("POOL"),
)
pools_data.append(pool_data)
except LookupError:
pass
return pools_data