feature: add support for hammer miner

This commit is contained in:
Upstream Data
2024-11-14 08:32:12 -07:00
parent 76a870c2ed
commit 924b62e0d5
7 changed files with 644 additions and 0 deletions

View File

@@ -156,6 +156,9 @@ class MinerConfig:
**self.pools.as_luxos(user_suffix=user_suffix),
}
def as_hammer(self, *args, **kwargs) -> dict:
return self.as_am_modern(*args, **kwargs)
@classmethod
def from_dict(cls, dict_conf: dict) -> "MinerConfig":
"""Constructs a MinerConfig object from a dictionary."""
@@ -276,3 +279,7 @@ class MinerConfig:
),
pools=PoolConfig.from_luxos(rpc_pools=rpc_pools, rpc_groups=rpc_groups),
)
@classmethod
def from_hammer(cls, *args, **kwargs) -> "MinerConfig":
return cls.from_am_modern(*args, **kwargs)

View File

@@ -0,0 +1,360 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from typing import List, Optional
from pyasic import MinerConfig
from pyasic.data import AlgoHashRate, HashBoard, HashUnit
from pyasic.data.error_codes import MinerErrorData, X19Error
from pyasic.data.pools import PoolMetrics, PoolUrl
from pyasic.errors import APIError
from pyasic.miners.base import BaseMiner
from pyasic.miners.data import (
DataFunction,
DataLocations,
DataOptions,
RPCAPICommand,
WebAPICommand,
)
from pyasic.rpc.ccminer import CCMinerRPCAPI
from pyasic.web.hammer import HammerWebAPI
HAMMER_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction(
"_get_mac",
[WebAPICommand("web_get_system_info", "get_system_info")],
),
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver",
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver",
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.HOSTNAME): DataFunction(
"_get_hostname",
[WebAPICommand("web_get_system_info", "get_system_info")],
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[RPCAPICommand("rpc_summary", "summary")],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.ERRORS): DataFunction(
"_get_errors",
[WebAPICommand("web_summary", "summary")],
),
str(DataOptions.FAULT_LIGHT): DataFunction(
"_get_fault_light",
[WebAPICommand("web_get_blink_status", "get_blink_status")],
),
str(DataOptions.IS_MINING): DataFunction(
"_is_mining",
[WebAPICommand("web_get_conf", "get_miner_conf")],
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime",
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.POOLS): DataFunction(
"_get_pools",
[RPCAPICommand("rpc_pools", "pools")],
),
}
)
class BlackMiner(BaseMiner):
"""Handler for Hammer miners."""
_rpc_cls = CCMinerRPCAPI
rpc: CCMinerRPCAPI
_web_cls = HammerWebAPI
web: HammerWebAPI
data_locations = HAMMER_DATA_LOC
async def get_config(self) -> MinerConfig:
data = await self.web.get_miner_conf()
if data:
self.config = MinerConfig.from_hammer(data)
return self.config
async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
self.config = config
await self.web.set_miner_conf(config.as_hammer(user_suffix=user_suffix))
async def fault_light_on(self) -> bool:
data = await self.web.blink(blink=True)
if data:
if data.get("code") == "B000":
self.light = True
return self.light
async def fault_light_off(self) -> bool:
data = await self.web.blink(blink=False)
if data:
if data.get("code") == "B100":
self.light = False
return self.light
async def reboot(self) -> bool:
data = await self.web.reboot()
if data:
return True
return False
async def _get_hostname(self, web_get_system_info: dict = None) -> Optional[str]:
if web_get_system_info is None:
try:
web_get_system_info = await self.web.get_system_info()
except APIError:
pass
if web_get_system_info is not None:
try:
return web_get_system_info["hostname"]
except KeyError:
pass
async def _get_mac(self, web_get_system_info: dict = None) -> Optional[str]:
if web_get_system_info is None:
try:
web_get_system_info = await self.web.get_system_info()
except APIError:
pass
if web_get_system_info is not None:
try:
return web_get_system_info["macaddr"]
except KeyError:
pass
try:
data = await self.web.get_network_info()
if data:
return data["macaddr"]
except KeyError:
pass
async def _get_errors(self, web_summary: dict = None) -> List[MinerErrorData]:
if web_summary is None:
try:
web_summary = await self.web.summary()
except APIError:
pass
errors = []
if web_summary is not None:
try:
for item in web_summary["SUMMARY"][0]["status"]:
try:
if not item["status"] == "s":
errors.append(X19Error(item["msg"]))
except KeyError:
continue
except LookupError:
pass
return errors
async def _get_hashboards(self) -> List[HashBoard]:
hashboards = [
HashBoard(idx, expected_chips=self.expected_chips)
for idx in range(self.expected_hashboards)
]
try:
rpc_stats = await self.web.stats(new_api=True)
except APIError:
return hashboards
if rpc_stats is not None:
try:
for board in rpc_stats["STATS"][0]["chain"]:
hashboards[board["index"]].hashrate = AlgoHashRate.SHA256(
board["rate_real"], HashUnit.SHA256.GH
).into(self.algo.unit.default)
hashboards[board["index"]].chips = board["asic_num"]
board_temp_data = list(
filter(lambda x: not x == 0, board["temp_pcb"])
)
hashboards[board["index"]].temp = sum(board_temp_data) / len(
board_temp_data
)
chip_temp_data = list(
filter(lambda x: not x == 0, board["temp_chip"])
)
hashboards[board["index"]].chip_temp = sum(chip_temp_data) / len(
chip_temp_data
)
hashboards[board["index"]].serial_number = board["sn"]
hashboards[board["index"]].missing = False
except LookupError:
pass
return hashboards
async def _get_fault_light(
self, web_get_blink_status: dict = None
) -> Optional[bool]:
if self.light:
return self.light
if web_get_blink_status is None:
try:
web_get_blink_status = await self.web.get_blink_status()
except APIError:
pass
if web_get_blink_status is not None:
try:
self.light = web_get_blink_status["blink"]
except KeyError:
pass
return self.light
async def _get_expected_hashrate(
self, rpc_stats: dict = None
) -> Optional[AlgoHashRate]:
if rpc_stats is None:
try:
rpc_stats = await self.rpc.stats()
except APIError:
pass
if rpc_stats is not None:
try:
expected_rate = rpc_stats["STATS"][1]["total_rateideal"]
try:
rate_unit = rpc_stats["STATS"][1]["rate_unit"]
except KeyError:
rate_unit = "GH"
return AlgoHashRate.SHA256(
expected_rate, HashUnit.SHA256.from_str(rate_unit)
).into(self.algo.unit.default)
except LookupError:
pass
async def set_static_ip(
self,
ip: str,
dns: str,
gateway: str,
subnet_mask: str = "255.255.255.0",
hostname: str = None,
):
if not hostname:
hostname = await self.get_hostname()
await self.web.set_network_conf(
ip=ip,
dns=dns,
gateway=gateway,
subnet_mask=subnet_mask,
hostname=hostname,
protocol=2,
)
async def set_dhcp(self, hostname: str = None):
if not hostname:
hostname = await self.get_hostname()
await self.web.set_network_conf(
ip="", dns="", gateway="", subnet_mask="", hostname=hostname, protocol=1
)
async def set_hostname(self, hostname: str):
cfg = await self.web.get_network_info()
dns = cfg["conf_dnsservers"]
gateway = cfg["conf_gateway"]
ip = cfg["conf_ipaddress"]
subnet_mask = cfg["conf_netmask"]
protocol = 1 if cfg["conf_nettype"] == "DHCP" else 2
await self.web.set_network_conf(
ip=ip,
dns=dns,
gateway=gateway,
subnet_mask=subnet_mask,
hostname=hostname,
protocol=protocol,
)
async def _is_mining(self, web_get_conf: dict = None) -> Optional[bool]:
if web_get_conf is None:
try:
web_get_conf = await self.web.get_miner_conf()
except APIError:
pass
if web_get_conf is not None:
try:
if str(web_get_conf["bitmain-work-mode"]).isdigit():
return (
False if int(web_get_conf["bitmain-work-mode"]) == 1 else True
)
return False
except LookupError:
pass
async def _get_uptime(self, rpc_stats: dict = None) -> Optional[int]:
if rpc_stats is None:
try:
rpc_stats = await self.rpc.stats()
except APIError:
pass
if rpc_stats is not None:
try:
return int(rpc_stats["STATS"][1]["Elapsed"])
except LookupError:
pass
async def _get_pools(self, rpc_pools: dict = None) -> List[PoolMetrics]:
if rpc_pools is None:
try:
rpc_pools = await self.rpc.pools()
except APIError:
pass
pools_data = []
if rpc_pools is not None:
try:
pools = rpc_pools.get("POOLS", [])
for pool_info in pools:
url = pool_info.get("URL")
pool_url = PoolUrl.from_str(url) if url else None
pool_data = PoolMetrics(
accepted=pool_info.get("Accepted"),
rejected=pool_info.get("Rejected"),
get_failures=pool_info.get("Get Failures"),
remote_failures=pool_info.get("Remote Failures"),
active=pool_info.get("Stratum Active"),
alive=pool_info.get("Status") == "Alive",
url=pool_url,
user=pool_info.get("User"),
index=pool_info.get("POOL"),
)
pools_data.append(pool_data)
except LookupError:
pass
return pools_data

