Compare commits

...

8 Commits

Author SHA1 Message Date
Upstream Data
cb3c50d007 version: bump version number 2024-11-14 11:16:25 -07:00
Upstream Data
2523ef8484 bug: fix some issues with hammer miners 2024-11-14 11:16:09 -07:00
Upstream Data
01342738b0 version: bump version number 2024-11-14 10:26:50 -07:00
Upstream Data
a9dee4a911 feature: add support for bitaxe gamma 2024-11-14 10:26:00 -07:00
Upstream Data
883ffe20b4 bug: fix bmminer subclass pools data location 2024-11-14 10:25:38 -07:00
Upstream Data
261527a380 feature: add support for hammer D10 and hammer discovery 2024-11-14 08:45:59 -07:00
Upstream Data
924b62e0d5 feature: add support for hammer miner 2024-11-14 08:45:59 -07:00
Temi
76a870c2ed feat: Add _get_pools method for Bmminer. (#233)
* backends: Add _get_pools for Bmminer

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
2024-11-13 10:18:07 -07:00
28 changed files with 933 additions and 3 deletions

View File

@@ -156,6 +156,9 @@ class MinerConfig:
**self.pools.as_luxos(user_suffix=user_suffix),
}
def as_hammer(self, *args, **kwargs) -> dict:
return self.as_am_modern(*args, **kwargs)
@classmethod
def from_dict(cls, dict_conf: dict) -> "MinerConfig":
"""Constructs a MinerConfig object from a dictionary."""
@@ -276,3 +279,7 @@ class MinerConfig:
),
pools=PoolConfig.from_luxos(rpc_pools=rpc_pools, rpc_groups=rpc_groups),
)
@classmethod
def from_hammer(cls, *args, **kwargs) -> "MinerConfig":
return cls.from_am_modern(*args, **kwargs)

View File

@@ -27,6 +27,7 @@ class MinerMake(str, Enum):
EPIC = "ePIC"
BITAXE = "BitAxe"
ICERIVER = "IceRiver"
HAMMER = "Hammer"
def __str__(self):
return self.value

View File

@@ -344,6 +344,7 @@ class BitAxeModels(str, Enum):
BM1366 = "Ultra"
BM1368 = "Supra"
BM1397 = "Max"
BM1370 = "Gamma"
def __str__(self):
return self.value
@@ -364,6 +365,13 @@ class IceRiverModels(str, Enum):
return self.value
class HammerModels(str, Enum):
D10 = "D10"
def __str__(self):
return self.value
class MinerModel:
ANTMINER = AntminerModels
WHATSMINER = WhatsminerModels
@@ -374,3 +382,4 @@ class MinerModel:
EPIC = ePICModels
BITAXE = BitAxeModels
ICERIVER = IceRiverModels
HAMMER = HammerModels

View File

@@ -62,6 +62,10 @@ HIVEON_T9_DATA_LOC = DataLocations(
"_get_uptime",
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.POOLS): DataFunction(
"_get_pools",
[RPCAPICommand("rpc_pools", "pools")],
),
}
)

View File

@@ -17,16 +17,19 @@ from .antminer import AntminerModern, AntminerOld
from .auradine import Auradine
from .avalonminer import AvalonMiner
from .bfgminer import BFGMiner
from .bitaxe import BitAxe
from .bmminer import BMMiner
from .braiins_os import BOSer, BOSMiner
from .btminer import BTMiner
from .cgminer import CGMiner
from .epic import ePIC
from .goldshell import GoldshellMiner
from .hammer import BlackMiner
from .hiveon import Hiveon
from .iceriver import IceRiver
from .innosilicon import Innosilicon
from .luxminer import LUXMiner
from .marathon import MaraMiner
from .unknown import UnknownMiner
from .vnish import VNish
from .whatsminer import M2X, M3X, M5X, M6X

View File

