Merge pull request #195 from Ytemiloluwa/CGMiner

feat: Add _get_pools method for CGMiner (StockFirmware)
This commit is contained in:
Brett Rowan
2024-08-23 14:57:35 -06:00
committed by GitHub

View File

@@ -14,7 +14,7 @@
# limitations under the License. - # limitations under the License. -
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
from typing import Optional from typing import Optional, List
from pyasic.config import MinerConfig from pyasic.config import MinerConfig
from pyasic.data import AlgoHashRate, HashUnit from pyasic.data import AlgoHashRate, HashUnit
@@ -22,6 +22,7 @@ from pyasic.errors import APIError
from pyasic.miners.data import DataFunction, DataLocations, DataOptions, RPCAPICommand from pyasic.miners.data import DataFunction, DataLocations, DataOptions, RPCAPICommand
from pyasic.miners.device.firmware import StockFirmware from pyasic.miners.device.firmware import StockFirmware
from pyasic.rpc.cgminer import CGMinerRPCAPI from pyasic.rpc.cgminer import CGMinerRPCAPI
from pyasic.data.pools import PoolMetrics, PoolUrl
CGMINER_DATA_LOC = DataLocations( CGMINER_DATA_LOC = DataLocations(
**{ **{
@@ -53,6 +54,10 @@ CGMINER_DATA_LOC = DataLocations(
"_get_uptime", "_get_uptime",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_stats", "stats")],
), ),
str(DataOptions.POOLS): DataFunction(
"_get_pools",
[RPCAPICommand("rpc_pools", "pools")],
),
} }
) )
@@ -136,3 +141,33 @@ class CGMiner(StockFirmware):
return int(rpc_stats["STATS"][1]["Elapsed"]) return int(rpc_stats["STATS"][1]["Elapsed"])
except LookupError: except LookupError:
pass pass
async def _get_pools(self, rpc_pools: dict = None) -> List[PoolMetrics]:
if rpc_pools is None:
try:
rpc_pools = await self.rpc.pools()
except APIError:
pass
pools_data = []
if rpc_pools is not None:
try:
pools = rpc_pools.get("POOLS", [])
for pool_info in pools:
url = pool_info.get("URL")
pool_url = PoolUrl.from_str(url) if url else None
pool_data = PoolMetrics(
accepted=pool_info.get("Accepted"),
rejected=pool_info.get("Rejected"),
get_failures=pool_info.get("Get Failures"),
remote_failures=pool_info.get("Remote Failures"),
active=pool_info.get("Stratum Active"),
alive=pool_info.get("Status") == "Alive",
url=pool_url,
user=pool_info.get("User"),
index=pool_info.get("POOL"),
)
pools_data.append(pool_data)
except LookupError:
pass
return pools_data