View File

@@ -17,6 +17,7 @@ from .bfgminer import BFGMinerRPCAPI
from .bmminer import BMMinerRPCAPI
from .bosminer import BOSMinerRPCAPI
from .btminer import BTMinerRPCAPI
from .ccminer import CCMinerRPCAPI
from .cgminer import CGMinerRPCAPI
from .gcminer import GCMinerRPCAPI
from .luxminer import LUXMinerRPCAPI

34
pyasic/rpc/ccminer.py Normal file
View File

@@ -0,0 +1,34 @@
# ------------------------------------------------------------------------------
# Copyright 2024 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.rpc.bmminer import BMMinerRPCAPI
class CCMinerRPCAPI(BMMinerRPCAPI):
"""An abstraction of the CCMiner API.
Each method corresponds to an API command in CCMiner.
This class abstracts use of the CCMiner API, as well as the
methods for sending commands to it. The `self.send_command()`
function handles sending a command to the miner asynchronously, and
as such is the base for many of the functions in this class, which
rely on it to send the command for them.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.port = 8359

View File

@@ -31,6 +31,7 @@ _settings = { # defaults
"default_whatsminer_rpc_password": "admin",
"default_innosilicon_web_password": "admin",
"default_antminer_web_password": "root",
"default_hammer_web_password": "root",
"default_bosminer_web_password": "root",
"default_vnish_web_password": "admin",
"default_goldshell_web_password": "123456789",

View File

@@ -19,6 +19,7 @@ from .base import BaseWebAPI
from .braiins_os import BOSerWebAPI, BOSMinerWebAPI
from .epic import ePICWebAPI
from .goldshell import GoldshellWebAPI
from .hammer import HammerWebAPI
from .iceriver import IceRiverWebAPI
from .innosilicon import InnosiliconWebAPI
from .vnish import VNishWebAPI

240
pyasic/web/hammer.py Normal file
View File

@@ -0,0 +1,240 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import json
from typing import Any
import httpx
from pyasic import settings
from pyasic.web.base import BaseWebAPI
class HammerWebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
"""Initialize the modern Hammer API client with a specific IP address.
Args:
ip (str): IP address of the Hammer device.
"""
super().__init__(ip)
self.username = "root"
self.pwd = settings.get("default_hammer_web_password", "root")
async def send_command(
self,
command: str | bytes,
ignore_errors: bool = False,
allow_warning: bool = True,
privileged: bool = False,
**parameters: Any,
) -> dict:
"""Send a command to the Hammer device using HTTP digest authentication.
Args:
command (str | bytes): The CGI command to send.
ignore_errors (bool): If True, ignore any HTTP errors.
allow_warning (bool): If True, proceed with warnings.
privileged (bool): If set to True, requires elevated privileges.
**parameters: Arbitrary keyword arguments to be sent as parameters in the request.
Returns:
dict: The JSON response from the device or an empty dictionary if an error occurs.
"""
url = f"http://{self.ip}:{self.port}/cgi-bin/{command}.cgi"
auth = httpx.DigestAuth(self.username, self.pwd)
try:
async with httpx.AsyncClient(transport=settings.transport()) as client:
if parameters:
data = await client.post(
url,
auth=auth,
timeout=settings.get("api_function_timeout", 3),
json=parameters,
)
else:
data = await client.get(url, auth=auth)
except httpx.HTTPError as e:
return {"success": False, "message": f"HTTP error occurred: {str(e)}"}
else:
if data.status_code == 200:
try:
return data.json()
except json.decoder.JSONDecodeError:
return {"success": False, "message": "Failed to decode JSON"}
return {"success": False, "message": "Unknown error occurred"}
async def multicommand(
self, *commands: str, ignore_errors: bool = False, allow_warning: bool = True
) -> dict:
"""Execute multiple commands simultaneously.
Args:
*commands (str): Multiple command strings to be executed.
ignore_errors (bool): If True, ignore any HTTP errors.
allow_warning (bool): If True, proceed with warnings.
Returns:
dict: A dictionary containing the results of all commands executed.
"""
async with httpx.AsyncClient(transport=settings.transport()) as client:
tasks = [
asyncio.create_task(self._handle_multicommand(client, command))
for command in commands
]
all_data = await asyncio.gather(*tasks)
data = {}
for item in all_data:
data.update(item)
data["multicommand"] = True
return data
async def _handle_multicommand(
self, client: httpx.AsyncClient, command: str
) -> dict:
"""Helper function for handling individual commands in a multicommand execution.
Args:
client (httpx.AsyncClient): The HTTP client to use for the request.
command (str): The command to be executed.
Returns:
dict: A dictionary containing the response of the executed command.
"""
auth = httpx.DigestAuth(self.username, self.pwd)
try:
url = f"http://{self.ip}/cgi-bin/{command}.cgi"
ret = await client.get(url, auth=auth)
except httpx.HTTPError:
pass
else:
if ret.status_code == 200:
try:
json_data = ret.json()
return {command: json_data}
except json.decoder.JSONDecodeError:
pass
return {command: {}}
async def get_miner_conf(self) -> dict:
"""Retrieve the miner configuration from the Hammer device.
Returns:
dict: A dictionary containing the current configuration of the miner.
"""
return await self.send_command("get_miner_conf")
async def set_miner_conf(self, conf: dict) -> dict:
"""Set the configuration for the miner.
Args:
conf (dict): A dictionary of configuration settings to apply to the miner.
Returns:
dict: A dictionary response from the device after setting the configuration.
"""
return await self.send_command("set_miner_conf", **conf)
async def blink(self, blink: bool) -> dict:
"""Control the blinking of the LED on the miner device.
Args:
blink (bool): True to start blinking, False to stop.
Returns:
dict: A dictionary response from the device after the command execution.
"""
if blink:
return await self.send_command("blink", blink="true")
return await self.send_command("blink", blink="false")
async def reboot(self) -> dict:
"""Reboot the miner device.
Returns:
dict: A dictionary response from the device confirming the reboot command.
"""
return await self.send_command("reboot")
async def get_system_info(self) -> dict:
"""Retrieve system information from the miner.
Returns:
dict: A dictionary containing system information of the miner.
"""
return await self.send_command("get_system_info")
async def get_network_info(self) -> dict:
"""Retrieve network configuration information from the miner.
Returns:
dict: A dictionary containing the network configuration of the miner.
"""
return await self.send_command("get_network_info")
async def summary(self) -> dict:
"""Get a summary of the miner's status and performance.
Returns:
dict: A summary of the miner's current operational status.
"""
return await self.send_command("summary")
async def get_blink_status(self) -> dict:
"""Check the status of the LED blinking on the miner.
Returns:
dict: A dictionary indicating whether the LED is currently blinking.
"""
return await self.send_command("get_blink_status")
async def set_network_conf(
self,
ip: str,
dns: str,
gateway: str,
subnet_mask: str,
hostname: str,
protocol: int,
) -> dict:
"""Set the network configuration of the miner.
Args:
ip (str): IP address of the device.
dns (str): DNS server IP address.
gateway (str): Gateway IP address.
subnet_mask (str): Network subnet mask.
hostname (str): Hostname of the device.
protocol (int): Network protocol used.
Returns:
dict: A dictionary response from the device after setting the network configuration.
"""
return await self.send_command(
"set_network_conf",
ipAddress=ip,
ipDns=dns,
ipGateway=gateway,
ipHost=hostname,
ipPro=protocol,
ipSub=subnet_mask,
)