feature: add BTMinerV3 data support

This commit is contained in:
Brett Rowan
2025-08-14 11:28:44 -06:00
parent bb1c98f061
commit aa9f3b2c45
4 changed files with 659 additions and 31 deletions

View File

@@ -16,7 +16,6 @@
import logging
from pathlib import Path
from typing import List, Optional
import aiofiles
@@ -28,7 +27,7 @@ from pyasic.device.algorithm import AlgoHashRate
from pyasic.errors import APIError
from pyasic.miners.data import DataFunction, DataLocations, DataOptions, RPCAPICommand
from pyasic.miners.device.firmware import StockFirmware
from pyasic.rpc.btminer import BTMinerRPCAPI
from pyasic.rpc.btminer import BTMinerRPCAPI, BTMinerV3RPCAPI
BTMINER_DATA_LOC = DataLocations(
**{
@@ -294,7 +293,7 @@ class BTMiner(StockFirmware):
async def _get_mac(
self, rpc_summary: dict = None, rpc_get_miner_info: dict = None
) -> Optional[str]:
) -> str | None:
if rpc_get_miner_info is None:
try:
rpc_get_miner_info = await self.rpc.get_miner_info()
@@ -321,7 +320,7 @@ class BTMiner(StockFirmware):
except LookupError:
pass
async def _get_api_ver(self, rpc_get_version: dict = None) -> Optional[str]:
async def _get_api_ver(self, rpc_get_version: dict = None) -> str | None:
if rpc_get_version is None:
try:
rpc_get_version = await self.rpc.get_version()
@@ -346,7 +345,7 @@ class BTMiner(StockFirmware):
async def _get_fw_ver(
self, rpc_get_version: dict = None, rpc_summary: dict = None
) -> Optional[str]:
) -> str | None:
if rpc_get_version is None:
try:
rpc_get_version = await self.rpc.get_version()
@@ -379,7 +378,7 @@ class BTMiner(StockFirmware):
return self.fw_ver
async def _get_hostname(self, rpc_get_miner_info: dict = None) -> Optional[str]:
async def _get_hostname(self, rpc_get_miner_info: dict = None) -> str | None:
hostname = None
if rpc_get_miner_info is None:
try:
@@ -395,7 +394,7 @@ class BTMiner(StockFirmware):
return hostname
async def _get_hashrate(self, rpc_summary: dict = None) -> Optional[AlgoHashRate]:
async def _get_hashrate(self, rpc_summary: dict = None) -> AlgoHashRate | None:
if rpc_summary is None:
try:
rpc_summary = await self.rpc.summary()
@@ -410,8 +409,9 @@ class BTMiner(StockFirmware):
).into(self.algo.unit.default)
except LookupError:
pass
return None
async def _get_hashboards(self, rpc_devs: dict = None) -> List[HashBoard]:
async def _get_hashboards(self, rpc_devs: dict = None) -> list[HashBoard]:
if self.expected_hashboards is None:
return []
@@ -450,7 +450,7 @@ class BTMiner(StockFirmware):
return hashboards
async def _get_env_temp(self, rpc_summary: dict = None) -> Optional[float]:
async def _get_env_temp(self, rpc_summary: dict = None) -> float | None:
if rpc_summary is None:
try:
rpc_summary = await self.rpc.summary()
@@ -462,8 +462,9 @@ class BTMiner(StockFirmware):
return rpc_summary["SUMMARY"][0]["Env Temp"]
except LookupError:
pass
return None
async def _get_wattage(self, rpc_summary: dict = None) -> Optional[int]:
async def _get_wattage(self, rpc_summary: dict = None) -> int | None:
if rpc_summary is None:
try:
rpc_summary = await self.rpc.summary()
@@ -476,8 +477,9 @@ class BTMiner(StockFirmware):
return wattage if not wattage == -1 else None
except LookupError:
pass
return None
async def _get_wattage_limit(self, rpc_summary: dict = None) -> Optional[int]:
async def _get_wattage_limit(self, rpc_summary: dict = None) -> int | None:
if rpc_summary is None:
try:
rpc_summary = await self.rpc.summary()
@@ -489,10 +491,11 @@ class BTMiner(StockFirmware):
return rpc_summary["SUMMARY"][0]["Power Limit"]
except LookupError:
pass
return None
async def _get_fans(
self, rpc_summary: dict = None, rpc_get_psu: dict = None
) -> List[Fan]:
) -> list[Fan]:
if self.expected_fans is None:
return []
@@ -517,7 +520,7 @@ class BTMiner(StockFirmware):
async def _get_fan_psu(
self, rpc_summary: dict = None, rpc_get_psu: dict = None
) -> Optional[int]:
) -> int | None:
if rpc_summary is None:
try:
rpc_summary = await self.rpc.summary()
@@ -541,10 +544,11 @@ class BTMiner(StockFirmware):
return int(rpc_get_psu["Msg"]["fan_speed"])
except (KeyError, TypeError):
pass
return None
async def _get_errors(
self, rpc_summary: dict = None, rpc_get_error_code: dict = None
) -> List[MinerErrorData]:
) -> list[MinerErrorData]:
errors = []
if rpc_get_error_code is None and rpc_summary is None:
try:
@@ -581,7 +585,7 @@ class BTMiner(StockFirmware):
async def _get_expected_hashrate(
self, rpc_summary: dict = None
) -> Optional[AlgoHashRate]:
) -> AlgoHashRate | None:
if rpc_summary is None:
try:
rpc_summary = await self.rpc.summary()
@@ -598,8 +602,9 @@ class BTMiner(StockFirmware):
except LookupError:
pass
return None
async def _get_fault_light(self, rpc_get_miner_info: dict = None) -> Optional[bool]:
async def _get_fault_light(self, rpc_get_miner_info: dict = None) -> bool | None:
if rpc_get_miner_info is None:
try:
rpc_get_miner_info = await self.rpc.get_miner_info()
@@ -637,7 +642,7 @@ class BTMiner(StockFirmware):
async def set_hostname(self, hostname: str):
await self.rpc.set_hostname(hostname)
async def _is_mining(self, rpc_status: dict = None) -> Optional[bool]:
async def _is_mining(self, rpc_status: dict = None) -> bool | None:
if rpc_status is None:
try:
rpc_status = await self.rpc.status()
@@ -655,8 +660,9 @@ class BTMiner(StockFirmware):
return True if rpc_status["Msg"]["mineroff"] == "false" else False
except LookupError:
pass
return False
async def _get_uptime(self, rpc_summary: dict = None) -> Optional[int]:
async def _get_uptime(self, rpc_summary: dict = None) -> int | None:
if rpc_summary is None:
try:
rpc_summary = await self.rpc.summary()
@@ -669,7 +675,7 @@ class BTMiner(StockFirmware):
except LookupError:
pass
async def _get_pools(self, rpc_pools: dict = None) -> List[PoolMetrics]:
async def _get_pools(self, rpc_pools: dict = None) -> list[PoolMetrics]:
if rpc_pools is None:
try:
rpc_pools = await self.rpc.pools()
@@ -742,3 +748,303 @@ class BTMiner(StockFirmware):
exc_info=True,
)
raise
BTMINERV3_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction(
"_get_mac", [RPCAPICommand("rpc_get_device_info", "get_device_info")]
),
str(DataOptions.API_VERSION): DataFunction(
"_get_api_version",
[RPCAPICommand("rpc_get_device_info", "get_device_info")],
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_firmware_version",
[RPCAPICommand("rpc_get_device_info", "get_device_info")],
),
str(DataOptions.HOSTNAME): DataFunction(
"_get_hostname", [RPCAPICommand("rpc_get_device_info", "get_device_info")]
),
str(DataOptions.FAULT_LIGHT): DataFunction(
"_get_light_flashing",
[RPCAPICommand("rpc_get_device_info", "get_device_info")],
),
str(DataOptions.WATTAGE_LIMIT): DataFunction(
"_get_wattage_limit",
[RPCAPICommand("rpc_get_device_info", "get_device_info")],
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[RPCAPICommand("rpc_get_miner_status_summary", "get_miner_status_summary")],
),
str(DataOptions.FAN_PSU): DataFunction(
"_get_psu_fans", [RPCAPICommand("rpc_get_device_info", "get_device_info")]
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[
RPCAPICommand("rpc_get_device_info", "get_device_info"),
RPCAPICommand(
"rpc_get_miner_status_edevs",
"get_miner_status_edevs",
),
],
),
str(DataOptions.POOLS): DataFunction(
"_get_pools",
[RPCAPICommand("rpc_get_miner_status_summary", "get_miner_status_summary")],
),
str(DataOptions.UPTIME): DataFunction(
"_get_uptime",
[RPCAPICommand("rpc_get_miner_status_summary", "get_miner_status_summary")],
),
str(DataOptions.WATTAGE): DataFunction(
"_get_wattage",
[RPCAPICommand("rpc_get_miner_status_summary", "get_miner_status_summary")],
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
[RPCAPICommand("rpc_get_miner_status_summary", "get_miner_status_summary")],
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
[RPCAPICommand("rpc_get_miner_status_summary", "get_miner_status_summary")],
),
str(DataOptions.ENVIRONMENT_TEMP): DataFunction(
"_get_env_temp",
[RPCAPICommand("rpc_get_miner_status_summary", "get_miner_status_summary")],
),
}
)
class BTMinerV3(StockFirmware):
_rpc_cls = BTMinerV3RPCAPI
rpc: BTMinerV3RPCAPI
data_locations = BTMINERV3_DATA_LOC
supports_shutdown = True
supports_autotuning = True
supports_power_modes = True
async def _get_mac(self, rpc_get_device_info: dict = None) -> str | None:
if rpc_get_device_info is None:
try:
rpc_get_device_info = await self.rpc.get_device_info()
except APIError:
return None
return rpc_get_device_info.get("msg", {}).get("network", {}).get("mac")
async def _get_api_version(self, rpc_get_device_info: dict = None) -> str | None:
if rpc_get_device_info is None:
try:
rpc_get_device_info = await self.rpc.get_device_info()
except APIError:
return None
return rpc_get_device_info.get("msg", {}).get("system", {}).get("api")
async def _get_firmware_version(
self, rpc_get_device_info: dict = None
) -> str | None:
if rpc_get_device_info is None:
try:
rpc_get_device_info = await self.rpc.get_device_info()
except APIError:
return None
return rpc_get_device_info.get("msg", {}).get("system", {}).get("fwversion")
async def _get_hostname(self, rpc_get_device_info: dict = None) -> str | None:
if rpc_get_device_info is None:
try:
rpc_get_device_info = await self.rpc.get_device_info()
except APIError:
return None
return rpc_get_device_info.get("msg", {}).get("network", {}).get("hostname")
async def _get_light_flashing(
self, rpc_get_device_info: dict = None
) -> bool | None:
if rpc_get_device_info is None:
try:
rpc_get_device_info = await self.rpc.get_device_info()
except APIError:
return None
val = rpc_get_device_info.get("msg", {}).get("system", {}).get("ledstatus")
if isinstance(val, str):
return val != "auto"
return None
async def _get_wattage_limit(
self, rpc_get_device_info: dict = None
) -> float | None:
if rpc_get_device_info is None:
try:
rpc_get_device_info = await self.rpc.get_device_info()
except APIError:
return None
val = rpc_get_device_info.get("msg", {}).get("miner", {}).get("power-limit-set")
try:
return float(val)
except (ValueError, TypeError):
return None
async def _get_fans(self, rpc_get_miner_status_summary: dict = None) -> list[Fan]:
if rpc_get_miner_status_summary is None:
try:
rpc_get_miner_status_summary = await self.rpc.get_miner_status_summary()
except APIError:
return []
fans = []
summary = rpc_get_miner_status_summary.get("msg", {}).get("summary", {})
for idx, direction in enumerate(["in", "out"]):
rpm = summary.get(f"fan-speed-{direction}")
if rpm is not None:
fans.append(Fan(speed=rpm))
return fans
async def _get_psu_fans(self, rpc_get_device_info: dict = None) -> list[Fan]:
if rpc_get_device_info is None:
try:
rpc_get_device_info = await self.rpc.get_device_info()
except APIError:
return []
rpm = rpc_get_device_info.get("msg", {}).get("power", {}).get("fanspeed")
return [Fan(speed=rpm)] if rpm is not None else []
async def _get_hashboards(
self,
rpc_get_device_info: dict = None,
rpc_get_miner_status_edevs: dict = None,
) -> list[HashBoard]:
if rpc_get_device_info is None:
try:
rpc_get_device_info = await self.rpc.get_device_info()
except APIError:
return []
if rpc_get_miner_status_edevs is None:
try:
rpc_get_miner_status_edevs = await self.rpc.get_miner_status_edevs()
except APIError:
return []
boards = []
board_count = (
rpc_get_device_info.get("msg", {}).get("hardware", {}).get("boards", 3)
)
print(rpc_get_miner_status_edevs)
print(rpc_get_device_info)
edevs = rpc_get_miner_status_edevs.get("msg", {}).get("edevs", [])
for idx in range(board_count):
board_data = edevs[idx] if idx < len(edevs) else {}
boards.append(
HashBoard(
slot=idx,
hashrate=self.algo.hashrate(
rate=board_data.get("hash-average", 0), unit=self.algo.unit.TH
).into(self.algo.unit.default),
temp=board_data.get("chip-temp-min"),
inlet_temp=board_data.get("chip-temp-min"),
outlet_temp=board_data.get("chip-temp-max"),
serial_number=board_data.get(f"pcbsn{idx}"),
chips=board_data.get("effective-chips"),
expected_chips=self.expected_chips,
active=(board_data.get("hash-average") or 0) > 0,
missing=False,
tuned=True,
)
)
return boards
async def _get_pools(
self, rpc_get_miner_status_summary: dict = None
) -> list[PoolMetrics]:
if rpc_get_miner_status_summary is None:
try:
rpc_get_miner_status_summary = await self.rpc.get_miner_status_summary()
except APIError:
return []
pools = []
msg_pools = rpc_get_miner_status_summary.get("msg", {}).get("pools", [])
for idx, pool in enumerate(msg_pools):
pools.append(
PoolMetrics(
index=idx,
user=pool.get("account"),
alive=pool.get("status") == "alive",
active=pool.get("stratum-active"),
url=pool.get("url"),
)
)
return pools
async def _get_uptime(
self, rpc_get_miner_status_summary: dict = None
) -> int | None:
if rpc_get_miner_status_summary is None:
try:
rpc_get_miner_status_summary = await self.rpc.get_miner_status_summary()
except APIError:
return None
return (
rpc_get_miner_status_summary.get("msg", {})
.get("summary", {})
.get("elapsed")
)
async def _get_wattage(
self, rpc_get_miner_status_summary: dict = None
) -> float | None:
if rpc_get_miner_status_summary is None:
try:
rpc_get_miner_status_summary = await self.rpc.get_miner_status_summary()
except APIError:
return None
return (
rpc_get_miner_status_summary.get("msg", {})
.get("summary", {})
.get("power-realtime")
)
async def _get_hashrate(
self, rpc_get_miner_status_summary: dict = None
) -> float | None:
if rpc_get_miner_status_summary is None:
try:
rpc_get_miner_status_summary = await self.rpc.get_miner_status_summary()
except APIError:
return None
return (
rpc_get_miner_status_summary.get("msg", {})
.get("summary", {})
.get("hash-realtime")
)
async def _get_expected_hashrate(
self, rpc_get_miner_status_summary: dict = None
) -> float | None:
if rpc_get_miner_status_summary is None:
try:
rpc_get_miner_status_summary = await self.rpc.get_miner_status_summary()
except APIError:
return None
return (
rpc_get_miner_status_summary.get("msg", {})
.get("summary", {})
.get("factory-hash")
)
async def _get_env_temp(
self, rpc_get_miner_status_summary: dict = None
) -> float | None:
if rpc_get_miner_status_summary is None:
try:
rpc_get_miner_status_summary = await self.rpc.get_miner_status_summary()
except APIError:
return None
return (
rpc_get_miner_status_summary.get("msg", {})
.get("summary", {})
.get("environment-temperature")
)