@@ -18,6 +18,7 @@ from typing import List, Optional
from pyasic.config import MinerConfig
from pyasic.data import AlgoHashRate, Fan, HashBoard, HashUnit
from pyasic.data.pools import PoolMetrics, PoolUrl
from pyasic.errors import APIError
from pyasic.miners.data import DataFunction, DataLocations, DataOptions, RPCAPICommand
from pyasic.miners.device.firmware import StockFirmware
@@ -53,6 +54,10 @@ BMMINER_DATA_LOC = DataLocations(
"_get_uptime",
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.POOLS): DataFunction(
"_get_pools",
[RPCAPICommand("rpc_pools", "pools")],
),
}
)
@@ -258,3 +263,33 @@ class BMMiner(StockFirmware):
return int(rpc_stats["STATS"][1]["Elapsed"])
except LookupError:
pass
async def _get_pools(self, rpc_pools: dict = None) -> List[PoolMetrics]:
if rpc_pools is None:
try:
rpc_pools = await self.rpc.pools()
except APIError:
pass
pools_data = []
if rpc_pools is not None:
try:
pools = rpc_pools.get("POOLS", [])
for pool_info in pools:
url = pool_info.get("URL")
pool_url = PoolUrl.from_str(url) if url else None
pool_data = PoolMetrics(
accepted=pool_info.get("Accepted"),
rejected=pool_info.get("Rejected"),
get_failures=pool_info.get("Get Failures"),
remote_failures=pool_info.get("Remote Failures"),
active=pool_info.get("Stratum Active"),
alive=pool_info.get("Status") == "Alive",
url=pool_url,
user=pool_info.get("User"),
index=pool_info.get("POOL"),
)
pools_data.append(pool_data)
except LookupError:
pass
return pools_data

View File

