Compare commits

...

10 Commits

Author SHA1 Message Date
Brett Rowan
76a77b51e8 version: bump version number 2025-08-14 13:25:26 -06:00
Brett Rowan
b099ff45d2 bug: remove print statements 2025-08-14 13:25:05 -06:00
Brett Rowan
9bc3cc221a version: bump version number 2025-08-14 11:41:50 -06:00
Brett Rowan
6418c2e102 feature: add BTMinerV3 control support 2025-08-14 11:41:08 -06:00
Brett Rowan
aa9f3b2c45 feature: add BTMinerV3 data support 2025-08-14 11:41:08 -06:00
Ryan Heideman
bb1c98f061 Goldshell Byte Support (#366) 2025-08-12 21:53:28 -06:00
Brett Rowan
d984431fe5 version: bump version number 2025-08-08 11:46:46 -06:00
Tony Scelfo
f1e4feb91e add expected chips for BraiinsModels BM100 and BM101 2025-08-08 11:45:43 -06:00
Brett Rowan
90c8986900 version: bump version number 2025-07-23 11:56:21 -06:00
Brett Rowan
5457ae6cd5 feature: add support for avalon Q 2025-07-23 11:55:57 -06:00
35 changed files with 1673 additions and 179 deletions

1
.gitignore vendored
View File

@@ -7,3 +7,4 @@ pyvenv.cfg
bin/
lib/
.idea/
.vs/

View File

@@ -0,0 +1,16 @@
# pyasic
## Byte Models
## Byte (Stock)
- [x] Shutdowns
- [x] Power Modes
- [ ] Setpoints
- [ ] Presets
::: pyasic.miners.goldshell.bfgminer.Byte.Byte.GoldshellByte
handler: python
options:
show_root_heading: false
heading_level: 0

View File

@@ -565,6 +565,12 @@ details {
<li><a href="../avalonminer/A15X#avalon-1566-stock">Avalon 1566 (Stock)</a></li>
</ul>
</details>
<details>
<summary>Q Series:</summary>
<ul>
<li><a href="../avalonminer/Q#avalon-q-home-stock">Avalon Q Home (Stock)</a></li>
</ul>
</details>
</ul>
</details>
<details>
@@ -615,6 +621,12 @@ details {
<li><a href="../goldshell/XBox#kd-box-pro-stock">KD Box Pro (Stock)</a></li>
</ul>
</details>
<details>
<summary>Byte Series:</summary>
<ul>
<li><a href="../goldshell/Byte#byte-stock">Byte (Stock)</a></li>
</ul>
</details>
</ul>
</details>
<details>

View File

@@ -97,6 +97,7 @@ nav:
- Innosilicon T3X: "miners/innosilicon/T3X.md"
- Innosilicon A10X: "miners/innosilicon/A10X.md"
- Innosilicon A11X: "miners/innosilicon/A11X.md"
- Goldshell Byte: "miners/goldshell/Byte.md"
- Goldshell X5: "miners/goldshell/X5.md"
- Goldshell XMax: "miners/goldshell/XMax.md"
- Goldshell XBox: "miners/goldshell/XBox.md"

View File

@@ -247,6 +247,11 @@ class MinerConfig(BaseModel):
"""Constructs a MinerConfig object from web configuration for Goldshell miners."""
return cls(pools=PoolConfig.from_am_modern(web_conf))
@classmethod
def from_goldshell_byte(cls, web_conf: dict) -> "MinerConfig":
"""Constructs a MinerConfig object from web configuration for Goldshell Byte miners."""
return cls(pools=PoolConfig.from_goldshell_byte(web_conf))
@classmethod
def from_inno(cls, web_pools: list) -> "MinerConfig":
"""Constructs a MinerConfig object from web configuration for Innosilicon miners."""

View File

@@ -631,6 +631,16 @@ class PoolConfig(MinerConfigValue):
def from_goldshell(cls, web_pools: list) -> "PoolConfig":
return cls(groups=[PoolGroup.from_goldshell(web_pools)])
@classmethod
def from_goldshell_byte(cls, web_pools: list) -> "PoolConfig":
return cls(
groups=[
PoolGroup.from_goldshell(g["pools"])
for g in web_pools
if len(g["pools"]) > 0
]
)
@classmethod
def from_inno(cls, web_pools: list) -> "PoolConfig":
return cls(groups=[PoolGroup.from_inno(web_pools)])

View File

@@ -12,6 +12,7 @@ from .kheavyhash import KHeavyHashAlgo
from .scrypt import ScryptAlgo
from .sha256 import SHA256Algo
from .x11 import X11Algo
from .zksnark import ZkSnarkAlgo
class MinerAlgo:
@@ -26,3 +27,4 @@ class MinerAlgo:
ETHASH = EtHashAlgo
EQUIHASH = EquihashAlgo
BLOCKFLOW = BlockFlowAlgo
ZKSNARK = ZkSnarkAlgo

View File

@@ -10,6 +10,7 @@ from .kheavyhash import KHeavyHashHashRate
from .scrypt import ScryptHashRate
from .sha256 import SHA256HashRate
from .x11 import X11HashRate
from .zksnark import ZkSnarkHashRate
class AlgoHashRate:
@@ -24,3 +25,4 @@ class AlgoHashRate:
ETHASH = EtHashHashRate
EQUIHASH = EquihashHashRate
BLOCKFLOW = BlockFlowHashRate
ZKSNARK = ZkSnarkHashRate

View File

@@ -9,6 +9,7 @@ from .kheavyhash import KHeavyHashUnit
from .scrypt import ScryptUnit
from .sha256 import SHA256Unit
from .x11 import X11Unit
from .zksnark import ZkSnarkUnit
class HashUnit:
@@ -23,3 +24,4 @@ class HashUnit:
ETHASH = EtHashUnit
EQUIHASH = EquihashUnit
BLOCKFLOW = BlockFlowUnit
ZKSNARK = ZkSnarkUnit

View File

@@ -0,0 +1,16 @@
from __future__ import annotations
from .base import AlgoHashRateUnitType
class ZkSnarkUnit(AlgoHashRateUnitType):
H = 1
KH = int(H) * 1000
MH = int(KH) * 1000
GH = int(MH) * 1000
TH = int(GH) * 1000
PH = int(TH) * 1000
EH = int(PH) * 1000
ZH = int(EH) * 1000
default = GH

View File

@@ -0,0 +1,18 @@
from __future__ import annotations
from typing_extensions import Self
from pyasic.device.algorithm.hashrate.base import AlgoHashRateType
from pyasic.device.algorithm.hashrate.unit.zksnark import ZkSnarkUnit
from .unit import HashUnit
class ZkSnarkHashRate(AlgoHashRateType):
rate: float
unit: ZkSnarkUnit = HashUnit.ZKSNARK.default
def into(self, other: ZkSnarkUnit) -> Self:
return self.__class__(
rate=self.rate / (other.value / self.unit.value), unit=other
)

View File

@@ -0,0 +1,12 @@
from __future__ import annotations
from .base import MinerAlgoType
from .hashrate import ZkSnarkHashRate
from .hashrate.unit import ZkSnarkUnit
class ZkSnarkAlgo(MinerAlgoType):
hashrate: type[ZkSnarkHashRate] = ZkSnarkHashRate
unit: type[ZkSnarkUnit] = ZkSnarkUnit
name = "zkSNARK"

View File

@@ -456,6 +456,7 @@ class AvalonminerModels(MinerModelType):
Avalon1566 = "Avalon 1566"
AvalonNano3 = "Avalon Nano 3"
AvalonNano3s = "Avalon Nano 3s"
AvalonQHome = "Avalon Q Home"
def __str__(self):
return self.value
@@ -478,6 +479,7 @@ class GoldshellModels(MinerModelType):
KDMax = "KD Max"
KDBoxII = "KD Box II"
KDBoxPro = "KD Box Pro"
Byte = "Byte"
def __str__(self):
return self.value

View File

@@ -0,0 +1,22 @@
# ------------------------------------------------------------------------------
# Copyright 2025 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.miners.backends import AvalonMiner
from pyasic.miners.device.models import AvalonQHome
class CGMinerAvalonQHome(AvalonMiner, AvalonQHome):
pass

View File

@@ -0,0 +1 @@
from .Q import CGMinerAvalonQHome

View File

@@ -22,3 +22,4 @@ from .A11X import *
from .A12X import *
from .A15X import *
from .nano import *
from .Q import *

View File

@@ -49,31 +49,31 @@ AVALON_NANO_DATA_LOC = DataLocations(
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction(
"_get_env_temp",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.WATTAGE_LIMIT): DataFunction(
"_get_wattage_limit",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.WATTAGE): DataFunction(
"_get_wattage",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.FAULT_LIGHT): DataFunction(
"_get_fault_light",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime",
@@ -102,35 +102,35 @@ AVALON_NANO3S_DATA_LOC = DataLocations(
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction(
"_get_env_temp",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.WATTAGE_LIMIT): DataFunction(
"_get_wattage_limit",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.WATTAGE): DataFunction(
"_get_wattage",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.FAULT_LIGHT): DataFunction(
"_get_fault_light",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime",
@@ -170,58 +170,58 @@ class CGMinerAvalonNano3s(AvalonMiner, AvalonNano3s):
data_locations = AVALON_NANO3S_DATA_LOC
async def _get_wattage(self, rpc_stats: dict = None) -> Optional[int]:
if rpc_stats is None:
async def _get_wattage(self, rpc_estats: dict = None) -> Optional[int]:
if rpc_estats is None:
try:
rpc_stats = await self.rpc.stats()
rpc_estats = await self.rpc.estats()
except APIError:
pass
if rpc_stats is not None:
if rpc_estats is not None:
try:
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
return int(parsed_stats["PS"][6])
unparsed_estats = rpc_estats["STATS"][0]["MM ID0"]
parsed_estats = self.parse_estats(unparsed_estats)
return int(parsed_estats["PS"][6])
except (IndexError, KeyError, ValueError, TypeError):
pass
async def _get_hashrate(self, rpc_stats: dict = None) -> Optional[AlgoHashRate]:
if rpc_stats is None:
async def _get_hashrate(self, rpc_estats: dict = None) -> Optional[AlgoHashRate]:
if rpc_estats is None:
try:
rpc_stats = await self.rpc.stats()
rpc_estats = await self.rpc.estats()
except APIError:
pass
if rpc_stats is not None:
if rpc_estats is not None:
try:
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
unparsed_estats = rpc_estats["STATS"][0]["MM ID0"]
parsed_estats = self.parse_estats(unparsed_estats)
return self.algo.hashrate(
rate=float(parsed_stats["GHSspd"][0]), unit=self.algo.unit.GH
rate=float(parsed_estats["GHSspd"]), unit=self.algo.unit.GH
).into(self.algo.unit.default)
except (IndexError, KeyError, ValueError, TypeError):
pass
async def _get_hashboards(self, rpc_stats: dict = None) -> List[HashBoard]:
hashboards = await AvalonMiner._get_hashboards(self, rpc_stats)
async def _get_hashboards(self, rpc_estats: dict = None) -> List[HashBoard]:
hashboards = await AvalonMiner._get_hashboards(self, rpc_estats)
if rpc_stats is None:
if rpc_estats is None:
try:
rpc_stats = await self.rpc.stats()
rpc_estats = await self.rpc.estats()
except APIError:
pass
if rpc_stats is not None:
if rpc_estats is not None:
try:
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
unparsed_estats = rpc_estats["STATS"][0]["MM ID0"]
parsed_estats = self.parse_estats(unparsed_estats)
except (IndexError, KeyError, ValueError, TypeError):
return hashboards
for board in range(len(hashboards)):
try:
board_hr = parsed_stats["GHSspd"][board]
board_hr = parsed_estats["GHSspd"][board]
hashboards[board].hashrate = self.algo.hashrate(
rate=float(board_hr), unit=self.algo.unit.GH
).into(self.algo.unit.default)

View File

@@ -13,8 +13,9 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import copy
import re
import time
from typing import List, Optional
from pyasic.data import Fan, HashBoard
@@ -22,6 +23,7 @@ from pyasic.device.algorithm import AlgoHashRate
from pyasic.errors import APIError
from pyasic.miners.backends.cgminer import CGMiner
from pyasic.miners.data import DataFunction, DataLocations, DataOptions, RPCAPICommand
from pyasic.rpc.avalonminer import AvalonMinerRPCAPI
AVALON_DATA_LOC = DataLocations(
**{
@@ -43,31 +45,31 @@ AVALON_DATA_LOC = DataLocations(
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction(
"_get_env_temp",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.WATTAGE_LIMIT): DataFunction(
"_get_wattage_limit",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.WATTAGE): DataFunction(
"_get_wattage",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.FAULT_LIGHT): DataFunction(
"_get_fault_light",
[RPCAPICommand("rpc_stats", "stats")],
[RPCAPICommand("rpc_estats", "estats")],
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime",
@@ -84,6 +86,9 @@ AVALON_DATA_LOC = DataLocations(
class AvalonMiner(CGMiner):
"""Handler for Avalon Miners"""
_rpc_cls = AvalonMinerRPCAPI
rpc: AvalonMinerRPCAPI
data_locations = AVALON_DATA_LOC
async def fault_light_on(self) -> bool:
@@ -134,45 +139,94 @@ class AvalonMiner(CGMiner):
return False
return False
async def stop_mining(self) -> bool:
try:
# Shut off 5 seconds from now
timestamp = int(time.time()) + 5
data = await self.rpc.ascset(0, f"softoff", f"1:{timestamp}")
except APIError:
return False
if "success" in data["STATUS"][0]["Msg"]:
return True
return False
async def resume_mining(self) -> bool:
try:
# Shut off 5 seconds from now
timestamp = int(time.time()) + 5
data = await self.rpc.ascset(0, f"softon", f"1:{timestamp}")
except APIError:
return False
if "success" in data["STATUS"][0]["Msg"]:
return True
return False
@staticmethod
def parse_stats(stats):
_stats_items = re.findall(".+?\\[*?]", stats)
stats_items = []
stats_dict = {}
for item in _stats_items:
if ": " in item:
data = item.replace("]", "").split("[")
data_list = [i.split(": ") for i in data[1].strip().split(", ")]
data_dict = {}
try:
for key, val in [tuple(item) for item in data_list]:
data_dict[key] = val
except ValueError:
# --avalon args
for arg_item in data_list:
item_data = arg_item[0].split(" ")
for idx, val in enumerate(item_data):
if idx % 2 == 0 or idx == 0:
data_dict[val] = item_data[idx + 1]
def parse_estats(data):
# Deep copy to preserve original structure
new_data = copy.deepcopy(data)
raw_data = [data[0].strip(), data_dict]
def convert_value(val, key):
val = val.strip()
if key == "SYSTEMSTATU":
return val
if " " in val:
parts = val.split()
result = []
for part in parts:
if part.isdigit():
result.append(int(part))
else:
try:
result.append(float(part))
except ValueError:
result.append(part)
return result
else:
raw_data = [
value
for value in item.replace("[", " ")
.replace("]", " ")
.split(" ")[:-1]
if value != ""
]
if len(raw_data) == 1:
raw_data.append("")
if raw_data[0] == "":
raw_data = raw_data[1:]
if val.isdigit():
return int(val)
try:
return float(val)
except ValueError:
return val
stats_dict[raw_data[0]] = raw_data[1:]
stats_items.append(raw_data)
def parse_info_block(info_str):
pattern = re.compile(r"(\w+)\[([^\]]*)\]")
return {
key: convert_value(val, key) for key, val in pattern.findall(info_str)
}
return stats_dict
for stat in new_data.get("STATS", []):
keys_to_replace = {}
for key, value in stat.items():
if "MM" in key:
# Normalize key by removing suffix after colon
norm_key = key.split(":")[0]
mm_data = value
if not isinstance(mm_data, str):
continue
if mm_data.startswith("'STATS':"):
mm_data = mm_data[len("'STATS':") :]
keys_to_replace[norm_key] = parse_info_block(mm_data)
elif key == "HBinfo":
match = re.search(r"'(\w+)':\{(.+)\}", value)
if match:
hb_key = match.group(1)
hb_data = match.group(2)
keys_to_replace[key] = {hb_key: parse_info_block(hb_data)}
# Remove old keys and insert parsed versions
for k in list(stat.keys()):
if "MM" in k or k == "HBinfo":
del stat[k]
stat.update(keys_to_replace)
return new_data
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
@@ -211,7 +265,7 @@ class AvalonMiner(CGMiner):
except (KeyError, IndexError, ValueError, TypeError):
pass
async def _get_hashboards(self, rpc_stats: dict = None) -> List[HashBoard]:
async def _get_hashboards(self, rpc_estats: dict = None) -> List[HashBoard]:
if self.expected_hashboards is None:
return []
@@ -220,164 +274,202 @@ class AvalonMiner(CGMiner):
for i in range(self.expected_hashboards)
]
if rpc_stats is None:
if rpc_estats is None:
try:
rpc_stats = await self.rpc.stats()
rpc_estats = await self.rpc.estats()
except APIError:
pass
if rpc_stats is not None:
if rpc_estats is not None:
try:
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
parsed_estats = self.parse_estats(rpc_estats)
except (IndexError, KeyError, ValueError, TypeError):
return hashboards
for board in range(self.expected_hashboards):
try:
hashboards[board].chip_temp = int(parsed_stats["MTmax"][board])
board_hr = parsed_estats["STATS"][0]["MM ID0"]["MGHS"]
if isinstance(board_hr, list):
hashboards[board].hashrate = self.algo.hashrate(
rate=float(board_hr[board]), unit=self.algo.unit.GH
).into(self.algo.unit.default)
else:
hashboards[board].hashrate = self.algo.hashrate(
rate=float(board_hr), unit=self.algo.unit.GH
).into(self.algo.unit.default)
except LookupError:
pass
try:
board_hr = parsed_stats["MGHS"][board]
hashboards[board].hashrate = self.algo.hashrate(
rate=float(board_hr), unit=self.algo.unit.GH
).into(self.algo.unit.default)
hashboards[board].chip_temp = int(
parsed_estats["STATS"][0]["MM ID0"]["MTmax"][board]
)
except LookupError:
pass
try:
hashboards[board].chip_temp = int(
parsed_estats["STATS"][0]["MM ID0"]["Tmax"]
)
except LookupError:
pass
try:
hashboards[board].temp = int(parsed_stats["MTavg"][board])
hashboards[board].temp = int(
parsed_estats["STATS"][0]["MM ID0"]["MTmax"][board]
)
except LookupError:
pass
try:
hashboards[board].temp = int(
parsed_estats["STATS"][0]["MM ID0"]["Tavg"]
)
except LookupError:
pass
try:
chip_data = parsed_stats[f"PVT_T{board}"]
hashboards[board].inlet_temp = int(
parsed_estats["STATS"][0]["MM ID0"]["MTavg"][board]
)
except LookupError:
try:
hashboards[board].inlet_temp = int(
parsed_estats["STATS"][0]["MM ID0"]["HBITemp"]
)
except LookupError:
pass
try:
hashboards[board].outlet_temp = int(
parsed_estats["STATS"][0]["MM ID0"]["MTmax"][board]
)
except LookupError:
try:
hashboards[board].outlet_temp = int(
parsed_estats["STATS"][0]["MM ID0"]["HBOTemp"]
)
except LookupError:
pass
try:
chip_data = parsed_estats["STATS"][0]["MM ID0"][f"PVT_T{board}"]
hashboards[board].missing = False
if chip_data:
hashboards[board].chips = len(
[item for item in chip_data if not item == "0"]
)
except LookupError:
pass
try:
chip_data = parsed_estats["STATS"][0]["HBinfo"][f"HB{board}"][
f"PVT_T{board}"
]
hashboards[board].missing = False
if chip_data:
hashboards[board].chips = len(
[item for item in chip_data if not item == "0"]
)
except LookupError:
pass
return hashboards
async def _get_expected_hashrate(
self, rpc_stats: dict = None
self, rpc_estats: dict = None
) -> Optional[AlgoHashRate]:
if rpc_stats is None:
if rpc_estats is None:
try:
rpc_stats = await self.rpc.stats()
rpc_estats = await self.rpc.estats()
except APIError:
pass
if rpc_stats is not None:
if rpc_estats is not None:
try:
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
parsed_estats = self.parse_estats(rpc_estats)["STATS"][0]["MM ID0"]
return self.algo.hashrate(
rate=float(parsed_stats["GHSmm"][0]), unit=self.algo.unit.GH
rate=float(parsed_estats["GHSmm"]), unit=self.algo.unit.GH
).into(self.algo.unit.default)
except (IndexError, KeyError, ValueError, TypeError):
pass
async def _get_env_temp(self, rpc_stats: dict = None) -> Optional[float]:
if rpc_stats is None:
async def _get_env_temp(self, rpc_estats: dict = None) -> Optional[float]:
if rpc_estats is None:
try:
rpc_stats = await self.rpc.stats()
rpc_estats = await self.rpc.estats()
except APIError:
pass
if rpc_stats is not None:
if rpc_estats is not None:
try:
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
return float(parsed_stats["Temp"][0])
parsed_estats = self.parse_estats(rpc_estats)["STATS"][0]["MM ID0"]
return float(parsed_estats["Temp"])
except (IndexError, KeyError, ValueError, TypeError):
pass
async def _get_wattage_limit(self, rpc_stats: dict = None) -> Optional[int]:
if rpc_stats is None:
async def _get_wattage_limit(self, rpc_estats: dict = None) -> Optional[int]:
if rpc_estats is None:
try:
rpc_stats = await self.rpc.stats()
rpc_estats = await self.rpc.estats()
except APIError:
pass
if rpc_stats is not None:
if rpc_estats is not None:
try:
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
return int(parsed_stats["MPO"][0])
parsed_estats = self.parse_estats(rpc_estats)["STATS"][0]["MM ID0"]
return int(parsed_estats["MPO"])
except (IndexError, KeyError, ValueError, TypeError):
pass
async def _get_wattage(self, rpc_stats: dict = None) -> Optional[int]:
if rpc_stats is None:
async def _get_wattage(self, rpc_estats: dict = None) -> Optional[int]:
if rpc_estats is None:
try:
rpc_stats = await self.rpc.stats()
rpc_estats = await self.rpc.estats()
except APIError:
pass
if rpc_stats is not None:
if rpc_estats is not None:
try:
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
return int(parsed_stats["WALLPOWER"][0])
parsed_estats = self.parse_estats(rpc_estats)["STATS"][0]["MM ID0"]
return int(parsed_estats["WALLPOWER"])
except (IndexError, KeyError, ValueError, TypeError):
pass
async def _get_fans(self, rpc_stats: dict = None) -> List[Fan]:
async def _get_fans(self, rpc_estats: dict = None) -> List[Fan]:
if self.expected_fans is None:
return []
if rpc_stats is None:
if rpc_estats is None:
try:
rpc_stats = await self.rpc.stats()
rpc_estats = await self.rpc.estats()
except APIError:
pass
fans_data = [Fan() for _ in range(self.expected_fans)]
if rpc_stats is not None:
if rpc_estats is not None:
try:
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
parsed_estats = self.parse_estats(rpc_estats)["STATS"][0]["MM ID0"]
except LookupError:
return fans_data
for fan in range(self.expected_fans):
try:
fans_data[fan].speed = int(parsed_stats[f"Fan{fan + 1}"][0])
fans_data[fan].speed = int(parsed_estats[f"Fan{fan + 1}"])
except (IndexError, KeyError, ValueError, TypeError):
pass
return fans_data
async def _get_fault_light(self, rpc_stats: dict = None) -> Optional[bool]:
async def _get_fault_light(self, rpc_estats: dict = None) -> Optional[bool]:
if self.light:
return self.light
if rpc_stats is None:
if rpc_estats is None:
try:
rpc_stats = await self.rpc.stats()
rpc_estats = await self.rpc.estats()
except APIError:
pass
if rpc_stats is not None:
if rpc_estats is not None:
try:
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
led = int(parsed_stats["Led"][0])
parsed_estats = self.parse_estats(rpc_estats)["STATS"][0]["MM ID0"]
led = int(parsed_estats["Led"])
return True if led == 1 else False
except (IndexError, KeyError, ValueError, TypeError):
pass
try:
data = await self.rpc.ascset(0, "led", "1-255")
except APIError:
return False
try:
if data["STATUS"][0]["Msg"] == "ASC 0 set info: LED[1]":
return True
except LookupError:
pass
return False

View File

@@ -16,7 +16,6 @@
import logging
from pathlib import Path
from typing import List, Optional
import aiofiles
@@ -28,7 +27,7 @@ from pyasic.device.algorithm import AlgoHashRate
from pyasic.errors import APIError
from pyasic.miners.data import DataFunction, DataLocations, DataOptions, RPCAPICommand
from pyasic.miners.device.firmware import StockFirmware
from pyasic.rpc.btminer import BTMinerRPCAPI
from pyasic.rpc.btminer import BTMinerRPCAPI, BTMinerV3RPCAPI
BTMINER_DATA_LOC = DataLocations(
**{
@@ -294,7 +293,7 @@ class BTMiner(StockFirmware):
async def _get_mac(
self, rpc_summary: dict = None, rpc_get_miner_info: dict = None
) -> Optional[str]:
) -> str | None:
if rpc_get_miner_info is None:
try:
rpc_get_miner_info = await self.rpc.get_miner_info()
@@ -321,7 +320,7 @@ class BTMiner(StockFirmware):
except LookupError:
pass
async def _get_api_ver(self, rpc_get_version: dict = None) -> Optional[str]:
async def _get_api_ver(self, rpc_get_version: dict = None) -> str | None:
if rpc_get_version is None:
try:
rpc_get_version = await self.rpc.get_version()
@@ -346,7 +345,7 @@ class BTMiner(StockFirmware):
async def _get_fw_ver(
self, rpc_get_version: dict = None, rpc_summary: dict = None
) -> Optional[str]:
) -> str | None:
if rpc_get_version is None:
try:
rpc_get_version = await self.rpc.get_version()
@@ -379,7 +378,7 @@ class BTMiner(StockFirmware):
return self.fw_ver
async def _get_hostname(self, rpc_get_miner_info: dict = None) -> Optional[str]:
async def _get_hostname(self, rpc_get_miner_info: dict = None) -> str | None:
hostname = None
if rpc_get_miner_info is None:
try:
@@ -395,7 +394,7 @@ class BTMiner(StockFirmware):
return hostname
async def _get_hashrate(self, rpc_summary: dict = None) -> Optional[AlgoHashRate]:
async def _get_hashrate(self, rpc_summary: dict = None) -> AlgoHashRate | None:
if rpc_summary is None:
try:
rpc_summary = await self.rpc.summary()
@@ -410,8 +409,9 @@ class BTMiner(StockFirmware):
).into(self.algo.unit.default)
except LookupError:
pass
return None
async def _get_hashboards(self, rpc_devs: dict = None) -> List[HashBoard]:
async def _get_hashboards(self, rpc_devs: dict = None) -> list[HashBoard]:
if self.expected_hashboards is None:
return []
@@ -450,7 +450,7 @@ class BTMiner(StockFirmware):
return hashboards
async def _get_env_temp(self, rpc_summary: dict = None) -> Optional[float]:
async def _get_env_temp(self, rpc_summary: dict = None) -> float | None:
if rpc_summary is None:
try:
rpc_summary = await self.rpc.summary()
@@ -462,8 +462,9 @@ class BTMiner(StockFirmware):
return rpc_summary["SUMMARY"][0]["Env Temp"]
except LookupError:
pass
return None
async def _get_wattage(self, rpc_summary: dict = None) -> Optional[int]:
async def _get_wattage(self, rpc_summary: dict = None) -> int | None:
if rpc_summary is None:
try:
rpc_summary = await self.rpc.summary()
@@ -476,8 +477,9 @@ class BTMiner(StockFirmware):
return wattage if not wattage == -1 else None
except LookupError:
pass
return None
async def _get_wattage_limit(self, rpc_summary: dict = None) -> Optional[int]:
async def _get_wattage_limit(self, rpc_summary: dict = None) -> int | None:
if rpc_summary is None:
try:
rpc_summary = await self.rpc.summary()
@@ -489,10 +491,11 @@ class BTMiner(StockFirmware):
return rpc_summary["SUMMARY"][0]["Power Limit"]
except LookupError:
pass
return None
async def _get_fans(
self, rpc_summary: dict = None, rpc_get_psu: dict = None
) -> List[Fan]:
) -> list[Fan]:
if self.expected_fans is None:
return []
@@ -517,7 +520,7 @@ class BTMiner(StockFirmware):
async def _get_fan_psu(
self, rpc_summary: dict = None, rpc_get_psu: dict = None
) -> Optional[int]:
) -> int | None:
if rpc_summary is None:
try:
rpc_summary = await self.rpc.summary()
@@ -541,10 +544,11 @@ class BTMiner(StockFirmware):
return int(rpc_get_psu["Msg"]["fan_speed"])
except (KeyError, TypeError):
pass
return None
async def _get_errors(
self, rpc_summary: dict = None, rpc_get_error_code: dict = None
) -> List[MinerErrorData]:
) -> list[MinerErrorData]:
errors = []
if rpc_get_error_code is None and rpc_summary is None:
try:
@@ -581,7 +585,7 @@ class BTMiner(StockFirmware):
async def _get_expected_hashrate(
self, rpc_summary: dict = None
) -> Optional[AlgoHashRate]:
) -> AlgoHashRate | None:
if rpc_summary is None:
try:
rpc_summary = await self.rpc.summary()
@@ -598,8 +602,9 @@ class BTMiner(StockFirmware):
except LookupError:
pass
return None
async def _get_fault_light(self, rpc_get_miner_info: dict = None) -> Optional[bool]:
async def _get_fault_light(self, rpc_get_miner_info: dict = None) -> bool | None:
if rpc_get_miner_info is None:
try:
rpc_get_miner_info = await self.rpc.get_miner_info()
@@ -637,7 +642,7 @@ class BTMiner(StockFirmware):
async def set_hostname(self, hostname: str):
await self.rpc.set_hostname(hostname)
async def _is_mining(self, rpc_status: dict = None) -> Optional[bool]:
async def _is_mining(self, rpc_status: dict = None) -> bool | None:
if rpc_status is None:
try:
rpc_status = await self.rpc.status()
@@ -655,8 +660,9 @@ class BTMiner(StockFirmware):
return True if rpc_status["Msg"]["mineroff"] == "false" else False
except LookupError:
pass
return False
async def _get_uptime(self, rpc_summary: dict = None) -> Optional[int]:
async def _get_uptime(self, rpc_summary: dict = None) -> int | None:
if rpc_summary is None:
try:
rpc_summary = await self.rpc.summary()
@@ -669,7 +675,7 @@ class BTMiner(StockFirmware):
except LookupError:
pass
async def _get_pools(self, rpc_pools: dict = None) -> List[PoolMetrics]:
async def _get_pools(self, rpc_pools: dict = None) -> list[PoolMetrics]:
if rpc_pools is None:
try:
rpc_pools = await self.rpc.pools()
@@ -742,3 +748,380 @@ class BTMiner(StockFirmware):
exc_info=True,
)
raise
BTMINERV3_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction(
"_get_mac", [RPCAPICommand("rpc_get_device_info", "get_device_info")]
),
str(DataOptions.API_VERSION): DataFunction(
"_get_api_version",
[RPCAPICommand("rpc_get_device_info", "get_device_info")],
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_firmware_version",
[RPCAPICommand("rpc_get_device_info", "get_device_info")],
),
str(DataOptions.HOSTNAME): DataFunction(
"_get_hostname", [RPCAPICommand("rpc_get_device_info", "get_device_info")]
),
str(DataOptions.FAULT_LIGHT): DataFunction(
"_get_light_flashing",
[RPCAPICommand("rpc_get_device_info", "get_device_info")],
),
str(DataOptions.WATTAGE_LIMIT): DataFunction(
"_get_wattage_limit",
[RPCAPICommand("rpc_get_device_info", "get_device_info")],
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[RPCAPICommand("rpc_get_miner_status_summary", "get_miner_status_summary")],
),
str(DataOptions.FAN_PSU): DataFunction(
"_get_psu_fans", [RPCAPICommand("rpc_get_device_info", "get_device_info")]
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[
RPCAPICommand("rpc_get_device_info", "get_device_info"),
RPCAPICommand(
"rpc_get_miner_status_edevs",
"get_miner_status_edevs",
),
],
),
str(DataOptions.POOLS): DataFunction(
"_get_pools",
[RPCAPICommand("rpc_get_miner_status_summary", "get_miner_status_summary")],
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime",
[RPCAPICommand("rpc_get_miner_status_summary", "get_miner_status_summary")],
),
str(DataOptions.WATTAGE): DataFunction(
"_get_wattage",
[RPCAPICommand("rpc_get_miner_status_summary", "get_miner_status_summary")],
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[RPCAPICommand("rpc_get_miner_status_summary", "get_miner_status_summary")],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
[RPCAPICommand("rpc_get_miner_status_summary", "get_miner_status_summary")],
),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction(
"_get_env_temp",
[RPCAPICommand("rpc_get_miner_status_summary", "get_miner_status_summary")],
),
}
)
class BTMinerV3(StockFirmware):
_rpc_cls = BTMinerV3RPCAPI
rpc: BTMinerV3RPCAPI
data_locations = BTMINERV3_DATA_LOC
supports_shutdown = True
supports_autotuning = True
supports_power_modes = True
async def fault_light_off(self) -> bool:
try:
data = await self.rpc.set_system_led()
except APIError:
return False
if data:
if "code" in data.keys():
if data["code"] == 0:
self.light = False
return True
return False
async def fault_light_on(self) -> bool:
try:
data = await self.rpc.set_system_led(
leds=[
{
{"color": "red", "period": 60, "duration": 20, "start": 0},
}
],
)
except APIError:
return False
if data:
if "code" in data.keys():
if data["code"] == 0:
self.light = True
return True
return False
async def reboot(self) -> bool:
try:
data = await self.rpc.set_system_reboot()
except APIError:
return False
if data.get("msg"):
if data["msg"] == "ok":
return True
return False
async def restart_backend(self) -> bool:
try:
data = await self.rpc.set_miner_service("restart")
except APIError:
return False
if data.get("msg"):
if data["msg"] == "ok":
return True
return False
async def stop_mining(self) -> bool:
try:
data = await self.rpc.set_miner_service("stop")
except APIError:
return False
if data.get("msg"):
if data["msg"] == "ok":
return True
return False
async def resume_mining(self) -> bool:
try:
data = await self.rpc.set_miner_service("start")
except APIError:
return False
if data.get("msg"):
if data["msg"] == "ok":
return True
return False
async def set_power_limit(self, wattage: int) -> bool:
try:
await self.rpc.set_miner_power_limit(wattage)
except Exception as e:
logging.warning(f"{self} set_power_limit: {e}")
return False
else:
return True
async def _get_mac(self, rpc_get_device_info: dict = None) -> str | None:
if rpc_get_device_info is None:
try:
rpc_get_device_info = await self.rpc.get_device_info()
except APIError:
return None
return rpc_get_device_info.get("msg", {}).get("network", {}).get("mac")
async def _get_api_version(self, rpc_get_device_info: dict = None) -> str | None:
if rpc_get_device_info is None:
try:
rpc_get_device_info = await self.rpc.get_device_info()
except APIError:
return None
return rpc_get_device_info.get("msg", {}).get("system", {}).get("api")
async def _get_firmware_version(
self, rpc_get_device_info: dict = None
) -> str | None:
if rpc_get_device_info is None:
try:
rpc_get_device_info = await self.rpc.get_device_info()
except APIError:
return None
return rpc_get_device_info.get("msg", {}).get("system", {}).get("fwversion")
async def _get_hostname(self, rpc_get_device_info: dict = None) -> str | None:
if rpc_get_device_info is None:
try:
rpc_get_device_info = await self.rpc.get_device_info()
except APIError:
return None
return rpc_get_device_info.get("msg", {}).get("network", {}).get("hostname")
async def _get_light_flashing(
self, rpc_get_device_info: dict = None
) -> bool | None:
if rpc_get_device_info is None:
try:
rpc_get_device_info = await self.rpc.get_device_info()
except APIError:
return None
val = rpc_get_device_info.get("msg", {}).get("system", {}).get("ledstatus")
if isinstance(val, str):
return val != "auto"
return None
async def _get_wattage_limit(
self, rpc_get_device_info: dict = None
) -> float | None:
if rpc_get_device_info is None:
try:
rpc_get_device_info = await self.rpc.get_device_info()
except APIError:
return None
val = rpc_get_device_info.get("msg", {}).get("miner", {}).get("power-limit-set")
try:
return float(val)
except (ValueError, TypeError):
return None
async def _get_fans(self, rpc_get_miner_status_summary: dict = None) -> list[Fan]:
if rpc_get_miner_status_summary is None:
try:
rpc_get_miner_status_summary = await self.rpc.get_miner_status_summary()
except APIError:
return []
fans = []
summary = rpc_get_miner_status_summary.get("msg", {}).get("summary", {})
for idx, direction in enumerate(["in", "out"]):
rpm = summary.get(f"fan-speed-{direction}")
if rpm is not None:
fans.append(Fan(speed=rpm))
return fans
async def _get_psu_fans(self, rpc_get_device_info: dict = None) -> list[Fan]:
if rpc_get_device_info is None:
try:
rpc_get_device_info = await self.rpc.get_device_info()
except APIError:
return []
rpm = rpc_get_device_info.get("msg", {}).get("power", {}).get("fanspeed")
return [Fan(speed=rpm)] if rpm is not None else []
async def _get_hashboards(
self,
rpc_get_device_info: dict = None,
rpc_get_miner_status_edevs: dict = None,
) -> list[HashBoard]:
if rpc_get_device_info is None:
try:
rpc_get_device_info = await self.rpc.get_device_info()
except APIError:
return []
if rpc_get_miner_status_edevs is None:
try:
rpc_get_miner_status_edevs = await self.rpc.get_miner_status_edevs()
except APIError:
return []
boards = []
board_count = (
rpc_get_device_info.get("msg", {}).get("hardware", {}).get("boards", 3)
)
edevs = rpc_get_miner_status_edevs.get("msg", {}).get("edevs", [])
for idx in range(board_count):
board_data = edevs[idx] if idx < len(edevs) else {}
boards.append(
HashBoard(
slot=idx,
hashrate=self.algo.hashrate(
rate=board_data.get("hash-average", 0), unit=self.algo.unit.TH
).into(self.algo.unit.default),
temp=board_data.get("chip-temp-min"),
inlet_temp=board_data.get("chip-temp-min"),
outlet_temp=board_data.get("chip-temp-max"),
serial_number=board_data.get(f"pcbsn{idx}"),
chips=board_data.get("effective-chips"),
expected_chips=self.expected_chips,
active=(board_data.get("hash-average") or 0) > 0,
missing=False,
tuned=True,
)
)
return boards
async def _get_pools(
self, rpc_get_miner_status_summary: dict = None
) -> list[PoolMetrics]:
if rpc_get_miner_status_summary is None:
try:
rpc_get_miner_status_summary = await self.rpc.get_miner_status_summary()
except APIError:
return []
pools = []
msg_pools = rpc_get_miner_status_summary.get("msg", {}).get("pools", [])
for idx, pool in enumerate(msg_pools):
pools.append(
PoolMetrics(
index=idx,
user=pool.get("account"),
alive=pool.get("status") == "alive",
active=pool.get("stratum-active"),
url=pool.get("url"),
)
)
return pools
async def _get_uptime(
self, rpc_get_miner_status_summary: dict = None
) -> int | None:
if rpc_get_miner_status_summary is None:
try:
rpc_get_miner_status_summary = await self.rpc.get_miner_status_summary()
except APIError:
return None
return (
rpc_get_miner_status_summary.get("msg", {})
.get("summary", {})
.get("elapsed")
)
async def _get_wattage(
self, rpc_get_miner_status_summary: dict = None
) -> float | None:
if rpc_get_miner_status_summary is None:
try:
rpc_get_miner_status_summary = await self.rpc.get_miner_status_summary()
except APIError:
return None
return (
rpc_get_miner_status_summary.get("msg", {})
.get("summary", {})
.get("power-realtime")
)
async def _get_hashrate(
self, rpc_get_miner_status_summary: dict = None
) -> float | None:
if rpc_get_miner_status_summary is None:
try:
rpc_get_miner_status_summary = await self.rpc.get_miner_status_summary()
except APIError:
return None
return (
rpc_get_miner_status_summary.get("msg", {})
.get("summary", {})
.get("hash-realtime")
)
async def _get_expected_hashrate(
self, rpc_get_miner_status_summary: dict = None
) -> float | None:
if rpc_get_miner_status_summary is None:
try:
rpc_get_miner_status_summary = await self.rpc.get_miner_status_summary()
except APIError:
return None
return (
rpc_get_miner_status_summary.get("msg", {})
.get("summary", {})
.get("factory-hash")
)
async def _get_env_temp(
self, rpc_get_miner_status_summary: dict = None
) -> float | None:
if rpc_get_miner_status_summary is None:
try:
rpc_get_miner_status_summary = await self.rpc.get_miner_status_summary()
except APIError:
return None
return (
rpc_get_miner_status_summary.get("msg", {})
.get("summary", {})
.get("environment-temperature")
)

View File

@@ -13,23 +13,23 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.miners.backends.btminer import BTMiner
from pyasic.miners.backends.btminer import BTMiner, BTMinerV3
class M7X(BTMiner):
supports_autotuning = True
class M7X(BTMinerV3):
pass
class M6X(BTMiner):
supports_autotuning = True
class M6X(BTMinerV3):
pass
class M5X(BTMiner):
supports_autotuning = True
class M5X(BTMinerV3):
pass
class M3X(BTMiner):
supports_autotuning = True
class M3X(BTMinerV3):
pass
class M2X(BTMiner):

View File

@@ -0,0 +1,27 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.device.algorithm import MinerAlgo
from pyasic.device.models import MinerModel
from pyasic.miners.device.makes import AvalonMinerMake
class AvalonQHome(AvalonMinerMake):
raw_model = MinerModel.AVALONMINER.AvalonQHome
expected_chips = 160
expected_fans = 2
expected_hashboards = 1
algo = MinerAlgo.SHA256

View File

@@ -0,0 +1,17 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from .Q import AvalonQHome

View File

@@ -22,3 +22,4 @@ from .A11X import *
from .A12X import *
from .A15X import *
from .nano import *
from .Q import *

View File

@@ -6,6 +6,7 @@ from pyasic.miners.device.makes import BraiinsMake
class BMM100(BraiinsMake):
raw_model = MinerModel.BRAIINS.BMM100
expected_chips = 1
expected_hashboards = 1
expected_fans = 1
algo = MinerAlgo.SHA256
@@ -14,6 +15,7 @@ class BMM100(BraiinsMake):
class BMM101(BraiinsMake):
raw_model = MinerModel.BRAIINS.BMM101
expected_chips = 1
expected_hashboards = 1
expected_fans = 1
algo = MinerAlgo.SHA256

View File

@@ -0,0 +1,27 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.device.algorithm.base import GenericAlgo
from pyasic.device.models import MinerModel
from pyasic.miners.device.makes import GoldshellMake
class Byte(GoldshellMake):
raw_model = MinerModel.GOLDSHELL.Byte
expected_chips = 0
expected_fans = 0
expected_hashboards = 0
algo = GenericAlgo

View File

@@ -0,0 +1,16 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from .Byte import Byte

View File

@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from .Byte import *
from .X5 import *
from .XBox import *
from .XMax import *

View File

@@ -512,6 +512,7 @@ MINER_CLASSES = {
"AVALONMINER NANO3": CGMinerAvalonNano3,
"AVALON NANO3S": CGMinerAvalonNano3s,
"AVALONMINER 15-194": CGMinerAvalon1566,
"AVALON Q": CGMinerAvalonQHome,
},
MinerTypes.INNOSILICON: {
None: type("InnosiliconUnknown", (Innosilicon, InnosiliconMake), {}),
@@ -528,6 +529,7 @@ MINER_CLASSES = {
"GOLDSHELL KDMAX": GoldshellKDMax,
"GOLDSHELL KDBOXII": GoldshellKDBoxII,
"GOLDSHELL KDBOXPRO": GoldshellKDBoxPro,
"GOLDSHELL BYTE": GoldshellByte,
},
MinerTypes.BRAIINS_OS: {
None: BOSMiner,
@@ -1064,6 +1066,45 @@ class MinerFactory:
return data
async def send_btminer_v3_api_command(self, ip, command):
try:
reader, writer = await asyncio.open_connection(ip, 4433)
except (ConnectionError, OSError):
return
cmd = {"cmd": command}
try:
# send the command
json_cmd = json.dumps(cmd).encode("utf-8")
length = len(json_cmd)
writer.write(length.to_bytes(4, byteorder="little"))
writer.write(json_cmd)
await writer.drain()
# receive all the data
resp_len = await reader.readexactly(4)
data = await reader.readexactly(
int.from_bytes(resp_len, byteorder="little")
)
writer.close()
await writer.wait_closed()
except asyncio.CancelledError:
writer.close()
await writer.wait_closed()
return
except (ConnectionError, OSError):
return
if data == b"Socket connect failed: Connection refused\n":
return
try:
data = json.loads(data)
except json.JSONDecodeError:
return {}
return data
@staticmethod
async def _fix_api_data(data: bytes) -> str:
if data.endswith(b"\x00"):
@@ -1183,10 +1224,18 @@ class MinerFactory:
try:
miner_model = sock_json_data["DEVDETAILS"][0]["Model"].replace("_", "")
miner_model = miner_model[:-1] + "0"
return miner_model
except (TypeError, LookupError):
pass
sock_json_data_v3 = await self.send_btminer_v3_api_command(
ip, "get.device.info"
)
try:
miner_model = sock_json_data_v3["msg"]["miner"]["type"].replace("_", "")
miner_model = miner_model[:-1] + "0"
return miner_model
except (TypeError, LookupError):
pass
async def get_miner_model_avalonminer(self, ip: str) -> str | None:
sock_json_data = await self.send_api_command(ip, "version")

View File

@@ -0,0 +1,319 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from typing import List, Optional, Union
from pyasic.config import MinerConfig
from pyasic.data import Fan, MinerData
from pyasic.data.boards import HashBoard
from pyasic.data.pools import PoolMetrics, PoolUrl
from pyasic.device.algorithm import AlgoHashRate, MinerAlgo
from pyasic.errors import APIError
from pyasic.miners.backends import GoldshellMiner
from pyasic.miners.data import (
DataFunction,
DataLocations,
DataOptions,
RPCAPICommand,
WebAPICommand,
)
from pyasic.miners.device.models import Byte
ALGORITHM_SCRYPT_NAME = "scrypt(LTC)"
ALGORITHM_ZKSNARK_NAME = "zkSNARK(ALEO)"
EXPECTED_CHIPS_PER_SCRYPT_BOARD = 5
EXPECTED_CHIPS_PER_ZKSNARK_BOARD = 3
GOLDSHELL_BYTE_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction(
"_get_mac",
[WebAPICommand("web_setting", "setting")],
),
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver",
[WebAPICommand("web_setting", "version")],
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver",
[WebAPICommand("web_status", "status")],
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[RPCAPICommand("rpc_devs", "devs")],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
[RPCAPICommand("rpc_devs", "devs")],
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[
RPCAPICommand("rpc_devs", "devs"),
RPCAPICommand("rpc_devdetails", "devdetails"),
],
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[RPCAPICommand("rpc_devs", "devs")],
),
str(DataOptions.POOLS): DataFunction(
"_get_pools",
[RPCAPICommand("rpc_pools", "pools")],
),
}
)
class GoldshellByte(GoldshellMiner, Byte):
data_locations = GOLDSHELL_BYTE_DATA_LOC
cgdev: dict | None = None
async def get_data(
self,
allow_warning: bool = False,
include: List[Union[str, DataOptions]] = None,
exclude: List[Union[str, DataOptions]] = None,
) -> MinerData:
if self.cgdev is None:
try:
self.cgdev = await self.web.send_command("cgminer?cgminercmd=devs")
except APIError:
pass
scrypt_board_count = 0
zksnark_board_count = 0
total_wattage = 0
total_uptime_mins = 0
for minfo in self.cgdev.get("minfos", []):
algo_name = minfo.get("name")
for info in minfo.get("infos", []):
self.expected_hashboards += 1
self.expected_fans += 1
total_wattage = int(float(info.get("power", 0)))
total_uptime_mins = int(info.get("time", 0))
if algo_name == ALGORITHM_SCRYPT_NAME:
scrypt_board_count += 1
elif algo_name == ALGORITHM_ZKSNARK_NAME:
zksnark_board_count += 1
self.expected_chips = (EXPECTED_CHIPS_PER_SCRYPT_BOARD * scrypt_board_count) + (
EXPECTED_CHIPS_PER_ZKSNARK_BOARD * zksnark_board_count
)
if scrypt_board_count > 0 and zksnark_board_count == 0:
self.algo = MinerAlgo.SCRYPT
elif zksnark_board_count > 0 and scrypt_board_count == 0:
self.algo = MinerAlgo.ZKSNARK
data = await super().get_data(allow_warning, include, exclude)
data.expected_chips = self.expected_chips
data.wattage = total_wattage
data.uptime = total_uptime_mins
data.voltage = 0
for board in data.hashboards:
data.voltage += board.voltage
return data
async def get_config(self) -> MinerConfig:
try:
pools = await self.web.pools()
except APIError:
return self.config
self.config = MinerConfig.from_goldshell_byte(pools)
return self.config
async def _get_api_ver(self, web_setting: dict = None) -> Optional[str]:
if web_setting is None:
try:
web_setting = await self.web.setting()
except APIError:
pass
if web_setting is not None:
try:
version = web_setting.get("version")
if version is not None:
self.api_ver = version.strip("v")
return self.api_ver
except KeyError:
pass
return self.api_ver
async def _get_expected_hashrate(
self, rpc_devs: dict = None
) -> Optional[AlgoHashRate]:
if rpc_devs is None:
try:
rpc_devs = await self.rpc.devs()
except APIError:
pass
total_hash_rate_mh = 0
if rpc_devs is not None:
for board in rpc_devs.get("DEVS", []):
algo_name = board.get("pool")
if algo_name == ALGORITHM_SCRYPT_NAME:
total_hash_rate_mh += (
self.algo.hashrate(
rate=float(board.get("estimate_hash_rate", 0)),
unit=self.algo.unit.H,
)
.into(self.algo.unit.MH)
.rate
)
elif algo_name == ALGORITHM_ZKSNARK_NAME:
total_hash_rate_mh += float(board.get("theory_hash", 0))
hash_rate = self.algo.hashrate(
rate=total_hash_rate_mh, unit=self.algo.unit.MH
).into(self.algo.unit.default)
return hash_rate
async def _get_hashrate(self, rpc_devs: dict = None) -> Optional[AlgoHashRate]:
if rpc_devs is None:
try:
rpc_devs = await self.rpc.devs()
except APIError:
pass
total_hash_rate_mh = 0
if rpc_devs is not None:
for board in rpc_devs.get("DEVS", []):
total_hash_rate_mh += float(board.get("MHS 20s", 0))
hash_rate = self.algo.hashrate(
rate=total_hash_rate_mh, unit=self.algo.unit.MH
).into(self.algo.unit.default)
return hash_rate
async def _get_pools(self, rpc_pools: dict = None) -> List[PoolMetrics]:
if rpc_pools is None:
try:
rpc_pools = await self.rpc.pools()
except APIError:
pass
pools_data = []
if rpc_pools is not None:
try:
pools = rpc_pools.get("POOLS", [])
for index, pool_info in enumerate(pools):
url = pool_info.get("URL")
pool_url = PoolUrl.from_str(url) if url else None
pool_data = PoolMetrics(
accepted=pool_info.get("Accepted"),
rejected=pool_info.get("Rejected"),
active=pool_info.get("Stratum Active"),
alive=pool_info.get("Status") == "Alive",
url=pool_url,
user=pool_info.get("User"),
index=index,
)
pools_data.append(pool_data)
except LookupError:
pass
return pools_data
async def _get_hashboards(
self, rpc_devs: dict = None, rpc_devdetails: dict = None
) -> List[HashBoard]:
if rpc_devs is None:
try:
rpc_devs = await self.rpc.devs()
except APIError:
pass
hashboards = [
HashBoard(slot=i, expected_chips=self.expected_chips)
for i in range(self.expected_hashboards)
]
if rpc_devs is not None:
for board in rpc_devs.get("DEVS", []):
b_id = board["PGA"]
hashboards[b_id].hashrate = self.algo.hashrate(
rate=float(board["MHS 20s"]), unit=self.algo.unit.MH
).into(self.algo.unit.default)
hashboards[b_id].chip_temp = board["tstemp-1"]
hashboards[b_id].temp = board["tstemp-2"]
hashboards[b_id].voltage = board["voltage"]
hashboards[b_id].active = board["Status"] == "Alive"
hashboards[b_id].missing = False
algo_name = board.get("pool")
if algo_name == ALGORITHM_SCRYPT_NAME:
hashboards[b_id].expected_chips = EXPECTED_CHIPS_PER_SCRYPT_BOARD
elif algo_name == ALGORITHM_ZKSNARK_NAME:
hashboards[b_id].expected_chips = EXPECTED_CHIPS_PER_ZKSNARK_BOARD
if rpc_devdetails is None:
try:
rpc_devdetails = await self.rpc.devdetails()
except APIError:
pass
if rpc_devdetails is not None:
for board in rpc_devdetails.get("DEVS", []):
b_id = board["DEVDETAILS"]
hashboards[b_id].chips = board["chips-nr"]
return hashboards
async def _get_fans(self, rpc_devs: dict = None) -> List[Fan]:
if self.expected_fans is None:
return []
if rpc_devs is None:
try:
rpc_devs = await self.rpc.devs()
except APIError:
pass
fans_data = []
if rpc_devs is not None:
for board in rpc_devs.get("DEVS", []):
if board.get("PGA") is not None:
try:
b_id = board["PGA"]
fan_speed = board[f"fan{b_id}"]
fans_data.append(fan_speed)
except KeyError:
pass
fans = [Fan(speed=d) if d else Fan() for d in fans_data]
return fans

View File

@@ -0,0 +1,16 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from .Byte import GoldshellByte

View File

@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from .Byte import *
from .X5 import *
from .XBox import *
from .XMax import *

27
pyasic/rpc/avalonminer.py Normal file
View File

@@ -0,0 +1,27 @@
# ------------------------------------------------------------------------------
# Copyright 2025 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.rpc.cgminer import CGMinerRPCAPI
class AvalonMinerRPCAPI(CGMinerRPCAPI):
"""An abstraction of the AvalonMiner API.
Each method corresponds to an API command in AvalonMiner.
"""
async def litestats(self):
return await self.send_command("litestats")

View File

@@ -23,7 +23,8 @@ import json
import logging
import re
import struct
from typing import Literal, Union
from asyncio import Future, StreamReader, StreamWriter
from typing import Any, AsyncGenerator, Callable, Literal, Union
import httpx
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
@@ -1100,3 +1101,277 @@ class BTMinerRPCAPI(BaseMinerRPCAPI):
</details>
"""
return await self.send_command("get_error_code", allow_warning=False)
class BTMinerV3RPCAPI(BaseMinerRPCAPI):
def __init__(self, ip: str, port: int = 4433, api_ver: str = "0.0.0"):
super().__init__(ip, port, api_ver=api_ver)
self.reader: StreamReader | None = None
self.writer: StreamWriter | None = None
self.reader_loop = None
self.salt = None
self.cmd_results = {}
self.cmd_callbacks = {"get.miner.report": set()}
async def connect(self):
self.reader, self.writer = await asyncio.open_connection(
str(self.ip), self.port
)
self.reader_loop = asyncio.create_task(self._read_loop())
async def disconnect(self):
self.writer.close()
await self.writer.wait_closed()
self.reader_loop.cancel()
async def send_command(
self, command: str, parameters: Any = None, **kwargs
) -> dict:
if self.writer is None:
await self.connect()
while command in self.cmd_results:
wait_fut = self.cmd_results[command]
await wait_fut
result_fut = Future()
self.cmd_results[command] = result_fut
cmd = {"cmd": command}
if parameters is not None:
cmd["param"] = parameters
if command.startswith("set."):
salt = await self.get_salt()
ts = int(datetime.datetime.now().timestamp())
cmd["ts"] = ts
token_str = cmd["cmd"] + self.pwd + salt + str(ts)
token_hashed = bytearray(
base64.b64encode(hashlib.sha256(token_str.encode("utf-8")).digest())
)
token_hashed[8] = 0
cmd["account"] = "super"
cmd["token"] = token_hashed.decode("ascii")
# send the command
ser = json.dumps(cmd).encode("utf-8")
header = struct.pack("<I", len(ser))
await self._send_bytes(header + json.dumps(cmd).encode("utf-8"))
await result_fut
return result_fut.result()
async def _read_loop(self):
while True:
result = await self._read_bytes()
data = self._load_api_data(result)
command = data["desc"]
if command in self.cmd_callbacks:
callbacks: list[Callable] = self.cmd_callbacks[command]
await asyncio.gather(*[callback(data) for callback in callbacks])
elif command in self.cmd_results:
future: Future = self.cmd_results.pop(command)
future.set_result(data)
else:
logging.error(f"Received unexpected data for {self}: {data}")
async def _read_bytes(self, **kwargs) -> bytes:
header = await self.reader.readexactly(4)
length = struct.unpack("<I", header)[0]
return await self.reader.readexactly(length)
async def _send_bytes(self, data: bytes, **kwargs):
self.writer.write(data)
await self.writer.drain()
async def get_salt(self) -> str:
if self.salt is not None:
return self.salt
data = await self.send_command("get.device.info", "salt")
self.salt = data["msg"]["salt"]
return self.salt
async def get_miner_report(self) -> AsyncGenerator[dict]:
if self.writer is None:
await self.connect()
result = asyncio.Queue()
async def callback(data: dict):
await result.put(data)
cb_fn = callback
try:
self.cmd_callbacks["get.miner.report"].add(cb_fn)
while True:
yield await result.get()
if self.writer.is_closing():
break
finally:
self.cmd_callbacks["get.miner.report"].remove(cb_fn)
async def get_system_setting(self) -> dict | None:
return await self.send_command("get.system.setting")
async def get_miner_status_summary(self) -> dict | None:
return await self.send_command("get.miner.status", parameters="summary")
async def get_miner_status_edevs(self) -> dict | None:
return await self.send_command("get.miner.status", parameters="edevs")
async def get_miner_history(self) -> dict | None:
data = await self.send_command(
"get.miner.history",
parameters={
"start": "1",
"stop": str(datetime.datetime.now().timestamp()),
},
)
ret = {}
result = data.get("msg")
if result is not None:
unparsed = result["Data"].strip()
for item in unparsed.split(" "):
list_item = item.split(",")
timestamp = int(list_item.pop(0))
ret[timestamp] = list_item
return ret
async def get_psu_command(self):
return await self.send_command("get.psu.command")
async def get_miner_setting(self) -> dict | None:
return await self.send_command("get.miner.setting")
async def get_device_info(self) -> dict | None:
return await self.send_command("get.device.info")
async def get_log_download(self) -> dict | None:
return await self.send_command("get.log.download")
async def get_fan_setting(self) -> dict | None:
return await self.send_command("get.fan.setting")
async def set_system_reboot(self) -> dict | None:
return await self.send_command("set.system.reboot")
async def set_system_factory_reset(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.system.factory_reset")
async def set_system_update_firmware(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.system.update_firmware")
async def set_system_net_config(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.system.net_config")
async def set_system_led(self, leds: list | None = None) -> dict | None:
if leds is None:
return await self.send_command("set.system.led", parameters="auto")
else:
return await self.send_command("set.system.led", parameters=leds)
async def set_system_time_randomized(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.system.time_randomized")
async def set_system_timezone(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.system.timezone")
async def set_system_hostname(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.system.hostname")
async def set_system_webpools(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.system.webpools")
async def set_miner_target_freq(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.target_freq")
async def set_miner_heat_mode(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.heat_mode")
async def set_system_ntp_server(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.system.ntp_server")
async def set_miner_service(self, value: str) -> dict | None:
return await self.send_command("set.miner.service", parameters=value)
async def set_miner_power_mode(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.power_mode")
async def set_miner_cointype(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.cointype")
async def set_miner_pools(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.pools")
async def set_miner_fastboot(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.fastboot")
async def set_miner_power_percent(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.power_percent")
async def set_miner_pre_power_on(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.pre_power_on")
async def set_miner_restore_setting(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.restore_setting")
async def set_miner_report(self, frequency: int = 1) -> dict | None:
return await self.send_command(
"set.miner.report", parameters={"gap": frequency}
)
async def set_miner_power_limit(self, power: int) -> dict | None:
return await self.send_command("set.miner.power_limit", parameters=power)
async def set_miner_upfreq_speed(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.upfreq_speed")
async def set_log_upload(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.log.upload")
async def set_user_change_passwd(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.user.change_passwd")
async def set_user_permission(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.user.permission")
async def set_fan_temp_offset(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.fan.temp_offset")
async def set_fan_poweroff_cool(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.fan.poweroff_cool")
async def set_fan_zero_speed(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.fan.zero_speed")
async def set_shell_debug(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.shell.debug")

View File

@@ -1,6 +1,6 @@
[project]
name = "pyasic"
version = "0.73.0"
version = "0.75.1"
description = "A simplified and standardized interface for Bitcoin ASICs."
authors = [{name = "UpstreamData", email = "brett@upstreamdata.ca"}]

File diff suppressed because one or more lines are too long