View File

@@ -13,23 +13,23 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.miners.backends.btminer import BTMiner
from pyasic.miners.backends.btminer import BTMiner, BTMinerV3
class M7X(BTMiner):
supports_autotuning = True
class M7X(BTMinerV3):
pass
class M6X(BTMiner):
supports_autotuning = True
class M6X(BTMinerV3):
pass
class M5X(BTMiner):
supports_autotuning = True
class M5X(BTMinerV3):
pass
class M3X(BTMiner):
supports_autotuning = True
class M3X(BTMinerV3):
pass
class M2X(BTMiner):

View File

@@ -1066,6 +1066,45 @@ class MinerFactory:
return data
async def send_btminer_v3_api_command(self, ip, command):
try:
reader, writer = await asyncio.open_connection(ip, 4433)
except (ConnectionError, OSError):
return
cmd = {"cmd": command}
try:
# send the command
json_cmd = json.dumps(cmd).encode("utf-8")
length = len(json_cmd)
writer.write(length.to_bytes(4, byteorder="little"))
writer.write(json_cmd)
await writer.drain()
# receive all the data
resp_len = await reader.readexactly(4)
data = await reader.readexactly(
int.from_bytes(resp_len, byteorder="little")
)
writer.close()
await writer.wait_closed()
except asyncio.CancelledError:
writer.close()
await writer.wait_closed()
return
except (ConnectionError, OSError):
return
if data == b"Socket connect failed: Connection refused\n":
return
try:
data = json.loads(data)
except json.JSONDecodeError:
return {}
return data
@staticmethod
async def _fix_api_data(data: bytes) -> str:
if data.endswith(b"\x00"):
@@ -1185,6 +1224,14 @@ class MinerFactory:
try:
miner_model = sock_json_data["DEVDETAILS"][0]["Model"].replace("_", "")
miner_model = miner_model[:-1] + "0"
return miner_model
except (TypeError, LookupError):
sock_json_data_v3 = await self.send_btminer_v3_api_command(
ip, "get.device.info"
)
try:
miner_model = sock_json_data_v3["msg"]["miner"]["type"].replace("_", "")
miner_model = miner_model[:-1] + "0"
return miner_model
except (TypeError, LookupError):