@@ -0,0 +1,508 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from typing import List, Optional
from pyasic import MinerConfig
from pyasic.data import AlgoHashRate, Fan, HashBoard, HashUnit
from pyasic.data.error_codes import MinerErrorData, X19Error
from pyasic.data.pools import PoolMetrics, PoolUrl
from pyasic.errors import APIError
from pyasic.miners.data import (
DataFunction,
DataLocations,
DataOptions,
RPCAPICommand,
WebAPICommand,
)
from pyasic.miners.device.firmware import StockFirmware
from pyasic.rpc.ccminer import CCMinerRPCAPI
from pyasic.web.hammer import HammerWebAPI
HAMMER_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction(
"_get_mac",
[WebAPICommand("web_get_system_info", "get_system_info")],
),
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver",
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver",
[RPCAPICommand("rpc_version", "version")],
),
str(DataOptions.HOSTNAME): DataFunction(
"_get_hostname",
[WebAPICommand("web_get_system_info", "get_system_info")],
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[RPCAPICommand("rpc_summary", "summary")],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.ERRORS): DataFunction(
"_get_errors",
[WebAPICommand("web_summary", "summary")],
),
str(DataOptions.FAULT_LIGHT): DataFunction(
"_get_fault_light",
[WebAPICommand("web_get_blink_status", "get_blink_status")],
),
str(DataOptions.IS_MINING): DataFunction(
"_is_mining",
[WebAPICommand("web_get_conf", "get_miner_conf")],
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime",
[RPCAPICommand("rpc_stats", "stats")],
),
str(DataOptions.POOLS): DataFunction(
"_get_pools",
[RPCAPICommand("rpc_pools", "pools")],
),
}
)
class BlackMiner(StockFirmware):
"""Handler for Hammer miners."""
_rpc_cls = CCMinerRPCAPI
rpc: CCMinerRPCAPI
_web_cls = HammerWebAPI
web: HammerWebAPI
data_locations = HAMMER_DATA_LOC
async def get_config(self) -> MinerConfig:
data = await self.web.get_miner_conf()
if data:
self.config = MinerConfig.from_hammer(data)
return self.config
async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
self.config = config
await self.web.set_miner_conf(config.as_hammer(user_suffix=user_suffix))
async def fault_light_on(self) -> bool:
data = await self.web.blink(blink=True)
if data:
if data.get("code") == "B000":
self.light = True
return self.light
async def fault_light_off(self) -> bool:
data = await self.web.blink(blink=False)
if data:
if data.get("code") == "B100":
self.light = False
return self.light
async def reboot(self) -> bool:
data = await self.web.reboot()
if data:
return True
return False
async def _get_api_ver(self, rpc_version: dict = None) -> Optional[str]:
if rpc_version is None:
try:
rpc_version = await self.rpc.version()
except APIError:
pass
if rpc_version is not None:
try:
self.api_ver = rpc_version["VERSION"][0]["API"]
except LookupError:
pass
return self.api_ver
async def _get_fw_ver(self, rpc_version: dict = None) -> Optional[str]:
if rpc_version is None:
try:
rpc_version = await self.rpc.version()
except APIError:
pass
if rpc_version is not None:
try:
self.fw_ver = rpc_version["VERSION"][0]["CompileTime"]
except LookupError:
pass
return self.fw_ver
async def _get_hashrate(self, rpc_summary: dict = None) -> Optional[AlgoHashRate]:
# get hr from API
if rpc_summary is None:
try:
rpc_summary = await self.rpc.summary()
except APIError:
pass
if rpc_summary is not None:
try:
return AlgoHashRate.SHA256(
rpc_summary["SUMMARY"][0]["GHS 5s"], HashUnit.SHA256.GH
).into(self.algo.unit.default)
except (LookupError, ValueError, TypeError):
pass
async def _get_hashboards(self, rpc_stats: dict = None) -> List[HashBoard]:
hashboards = []
if rpc_stats is None:
try:
rpc_stats = await self.rpc.stats()
except APIError:
pass
if rpc_stats is not None:
try:
board_offset = -1
boards = rpc_stats["STATS"]
if len(boards) > 1:
for board_num in range(1, 16, 5):
for _b_num in range(5):
b = boards[1].get(f"chain_acn{board_num + _b_num}")
if b and not b == 0 and board_offset == -1:
board_offset = board_num
if board_offset == -1:
board_offset = 1
real_slots = []
for i in range(board_offset, board_offset + 4):
try:
key = f"chain_acs{i}"
if boards[1].get(key, "") != "":
real_slots.append(i)
except LookupError:
pass
if len(real_slots) < 3:
real_slots = list(
range(board_offset, board_offset + self.expected_hashboards)
)
for i in real_slots:
hashboard = HashBoard(
slot=i - board_offset, expected_chips=self.expected_chips
)
chip_temp = boards[1].get(f"temp{i}")
if chip_temp:
hashboard.chip_temp = round(chip_temp)
temp = boards[1].get(f"temp2_{i}")
if temp:
hashboard.temp = round(temp)
hashrate = boards[1].get(f"chain_rate{i}")
if hashrate:
hashboard.hashrate = AlgoHashRate.SHA256(
float(hashrate), HashUnit.SHA256.GH
).into(self.algo.unit.default)
chips = boards[1].get(f"chain_acn{i}")
if chips:
hashboard.chips = chips
hashboard.missing = False
if (not chips) or (not chips > 0):
hashboard.missing = True
hashboards.append(hashboard)
except (LookupError, ValueError, TypeError):
pass
return hashboards
async def _get_fans(self, rpc_stats: dict = None) -> List[Fan]:
if rpc_stats is None:
try:
rpc_stats = await self.rpc.stats()
except APIError:
pass
fans = [Fan() for _ in range(self.expected_fans)]
if rpc_stats is not None:
try:
fan_offset = -1
for fan_num in range(1, 8, 4):
for _f_num in range(4):
f = rpc_stats["STATS"][1].get(f"fan{fan_num + _f_num}", 0)
if f and not f == 0 and fan_offset == -1:
fan_offset = fan_num
if fan_offset == -1:
fan_offset = 1
for fan in range(self.expected_fans):
fans[fan].speed = rpc_stats["STATS"][1].get(
f"fan{fan_offset+fan}", 0
)
except LookupError:
pass
return fans
async def _get_expected_hashrate(
self, rpc_stats: dict = None
) -> Optional[AlgoHashRate]:
# X19 method, not sure compatibility
if rpc_stats is None:
try:
rpc_stats = await self.rpc.stats()
except APIError:
pass
if rpc_stats is not None:
try:
expected_rate = rpc_stats["STATS"][1]["total_rateideal"]
try:
rate_unit = rpc_stats["STATS"][1]["rate_unit"]
except KeyError:
rate_unit = "GH"
return AlgoHashRate.SHA256(
expected_rate, HashUnit.SHA256.from_str(rate_unit)
).into(self.algo.unit.default)
except LookupError:
pass
async def _get_uptime(self, rpc_stats: dict = None) -> Optional[int]:
if rpc_stats is None:
try:
rpc_stats = await self.rpc.stats()
except APIError:
pass
if rpc_stats is not None:
try:
return int(rpc_stats["STATS"][1]["Elapsed"])
except LookupError:
pass
async def _get_hostname(self, web_get_system_info: dict = None) -> Optional[str]:
if web_get_system_info is None:
try:
web_get_system_info = await self.web.get_system_info()
except APIError:
pass
if web_get_system_info is not None:
try:
return web_get_system_info["hostname"]
except KeyError:
pass
async def _get_mac(self, web_get_system_info: dict = None) -> Optional[str]:
if web_get_system_info is None:
try:
web_get_system_info = await self.web.get_system_info()
except APIError:
pass
if web_get_system_info is not None:
try:
return web_get_system_info["macaddr"]
except KeyError:
pass
try:
data = await self.web.get_network_info()
if data:
return data["macaddr"]
except KeyError:
pass
async def _get_errors(self, web_summary: dict = None) -> List[MinerErrorData]:
if web_summary is None:
try:
web_summary = await self.web.summary()
except APIError:
pass
errors = []
if web_summary is not None:
try:
for item in web_summary["SUMMARY"][0]["status"]:
try:
if not item["status"] == "s":
errors.append(X19Error(item["msg"]))
except KeyError:
continue
except LookupError:
pass
return errors
async def _get_fault_light(
self, web_get_blink_status: dict = None
) -> Optional[bool]:
if self.light:
return self.light
if web_get_blink_status is None:
try:
web_get_blink_status = await self.web.get_blink_status()
except APIError:
pass
if web_get_blink_status is not None:
try:
self.light = web_get_blink_status["blink"]
except KeyError:
pass
return self.light
async def _get_expected_hashrate(
self, rpc_stats: dict = None
) -> Optional[AlgoHashRate]:
if rpc_stats is None:
try:
rpc_stats = await self.rpc.stats()
except APIError:
pass
if rpc_stats is not None:
try:
expected_rate = rpc_stats["STATS"][1]["total_rateideal"]
try:
rate_unit = rpc_stats["STATS"][1]["rate_unit"]
except KeyError:
rate_unit = "GH"
return AlgoHashRate.SHA256(
expected_rate, HashUnit.SHA256.from_str(rate_unit)
).into(self.algo.unit.default)
except LookupError:
pass
async def set_static_ip(
self,
ip: str,
dns: str,
gateway: str,
subnet_mask: str = "255.255.255.0",
hostname: str = None,
):
if not hostname:
hostname = await self.get_hostname()
await self.web.set_network_conf(
ip=ip,
dns=dns,
gateway=gateway,
subnet_mask=subnet_mask,
hostname=hostname,
protocol=2,
)
async def set_dhcp(self, hostname: str = None):
if not hostname:
hostname = await self.get_hostname()
await self.web.set_network_conf(
ip="", dns="", gateway="", subnet_mask="", hostname=hostname, protocol=1
)
async def set_hostname(self, hostname: str):
cfg = await self.web.get_network_info()
dns = cfg["conf_dnsservers"]
gateway = cfg["conf_gateway"]
ip = cfg["conf_ipaddress"]
subnet_mask = cfg["conf_netmask"]
protocol = 1 if cfg["conf_nettype"] == "DHCP" else 2
await self.web.set_network_conf(
ip=ip,
dns=dns,
gateway=gateway,
subnet_mask=subnet_mask,
hostname=hostname,
protocol=protocol,
)
async def _is_mining(self, web_get_conf: dict = None) -> Optional[bool]:
if web_get_conf is None:
try:
web_get_conf = await self.web.get_miner_conf()
except APIError:
pass
if web_get_conf is not None:
try:
if str(web_get_conf["bitmain-work-mode"]).isdigit():
return (
False if int(web_get_conf["bitmain-work-mode"]) == 1 else True
)
return False
except LookupError:
pass
async def _get_uptime(self, rpc_stats: dict = None) -> Optional[int]:
if rpc_stats is None:
try:
rpc_stats = await self.rpc.stats()
except APIError:
pass
if rpc_stats is not None:
try:
return int(rpc_stats["STATS"][1]["Elapsed"])
except LookupError:
pass
async def _get_pools(self, rpc_pools: dict = None) -> List[PoolMetrics]:
if rpc_pools is None:
try:
rpc_pools = await self.rpc.pools()
except APIError:
pass
pools_data = []
if rpc_pools is not None:
try:
pools = rpc_pools.get("POOLS", [])
for pool_info in pools:
url = pool_info.get("URL")
pool_url = PoolUrl.from_str(url) if url else None
pool_data = PoolMetrics(
accepted=pool_info.get("Accepted"),
rejected=pool_info.get("Rejected"),
get_failures=pool_info.get("Get Failures"),
remote_failures=pool_info.get("Remote Failures"),
active=pool_info.get("Stratum Active"),
alive=pool_info.get("Status") == "Alive",
url=pool_url,
user=pool_info.get("User"),
index=pool_info.get("POOL"),
)
pools_data.append(pool_data)
except LookupError:
pass
return pools_data

