feature: add support for avalon Q

This commit is contained in:
Brett Rowan
2025-07-23 11:55:57 -06:00
parent aa3d105fcb
commit 5457ae6cd5
13 changed files with 461 additions and 147 deletions

View File

@@ -565,6 +565,12 @@ details {
<li><a href="../avalonminer/A15X#avalon-1566-stock">Avalon 1566 (Stock)</a></li> <li><a href="../avalonminer/A15X#avalon-1566-stock">Avalon 1566 (Stock)</a></li>
</ul> </ul>
</details> </details>
<details>
<summary>Q Series:</summary>
<ul>
<li><a href="../avalonminer/Q#avalon-q-home-stock">Avalon Q Home (Stock)</a></li>
</ul>
</details>
</ul> </ul>
</details> </details>
<details> <details>

View File

@@ -456,6 +456,7 @@ class AvalonminerModels(MinerModelType):
Avalon1566 = "Avalon 1566" Avalon1566 = "Avalon 1566"
AvalonNano3 = "Avalon Nano 3" AvalonNano3 = "Avalon Nano 3"
AvalonNano3s = "Avalon Nano 3s" AvalonNano3s = "Avalon Nano 3s"
AvalonQHome = "Avalon Q Home"
def __str__(self): def __str__(self):
return self.value return self.value

View File

@@ -0,0 +1,22 @@
# ------------------------------------------------------------------------------
# Copyright 2025 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.miners.backends import AvalonMiner
from pyasic.miners.device.models import AvalonQHome
class CGMinerAvalonQHome(AvalonMiner, AvalonQHome):
pass

View File

@@ -0,0 +1 @@
from .Q import CGMinerAvalonQHome

View File

@@ -22,3 +22,4 @@ from .A11X import *
from .A12X import * from .A12X import *
from .A15X import * from .A15X import *
from .nano import * from .nano import *
from .Q import *

View File

@@ -49,31 +49,31 @@ AVALON_NANO_DATA_LOC = DataLocations(
), ),
str(DataOptions.EXPECTED_HASHRATE): DataFunction( str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate", "_get_expected_hashrate",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.HASHBOARDS): DataFunction( str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards", "_get_hashboards",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction( str(DataOptions.ENVIRONMENT_TEMP): DataFunction(
"_get_env_temp", "_get_env_temp",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.WATTAGE_LIMIT): DataFunction( str(DataOptions.WATTAGE_LIMIT): DataFunction(
"_get_wattage_limit", "_get_wattage_limit",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.WATTAGE): DataFunction( str(DataOptions.WATTAGE): DataFunction(
"_get_wattage", "_get_wattage",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.FANS): DataFunction( str(DataOptions.FANS): DataFunction(
"_get_fans", "_get_fans",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.FAULT_LIGHT): DataFunction( str(DataOptions.FAULT_LIGHT): DataFunction(
"_get_fault_light", "_get_fault_light",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.UPTIME): DataFunction( str(DataOptions.UPTIME): DataFunction(
"_get_uptime", "_get_uptime",
@@ -102,35 +102,35 @@ AVALON_NANO3S_DATA_LOC = DataLocations(
), ),
str(DataOptions.HASHRATE): DataFunction( str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate", "_get_hashrate",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.EXPECTED_HASHRATE): DataFunction( str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate", "_get_expected_hashrate",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.HASHBOARDS): DataFunction( str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards", "_get_hashboards",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction( str(DataOptions.ENVIRONMENT_TEMP): DataFunction(
"_get_env_temp", "_get_env_temp",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.WATTAGE_LIMIT): DataFunction( str(DataOptions.WATTAGE_LIMIT): DataFunction(
"_get_wattage_limit", "_get_wattage_limit",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.WATTAGE): DataFunction( str(DataOptions.WATTAGE): DataFunction(
"_get_wattage", "_get_wattage",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.FANS): DataFunction( str(DataOptions.FANS): DataFunction(
"_get_fans", "_get_fans",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.FAULT_LIGHT): DataFunction( str(DataOptions.FAULT_LIGHT): DataFunction(
"_get_fault_light", "_get_fault_light",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.UPTIME): DataFunction( str(DataOptions.UPTIME): DataFunction(
"_get_uptime", "_get_uptime",
@@ -170,58 +170,58 @@ class CGMinerAvalonNano3s(AvalonMiner, AvalonNano3s):
data_locations = AVALON_NANO3S_DATA_LOC data_locations = AVALON_NANO3S_DATA_LOC
async def _get_wattage(self, rpc_stats: dict = None) -> Optional[int]: async def _get_wattage(self, rpc_estats: dict = None) -> Optional[int]:
if rpc_stats is None: if rpc_estats is None:
try: try:
rpc_stats = await self.rpc.stats() rpc_estats = await self.rpc.estats()
except APIError: except APIError:
pass pass
if rpc_stats is not None: if rpc_estats is not None:
try: try:
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"] unparsed_estats = rpc_estats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats) parsed_estats = self.parse_estats(unparsed_estats)
return int(parsed_stats["PS"][6]) return int(parsed_estats["PS"][6])
except (IndexError, KeyError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
pass pass
async def _get_hashrate(self, rpc_stats: dict = None) -> Optional[AlgoHashRate]: async def _get_hashrate(self, rpc_estats: dict = None) -> Optional[AlgoHashRate]:
if rpc_stats is None: if rpc_estats is None:
try: try:
rpc_stats = await self.rpc.stats() rpc_estats = await self.rpc.estats()
except APIError: except APIError:
pass pass
if rpc_stats is not None: if rpc_estats is not None:
try: try:
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"] unparsed_estats = rpc_estats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats) parsed_estats = self.parse_estats(unparsed_estats)
return self.algo.hashrate( return self.algo.hashrate(
rate=float(parsed_stats["GHSspd"][0]), unit=self.algo.unit.GH rate=float(parsed_estats["GHSspd"]), unit=self.algo.unit.GH
).into(self.algo.unit.default) ).into(self.algo.unit.default)
except (IndexError, KeyError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
pass pass
async def _get_hashboards(self, rpc_stats: dict = None) -> List[HashBoard]: async def _get_hashboards(self, rpc_estats: dict = None) -> List[HashBoard]:
hashboards = await AvalonMiner._get_hashboards(self, rpc_stats) hashboards = await AvalonMiner._get_hashboards(self, rpc_estats)
if rpc_stats is None: if rpc_estats is None:
try: try:
rpc_stats = await self.rpc.stats() rpc_estats = await self.rpc.estats()
except APIError: except APIError:
pass pass
if rpc_stats is not None: if rpc_estats is not None:
try: try:
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"] unparsed_estats = rpc_estats["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats) parsed_estats = self.parse_estats(unparsed_estats)
except (IndexError, KeyError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
return hashboards return hashboards
for board in range(len(hashboards)): for board in range(len(hashboards)):
try: try:
board_hr = parsed_stats["GHSspd"][board] board_hr = parsed_estats["GHSspd"][board]
hashboards[board].hashrate = self.algo.hashrate( hashboards[board].hashrate = self.algo.hashrate(
rate=float(board_hr), unit=self.algo.unit.GH rate=float(board_hr), unit=self.algo.unit.GH
).into(self.algo.unit.default) ).into(self.algo.unit.default)

View File

@@ -13,8 +13,9 @@
# See the License for the specific language governing permissions and - # See the License for the specific language governing permissions and -
# limitations under the License. - # limitations under the License. -
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
import copy
import re import re
import time
from typing import List, Optional from typing import List, Optional
from pyasic.data import Fan, HashBoard from pyasic.data import Fan, HashBoard
@@ -22,6 +23,7 @@ from pyasic.device.algorithm import AlgoHashRate
from pyasic.errors import APIError from pyasic.errors import APIError
from pyasic.miners.backends.cgminer import CGMiner from pyasic.miners.backends.cgminer import CGMiner
from pyasic.miners.data import DataFunction, DataLocations, DataOptions, RPCAPICommand from pyasic.miners.data import DataFunction, DataLocations, DataOptions, RPCAPICommand
from pyasic.rpc.avalonminer import AvalonMinerRPCAPI
AVALON_DATA_LOC = DataLocations( AVALON_DATA_LOC = DataLocations(
**{ **{
@@ -43,31 +45,31 @@ AVALON_DATA_LOC = DataLocations(
), ),
str(DataOptions.EXPECTED_HASHRATE): DataFunction( str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate", "_get_expected_hashrate",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.HASHBOARDS): DataFunction( str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards", "_get_hashboards",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction( str(DataOptions.ENVIRONMENT_TEMP): DataFunction(
"_get_env_temp", "_get_env_temp",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.WATTAGE_LIMIT): DataFunction( str(DataOptions.WATTAGE_LIMIT): DataFunction(
"_get_wattage_limit", "_get_wattage_limit",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.WATTAGE): DataFunction( str(DataOptions.WATTAGE): DataFunction(
"_get_wattage", "_get_wattage",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.FANS): DataFunction( str(DataOptions.FANS): DataFunction(
"_get_fans", "_get_fans",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.FAULT_LIGHT): DataFunction( str(DataOptions.FAULT_LIGHT): DataFunction(
"_get_fault_light", "_get_fault_light",
[RPCAPICommand("rpc_stats", "stats")], [RPCAPICommand("rpc_estats", "estats")],
), ),
str(DataOptions.UPTIME): DataFunction( str(DataOptions.UPTIME): DataFunction(
"_get_uptime", "_get_uptime",
@@ -84,6 +86,9 @@ AVALON_DATA_LOC = DataLocations(
class AvalonMiner(CGMiner): class AvalonMiner(CGMiner):
"""Handler for Avalon Miners""" """Handler for Avalon Miners"""
_rpc_cls = AvalonMinerRPCAPI
rpc: AvalonMinerRPCAPI
data_locations = AVALON_DATA_LOC data_locations = AVALON_DATA_LOC
async def fault_light_on(self) -> bool: async def fault_light_on(self) -> bool:
@@ -134,45 +139,94 @@ class AvalonMiner(CGMiner):
return False return False
return False return False
async def stop_mining(self) -> bool:
try:
# Shut off 5 seconds from now
timestamp = int(time.time()) + 5
data = await self.rpc.ascset(0, f"softoff", f"1:{timestamp}")
except APIError:
return False
if "success" in data["STATUS"][0]["Msg"]:
return True
return False
async def resume_mining(self) -> bool:
try:
# Shut off 5 seconds from now
timestamp = int(time.time()) + 5
data = await self.rpc.ascset(0, f"softon", f"1:{timestamp}")
except APIError:
return False
if "success" in data["STATUS"][0]["Msg"]:
return True
return False
@staticmethod @staticmethod
def parse_stats(stats): def parse_estats(data):
_stats_items = re.findall(".+?\\[*?]", stats) # Deep copy to preserve original structure
stats_items = [] new_data = copy.deepcopy(data)
stats_dict = {}
for item in _stats_items:
if ": " in item:
data = item.replace("]", "").split("[")
data_list = [i.split(": ") for i in data[1].strip().split(", ")]
data_dict = {}
try:
for key, val in [tuple(item) for item in data_list]:
data_dict[key] = val
except ValueError:
# --avalon args
for arg_item in data_list:
item_data = arg_item[0].split(" ")
for idx, val in enumerate(item_data):
if idx % 2 == 0 or idx == 0:
data_dict[val] = item_data[idx + 1]
raw_data = [data[0].strip(), data_dict] def convert_value(val, key):
val = val.strip()
if key == "SYSTEMSTATU":
return val
if " " in val:
parts = val.split()
result = []
for part in parts:
if part.isdigit():
result.append(int(part))
else:
try:
result.append(float(part))
except ValueError:
result.append(part)
return result
else: else:
raw_data = [ if val.isdigit():
value return int(val)
for value in item.replace("[", " ") try:
.replace("]", " ") return float(val)
.split(" ")[:-1] except ValueError:
if value != "" return val
]
if len(raw_data) == 1:
raw_data.append("")
if raw_data[0] == "":
raw_data = raw_data[1:]
stats_dict[raw_data[0]] = raw_data[1:] def parse_info_block(info_str):
stats_items.append(raw_data) pattern = re.compile(r"(\w+)\[([^\]]*)\]")
return {
key: convert_value(val, key) for key, val in pattern.findall(info_str)
}
return stats_dict for stat in new_data.get("STATS", []):
keys_to_replace = {}
for key, value in stat.items():
if "MM" in key:
# Normalize key by removing suffix after colon
norm_key = key.split(":")[0]
mm_data = value
if not isinstance(mm_data, str):
continue
if mm_data.startswith("'STATS':"):
mm_data = mm_data[len("'STATS':") :]
keys_to_replace[norm_key] = parse_info_block(mm_data)
elif key == "HBinfo":
match = re.search(r"'(\w+)':\{(.+)\}", value)
if match:
hb_key = match.group(1)
hb_data = match.group(2)
keys_to_replace[key] = {hb_key: parse_info_block(hb_data)}
# Remove old keys and insert parsed versions
for k in list(stat.keys()):
if "MM" in k or k == "HBinfo":
del stat[k]
stat.update(keys_to_replace)
return new_data
################################################## ##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ### ### DATA GATHERING FUNCTIONS (get_{some_data}) ###
@@ -211,7 +265,7 @@ class AvalonMiner(CGMiner):
except (KeyError, IndexError, ValueError, TypeError): except (KeyError, IndexError, ValueError, TypeError):
pass pass
async def _get_hashboards(self, rpc_stats: dict = None) -> List[HashBoard]: async def _get_hashboards(self, rpc_estats: dict = None) -> List[HashBoard]:
if self.expected_hashboards is None: if self.expected_hashboards is None:
return [] return []
@@ -220,164 +274,202 @@ class AvalonMiner(CGMiner):
for i in range(self.expected_hashboards) for i in range(self.expected_hashboards)
] ]
if rpc_stats is None: if rpc_estats is None:
try: try:
rpc_stats = await self.rpc.stats() rpc_estats = await self.rpc.estats()
except APIError: except APIError:
pass pass
if rpc_stats is not None: if rpc_estats is not None:
try: try:
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"] parsed_estats = self.parse_estats(rpc_estats)
parsed_stats = self.parse_stats(unparsed_stats)
except (IndexError, KeyError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
return hashboards return hashboards
for board in range(self.expected_hashboards): for board in range(self.expected_hashboards):
try: try:
hashboards[board].chip_temp = int(parsed_stats["MTmax"][board]) board_hr = parsed_estats["STATS"][0]["MM ID0"]["MGHS"]
if isinstance(board_hr, list):
hashboards[board].hashrate = self.algo.hashrate(
rate=float(board_hr[board]), unit=self.algo.unit.GH
).into(self.algo.unit.default)
else:
hashboards[board].hashrate = self.algo.hashrate(
rate=float(board_hr), unit=self.algo.unit.GH
).into(self.algo.unit.default)
except LookupError: except LookupError:
pass pass
try: try:
board_hr = parsed_stats["MGHS"][board] hashboards[board].chip_temp = int(
hashboards[board].hashrate = self.algo.hashrate( parsed_estats["STATS"][0]["MM ID0"]["MTmax"][board]
rate=float(board_hr), unit=self.algo.unit.GH )
).into(self.algo.unit.default)
except LookupError: except LookupError:
pass try:
hashboards[board].chip_temp = int(
parsed_estats["STATS"][0]["MM ID0"]["Tmax"]
)
except LookupError:
pass
try: try:
hashboards[board].temp = int(parsed_stats["MTavg"][board]) hashboards[board].temp = int(
parsed_estats["STATS"][0]["MM ID0"]["MTmax"][board]
)
except LookupError: except LookupError:
pass try:
hashboards[board].temp = int(
parsed_estats["STATS"][0]["MM ID0"]["Tavg"]
)
except LookupError:
pass
try: try:
chip_data = parsed_stats[f"PVT_T{board}"] hashboards[board].inlet_temp = int(
parsed_estats["STATS"][0]["MM ID0"]["MTavg"][board]
)
except LookupError:
try:
hashboards[board].inlet_temp = int(
parsed_estats["STATS"][0]["MM ID0"]["HBITemp"]
)
except LookupError:
pass
try:
hashboards[board].outlet_temp = int(
parsed_estats["STATS"][0]["MM ID0"]["MTmax"][board]
)
except LookupError:
try:
hashboards[board].outlet_temp = int(
parsed_estats["STATS"][0]["MM ID0"]["HBOTemp"]
)
except LookupError:
pass
try:
chip_data = parsed_estats["STATS"][0]["MM ID0"][f"PVT_T{board}"]
hashboards[board].missing = False hashboards[board].missing = False
if chip_data: if chip_data:
hashboards[board].chips = len( hashboards[board].chips = len(
[item for item in chip_data if not item == "0"] [item for item in chip_data if not item == "0"]
) )
except LookupError: except LookupError:
pass try:
chip_data = parsed_estats["STATS"][0]["HBinfo"][f"HB{board}"][
f"PVT_T{board}"
]
hashboards[board].missing = False
if chip_data:
hashboards[board].chips = len(
[item for item in chip_data if not item == "0"]
)
except LookupError:
pass
return hashboards return hashboards
async def _get_expected_hashrate( async def _get_expected_hashrate(
self, rpc_stats: dict = None self, rpc_estats: dict = None
) -> Optional[AlgoHashRate]: ) -> Optional[AlgoHashRate]:
if rpc_stats is None: if rpc_estats is None:
try: try:
rpc_stats = await self.rpc.stats() rpc_estats = await self.rpc.estats()
except APIError: except APIError:
pass pass
if rpc_stats is not None: if rpc_estats is not None:
try: try:
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"] parsed_estats = self.parse_estats(rpc_estats)["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
return self.algo.hashrate( return self.algo.hashrate(
rate=float(parsed_stats["GHSmm"][0]), unit=self.algo.unit.GH rate=float(parsed_estats["GHSmm"]), unit=self.algo.unit.GH
).into(self.algo.unit.default) ).into(self.algo.unit.default)
except (IndexError, KeyError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
pass pass
async def _get_env_temp(self, rpc_stats: dict = None) -> Optional[float]: async def _get_env_temp(self, rpc_estats: dict = None) -> Optional[float]:
if rpc_stats is None: if rpc_estats is None:
try: try:
rpc_stats = await self.rpc.stats() rpc_estats = await self.rpc.estats()
except APIError: except APIError:
pass pass
if rpc_stats is not None: if rpc_estats is not None:
try: try:
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"] parsed_estats = self.parse_estats(rpc_estats)["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats) return float(parsed_estats["Temp"])
return float(parsed_stats["Temp"][0])
except (IndexError, KeyError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
pass pass
async def _get_wattage_limit(self, rpc_stats: dict = None) -> Optional[int]: async def _get_wattage_limit(self, rpc_estats: dict = None) -> Optional[int]:
if rpc_stats is None: if rpc_estats is None:
try: try:
rpc_stats = await self.rpc.stats() rpc_estats = await self.rpc.estats()
except APIError: except APIError:
pass pass
if rpc_stats is not None: if rpc_estats is not None:
try: try:
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"] parsed_estats = self.parse_estats(rpc_estats)["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats) return int(parsed_estats["MPO"])
return int(parsed_stats["MPO"][0])
except (IndexError, KeyError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
pass pass
async def _get_wattage(self, rpc_stats: dict = None) -> Optional[int]: async def _get_wattage(self, rpc_estats: dict = None) -> Optional[int]:
if rpc_stats is None: if rpc_estats is None:
try: try:
rpc_stats = await self.rpc.stats() rpc_estats = await self.rpc.estats()
except APIError: except APIError:
pass pass
if rpc_stats is not None: if rpc_estats is not None:
try: try:
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"] parsed_estats = self.parse_estats(rpc_estats)["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats) return int(parsed_estats["WALLPOWER"])
return int(parsed_stats["WALLPOWER"][0])
except (IndexError, KeyError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
pass pass
async def _get_fans(self, rpc_stats: dict = None) -> List[Fan]: async def _get_fans(self, rpc_estats: dict = None) -> List[Fan]:
if self.expected_fans is None: if self.expected_fans is None:
return [] return []
if rpc_stats is None: if rpc_estats is None:
try: try:
rpc_stats = await self.rpc.stats() rpc_estats = await self.rpc.estats()
except APIError: except APIError:
pass pass
fans_data = [Fan() for _ in range(self.expected_fans)] fans_data = [Fan() for _ in range(self.expected_fans)]
if rpc_stats is not None: if rpc_estats is not None:
try: try:
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"] parsed_estats = self.parse_estats(rpc_estats)["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats)
except LookupError: except LookupError:
return fans_data return fans_data
for fan in range(self.expected_fans): for fan in range(self.expected_fans):
try: try:
fans_data[fan].speed = int(parsed_stats[f"Fan{fan + 1}"][0]) fans_data[fan].speed = int(parsed_estats[f"Fan{fan + 1}"])
except (IndexError, KeyError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
pass pass
return fans_data return fans_data
async def _get_fault_light(self, rpc_stats: dict = None) -> Optional[bool]: async def _get_fault_light(self, rpc_estats: dict = None) -> Optional[bool]:
if self.light: if self.light:
return self.light return self.light
if rpc_stats is None: if rpc_estats is None:
try: try:
rpc_stats = await self.rpc.stats() rpc_estats = await self.rpc.estats()
except APIError: except APIError:
pass pass
if rpc_stats is not None: if rpc_estats is not None:
try: try:
unparsed_stats = rpc_stats["STATS"][0]["MM ID0"] parsed_estats = self.parse_estats(rpc_estats)["STATS"][0]["MM ID0"]
parsed_stats = self.parse_stats(unparsed_stats) led = int(parsed_estats["Led"])
led = int(parsed_stats["Led"][0])
return True if led == 1 else False return True if led == 1 else False
except (IndexError, KeyError, ValueError, TypeError): except (IndexError, KeyError, ValueError, TypeError):
pass pass
try:
data = await self.rpc.ascset(0, "led", "1-255")
except APIError:
return False
try:
if data["STATUS"][0]["Msg"] == "ASC 0 set info: LED[1]":
return True
except LookupError:
pass
return False return False

View File

@@ -0,0 +1,27 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.device.algorithm import MinerAlgo
from pyasic.device.models import MinerModel
from pyasic.miners.device.makes import AvalonMinerMake
class AvalonQHome(AvalonMinerMake):
raw_model = MinerModel.AVALONMINER.AvalonQHome
expected_chips = 160
expected_fans = 2
expected_hashboards = 1
algo = MinerAlgo.SHA256

View File

@@ -0,0 +1,17 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from .Q import AvalonQHome

View File

@@ -22,3 +22,4 @@ from .A11X import *
from .A12X import * from .A12X import *
from .A15X import * from .A15X import *
from .nano import * from .nano import *
from .Q import *

View File

@@ -512,6 +512,7 @@ MINER_CLASSES = {
"AVALONMINER NANO3": CGMinerAvalonNano3, "AVALONMINER NANO3": CGMinerAvalonNano3,
"AVALON NANO3S": CGMinerAvalonNano3s, "AVALON NANO3S": CGMinerAvalonNano3s,
"AVALONMINER 15-194": CGMinerAvalon1566, "AVALONMINER 15-194": CGMinerAvalon1566,
"AVALON Q": CGMinerAvalonQHome,
}, },
MinerTypes.INNOSILICON: { MinerTypes.INNOSILICON: {
None: type("InnosiliconUnknown", (Innosilicon, InnosiliconMake), {}), None: type("InnosiliconUnknown", (Innosilicon, InnosiliconMake), {}),

27
pyasic/rpc/avalonminer.py Normal file
View File

@@ -0,0 +1,27 @@
# ------------------------------------------------------------------------------
# Copyright 2025 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.rpc.cgminer import CGMinerRPCAPI
class AvalonMinerRPCAPI(CGMinerRPCAPI):
"""An abstraction of the AvalonMiner API.
Each method corresponds to an API command in AvalonMiner.
"""
async def litestats(self):
return await self.send_command("litestats")

File diff suppressed because one or more lines are too long