View File

@@ -23,7 +23,8 @@ import json
import logging
import re
import struct
from typing import Literal, Union
from asyncio import Future, StreamReader, StreamWriter
from typing import Any, AsyncGenerator, Callable, Literal, Union
import httpx
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
@@ -1100,3 +1101,277 @@ class BTMinerRPCAPI(BaseMinerRPCAPI):
</details>
"""
return await self.send_command("get_error_code", allow_warning=False)
class BTMinerV3RPCAPI(BaseMinerRPCAPI):
def __init__(self, ip: str, port: int = 4433, api_ver: str = "0.0.0"):
super().__init__(ip, port, api_ver=api_ver)
self.reader: StreamReader | None = None
self.writer: StreamWriter | None = None
self.reader_loop = None
self.salt = None
self.cmd_results = {}
self.cmd_callbacks = {"get.miner.report": set()}
async def connect(self):
self.reader, self.writer = await asyncio.open_connection(
str(self.ip), self.port
)
self.reader_loop = asyncio.create_task(self._read_loop())
async def disconnect(self):
self.writer.close()
await self.writer.wait_closed()
self.reader_loop.cancel()
async def send_command(
self, command: str, parameters: Any = None, **kwargs
) -> dict:
if self.writer is None:
await self.connect()
while command in self.cmd_results:
wait_fut = self.cmd_results[command]
await wait_fut
result_fut = Future()
self.cmd_results[command] = result_fut
cmd = {"cmd": command}
if parameters is not None:
cmd["param"] = parameters
if command.startswith("set."):
salt = await self.get_salt()
ts = int(datetime.datetime.now().timestamp())
cmd["ts"] = ts
token_str = cmd["cmd"] + self.pwd + salt + str(ts)
token_hashed = bytearray(
base64.b64encode(hashlib.sha256(token_str.encode("utf-8")).digest())
)
token_hashed[8] = 0
cmd["account"] = "super"
cmd["token"] = token_hashed.decode("ascii")
# send the command
ser = json.dumps(cmd).encode("utf-8")
header = struct.pack("<I", len(ser))
await self._send_bytes(header + json.dumps(cmd).encode("utf-8"))
await result_fut
return result_fut.result()
async def _read_loop(self):
while True:
result = await self._read_bytes()
data = self._load_api_data(result)
command = data["desc"]
if command in self.cmd_callbacks:
callbacks: list[Callable] = self.cmd_callbacks[command]
await asyncio.gather(*[callback(data) for callback in callbacks])
elif command in self.cmd_results:
future: Future = self.cmd_results.pop(command)
future.set_result(data)
else:
logging.error(f"Received unexpected data for {self}: {data}")
async def _read_bytes(self, **kwargs) -> bytes:
header = await self.reader.readexactly(4)
length = struct.unpack("<I", header)[0]
return await self.reader.readexactly(length)
async def _send_bytes(self, data: bytes, **kwargs):
self.writer.write(data)
await self.writer.drain()
async def get_salt(self) -> str:
if self.salt is not None:
return self.salt
data = await self.send_command("get.device.info", "salt")
self.salt = data["msg"]["salt"]
return self.salt
async def get_miner_report(self) -> AsyncGenerator[dict]:
if self.writer is None:
await self.connect()
result = asyncio.Queue()
async def callback(data: dict):
await result.put(data)
cb_fn = callback
try:
self.cmd_callbacks["get.miner.report"].add(cb_fn)
while True:
yield await result.get()
if self.writer.is_closing():
break
finally:
self.cmd_callbacks["get.miner.report"].remove(cb_fn)
async def get_system_setting(self) -> dict | None:
return await self.send_command("get.system.setting")
async def get_miner_status_summary(self) -> dict | None:
return await self.send_command("get.miner.status", parameters="summary")
async def get_miner_status_edevs(self) -> dict | None:
return await self.send_command("get.miner.status", parameters="edevs")
async def get_miner_history(self) -> dict | None:
data = await self.send_command(
"get.miner.history",
parameters={
"start": "1",
"stop": str(datetime.datetime.now().timestamp()),
},
)
ret = {}
result = data.get("msg")
if result is not None:
unparsed = result["Data"].strip()
for item in unparsed.split(" "):
list_item = item.split(",")
timestamp = int(list_item.pop(0))
ret[timestamp] = list_item
return ret
async def get_psu_command(self):
return await self.send_command("get.psu.command")
async def get_miner_setting(self) -> dict | None:
return await self.send_command("get.miner.setting")
async def get_device_info(self) -> dict | None:
return await self.send_command("get.device.info")
async def get_log_download(self) -> dict | None:
return await self.send_command("get.log.download")
async def get_fan_setting(self) -> dict | None:
return await self.send_command("get.fan.setting")
async def set_system_reboot(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.system.reboot")
async def set_system_factory_reset(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.system.factory_reset")
async def set_system_update_firmware(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.system.update_firmware")
async def set_system_net_config(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.system.net_config")
async def set_system_led(self, *args, **kwargs) -> dict | None:
return await self.send_command("set.system.led", "auto")
async def set_system_time_randomized(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.system.time_randomized")
async def set_system_timezone(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.system.timezone")
async def set_system_hostname(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.system.hostname")
async def set_system_webpools(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.system.webpools")
async def set_miner_target_freq(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.target_freq")
async def set_miner_heat_mode(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.heat_mode")
async def set_system_ntp_server(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.system.ntp_server")
async def set_miner_service(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.service")
async def set_miner_power_mode(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.power_mode")
async def set_miner_cointype(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.cointype")
async def set_miner_pools(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.pools")
async def set_miner_fastboot(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.fastboot")
async def set_miner_power_percent(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.power_percent")
async def set_miner_pre_power_on(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.pre_power_on")
async def set_miner_restore_setting(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.restore_setting")
async def set_miner_report(self, frequency: int = 1) -> dict | None:
return await self.send_command(
"set.miner.report", parameters={"gap": frequency}
)
async def set_miner_power_limit(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.power_limit")
async def set_miner_upfreq_speed(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.miner.upfreq_speed")
async def set_log_upload(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.log.upload")
async def set_user_change_passwd(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.user.change_passwd")
async def set_user_permission(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.user.permission")
async def set_fan_temp_offset(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.fan.temp_offset")
async def set_fan_poweroff_cool(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.fan.poweroff_cool")
async def set_fan_zero_speed(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.fan.zero_speed")
async def set_shell_debug(self, *args, **kwargs) -> dict | None:
raise NotImplementedError
return await self.send_command("set.shell.debug")