View File

@@ -80,6 +80,10 @@ VNISH_DATA_LOC = DataLocations(
"_is_mining",
[WebAPICommand("web_summary", "summary")],
),
str(DataOptions.POOLS): DataFunction(
"_get_pools",
[RPCAPICommand("rpc_pools", "pools")],
),
}
)

View File

@@ -0,0 +1,6 @@
from pyasic.miners.backends.bitaxe import BitAxe
from pyasic.miners.device.models.bitaxe import Gamma
class BitAxeGamma(BitAxe, Gamma):
pass

View File

@@ -1,3 +1,4 @@
from .BM1366 import BitAxeUltra
from .BM1368 import BitAxeSupra
from .BM1370 import BitAxeGamma
from .BM1397 import BitAxeMax

View File

@@ -52,3 +52,7 @@ class BitAxeMake(BaseMiner):
class IceRiverMake(BaseMiner):
make = MinerMake.ICERIVER
class HammerMake(BaseMiner):
make = MinerMake.HAMMER

View File

@@ -19,6 +19,7 @@ from .auradine import *
from .avalonminer import *
from .epic import *
from .goldshell import *
from .hammer import *
from .iceriver import *
from .innosilicon import *
from .whatsminer import *

View File

@@ -0,0 +1,10 @@
from pyasic.device.models import MinerModel
from pyasic.miners.device.makes import BitAxeMake
class Gamma(BitAxeMake):
raw_model = MinerModel.BITAXE.BM1370
expected_hashboards = 1
expected_chips = 1
expected_fans = 1

View File

@@ -1,3 +1,4 @@
from .BM1366 import Ultra
from .BM1368 import Supra
from .BM1370 import Gamma
from .BM1397 import Max

View File

@@ -0,0 +1,23 @@
# ------------------------------------------------------------------------------
# Copyright 2024 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.device.models import MinerModel
from pyasic.miners.device.makes import HammerMake
class D10(HammerMake):
raw_model = MinerModel.HAMMER.D10
expected_chips = 108

View File

@@ -0,0 +1 @@
from .D10 import D10

View File

@@ -0,0 +1 @@
from .DX import *

View File

@@ -32,13 +32,12 @@ from pyasic.miners.antminer import *
from pyasic.miners.auradine import *
from pyasic.miners.avalonminer import *
from pyasic.miners.backends import *
from pyasic.miners.backends.bitaxe import BitAxe
from pyasic.miners.backends.unknown import UnknownMiner
from pyasic.miners.base import AnyMiner
from pyasic.miners.bitaxe import *
from pyasic.miners.blockminer import *
from pyasic.miners.device.makes import *
from pyasic.miners.goldshell import *
from pyasic.miners.hammer import *
from pyasic.miners.iceriver import *
from pyasic.miners.innosilicon import *
from pyasic.miners.whatsminer import *
@@ -59,6 +58,7 @@ class MinerTypes(enum.Enum):
MARATHON = 11
BITAXE = 12
ICERIVER = 13
HAMMER = 14
MINER_CLASSES = {
@@ -465,6 +465,7 @@ MINER_CLASSES = {
"BM1368": BitAxeSupra,
"BM1366": BitAxeUltra,
"BM1397": BitAxeMax,
"BM1370": BitAxeGamma,
},
MinerTypes.ICERIVER: {
None: type("IceRiverUnknown", (IceRiver, IceRiverMake), {}),
@@ -478,6 +479,10 @@ MINER_CLASSES = {
"KS5L": IceRiverKS5L,
"KS5M": IceRiverKS5M,
},
MinerTypes.HAMMER: {
None: type("HammerUnknown", (BlackMiner, HammerMake), {}),
"HAMMER D10": HammerD10,
},
}
@@ -556,6 +561,7 @@ class MinerFactory:
MinerTypes.MARATHON: self.get_miner_model_marathon,
MinerTypes.BITAXE: self.get_miner_model_bitaxe,
MinerTypes.ICERIVER: self.get_miner_model_iceriver,
MinerTypes.HAMMER: self.get_miner_model_hammer,
}
fn = miner_model_fns.get(miner_type)
@@ -627,6 +633,10 @@ class MinerFactory:
"www-authenticate", ""
):
return MinerTypes.ANTMINER
if web_resp.status_code == 401 and 'realm="blackMiner' in web_resp.headers.get(
"www-authenticate", ""
):
return MinerTypes.HAMMER
if len(web_resp.history) > 0:
history_resp = web_resp.history[0]
if (
@@ -1130,6 +1140,21 @@ class MinerFactory:
except httpx.HTTPError:
pass
async def get_miner_model_hammer(self, ip: str) -> str | None:
auth = httpx.DigestAuth(
"root", settings.get("default_hammer_web_password", "root")
)
web_json_data = await self.send_web_command(
ip, "/cgi-bin/get_system_info.cgi", auth=auth
)
try:
miner_model = web_json_data["minertype"]
return miner_model
except (TypeError, LookupError):
pass
miner_factory = MinerFactory()

View File

@@ -0,0 +1 @@
from .blackminer import *

View File

@@ -0,0 +1,6 @@
from pyasic.miners.backends import BlackMiner
from pyasic.miners.device.models import D10
class HammerD10(BlackMiner, D10):
pass

View File

@@ -0,0 +1 @@
from .D10 import HammerD10

View File

@@ -0,0 +1 @@
from .DX import *

View File

@@ -17,6 +17,7 @@ from .bfgminer import BFGMinerRPCAPI
from .bmminer import BMMinerRPCAPI
from .bosminer import BOSMinerRPCAPI
from .btminer import BTMinerRPCAPI
from .ccminer import CCMinerRPCAPI
from .cgminer import CGMinerRPCAPI
from .gcminer import GCMinerRPCAPI
from .luxminer import LUXMinerRPCAPI

34
pyasic/rpc/ccminer.py Normal file
View File

@@ -0,0 +1,34 @@
# ------------------------------------------------------------------------------
# Copyright 2024 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.rpc.bmminer import BMMinerRPCAPI
class CCMinerRPCAPI(BMMinerRPCAPI):
"""An abstraction of the CCMiner API.
Each method corresponds to an API command in CCMiner.
This class abstracts use of the CCMiner API, as well as the
methods for sending commands to it. The `self.send_command()`
function handles sending a command to the miner asynchronously, and
as such is the base for many of the functions in this class, which
rely on it to send the command for them.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.port = 8359

View File

@@ -31,6 +31,7 @@ _settings = { # defaults
"default_whatsminer_rpc_password": "admin",
"default_innosilicon_web_password": "admin",
"default_antminer_web_password": "root",
"default_hammer_web_password": "root",
"default_bosminer_web_password": "root",
"default_vnish_web_password": "admin",
"default_goldshell_web_password": "123456789",

View File

@@ -19,6 +19,7 @@ from .base import BaseWebAPI
from .braiins_os import BOSerWebAPI, BOSMinerWebAPI
from .epic import ePICWebAPI
from .goldshell import GoldshellWebAPI
from .hammer import HammerWebAPI
from .iceriver import IceRiverWebAPI
from .innosilicon import InnosiliconWebAPI
from .vnish import VNishWebAPI

240
pyasic/web/hammer.py Normal file
View File

@@ -0,0 +1,240 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import json
from typing import Any
import httpx
from pyasic import settings
from pyasic.web.base import BaseWebAPI
class HammerWebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
"""Initialize the modern Hammer API client with a specific IP address.
Args:
ip (str): IP address of the Hammer device.
"""
super().__init__(ip)
self.username = "root"
self.pwd = settings.get("default_hammer_web_password", "root")
async def send_command(
self,
command: str | bytes,
ignore_errors: bool = False,
allow_warning: bool = True,
privileged: bool = False,
**parameters: Any,
) -> dict:
"""Send a command to the Hammer device using HTTP digest authentication.
Args:
command (str | bytes): The CGI command to send.
ignore_errors (bool): If True, ignore any HTTP errors.
allow_warning (bool): If True, proceed with warnings.
privileged (bool): If set to True, requires elevated privileges.
**parameters: Arbitrary keyword arguments to be sent as parameters in the request.
Returns:
dict: The JSON response from the device or an empty dictionary if an error occurs.
"""
url = f"http://{self.ip}:{self.port}/cgi-bin/{command}.cgi"
auth = httpx.DigestAuth(self.username, self.pwd)
try:
async with httpx.AsyncClient(transport=settings.transport()) as client:
if parameters:
data = await client.post(
url,
auth=auth,
timeout=settings.get("api_function_timeout", 3),
json=parameters,
)
else:
data = await client.get(url, auth=auth)
except httpx.HTTPError as e:
return {"success": False, "message": f"HTTP error occurred: {str(e)}"}
else:
if data.status_code == 200:
try:
return data.json()
except json.decoder.JSONDecodeError:
return {"success": False, "message": "Failed to decode JSON"}
return {"success": False, "message": "Unknown error occurred"}
async def multicommand(
self, *commands: str, ignore_errors: bool = False, allow_warning: bool = True
) -> dict:
"""Execute multiple commands simultaneously.
Args:
*commands (str): Multiple command strings to be executed.
ignore_errors (bool): If True, ignore any HTTP errors.
allow_warning (bool): If True, proceed with warnings.
Returns:
dict: A dictionary containing the results of all commands executed.
"""
async with httpx.AsyncClient(transport=settings.transport()) as client:
tasks = [
asyncio.create_task(self._handle_multicommand(client, command))
for command in commands
]
all_data = await asyncio.gather(*tasks)
data = {}
for item in all_data:
data.update(item)
data["multicommand"] = True
return data
async def _handle_multicommand(
self, client: httpx.AsyncClient, command: str
) -> dict:
"""Helper function for handling individual commands in a multicommand execution.
Args:
client (httpx.AsyncClient): The HTTP client to use for the request.
command (str): The command to be executed.
Returns:
dict: A dictionary containing the response of the executed command.
"""
auth = httpx.DigestAuth(self.username, self.pwd)
try:
url = f"http://{self.ip}/cgi-bin/{command}.cgi"
ret = await client.get(url, auth=auth)
except httpx.HTTPError:
pass
else:
if ret.status_code == 200:
try:
json_data = ret.json()
return {command: json_data}
except json.decoder.JSONDecodeError:
pass
return {command: {}}
async def get_miner_conf(self) -> dict:
"""Retrieve the miner configuration from the Hammer device.
Returns:
dict: A dictionary containing the current configuration of the miner.
"""
return await self.send_command("get_miner_conf")
async def set_miner_conf(self, conf: dict) -> dict:
"""Set the configuration for the miner.
Args:
conf (dict): A dictionary of configuration settings to apply to the miner.
Returns:
dict: A dictionary response from the device after setting the configuration.
"""
return await self.send_command("set_miner_conf", **conf)
async def blink(self, blink: bool) -> dict:
"""Control the blinking of the LED on the miner device.
Args:
blink (bool): True to start blinking, False to stop.
Returns:
dict: A dictionary response from the device after the command execution.
"""
if blink:
return await self.send_command("blink", blink="true")
return await self.send_command("blink", blink="false")
async def reboot(self) -> dict:
"""Reboot the miner device.
Returns:
dict: A dictionary response from the device confirming the reboot command.
"""
return await self.send_command("reboot")
async def get_system_info(self) -> dict:
"""Retrieve system information from the miner.
Returns:
dict: A dictionary containing system information of the miner.
"""
return await self.send_command("get_system_info")
async def get_network_info(self) -> dict:
"""Retrieve network configuration information from the miner.
Returns:
dict: A dictionary containing the network configuration of the miner.
"""
return await self.send_command("get_network_info")
async def summary(self) -> dict:
"""Get a summary of the miner's status and performance.
Returns:
dict: A summary of the miner's current operational status.
"""
return await self.send_command("summary")
async def get_blink_status(self) -> dict:
"""Check the status of the LED blinking on the miner.
Returns:
dict: A dictionary indicating whether the LED is currently blinking.
"""
return await self.send_command("get_blink_status")
async def set_network_conf(
self,
ip: str,
dns: str,
gateway: str,
subnet_mask: str,
hostname: str,
protocol: int,
) -> dict:
"""Set the network configuration of the miner.
Args:
ip (str): IP address of the device.
dns (str): DNS server IP address.
gateway (str): Gateway IP address.
subnet_mask (str): Network subnet mask.
hostname (str): Hostname of the device.
protocol (int): Network protocol used.
Returns:
dict: A dictionary response from the device after setting the network configuration.
"""
return await self.send_command(
"set_network_conf",
ipAddress=ip,
ipDns=dns,
ipGateway=gateway,
ipHost=hostname,
ipPro=protocol,
ipSub=subnet_mask,
)

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "pyasic"
version = "0.62.4"
version = "0.63.1"
description = "A simplified and standardized interface for Bitcoin ASICs."
authors = ["UpstreamData <brett@upstreamdata.ca>"]
repository = "https://github.com/UpstreamData/pyasic"