refactor: rename API to rpc, and classes from {X}API to {X}RPCAPI to clarify naming.

This commit is contained in:
UpstreamData
2024-01-15 15:09:51 -07:00
parent 4ed49c2321
commit aab8825997
27 changed files with 86 additions and 86 deletions

329
pyasic/rpc/__init__.py Normal file
View File

@@ -0,0 +1,329 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import asyncio
import ipaddress
import json
import logging
import re
import warnings
from typing import Union
from pyasic.errors import APIError, APIWarning
class BaseMinerRPCAPI:
def __init__(self, ip: str, port: int = 4028) -> None:
# api port, should be 4028
self.port = port
# ip address of the miner
self.ip = ipaddress.ip_address(ip)
self.pwd = "admin"
def __new__(cls, *args, **kwargs):
if cls is BaseMinerRPCAPI:
raise TypeError(f"Only children of '{cls.__name__}' may be instantiated")
return object.__new__(cls)
def __repr__(self):
return f"{self.__class__.__name__}: {str(self.ip)}"
async def send_command(
self,
command: Union[str, bytes],
parameters: Union[str, int, bool] = None,
ignore_errors: bool = False,
allow_warning: bool = True,
**kwargs,
) -> dict:
"""Send an API command to the miner and return the result.
Parameters:
command: The command to sent to the miner.
parameters: Any additional parameters to be sent with the command.
ignore_errors: Whether to raise APIError when the command returns an error.
allow_warning: Whether to warn if the command fails.
Returns:
The return data from the API command parsed from JSON into a dict.
"""
logging.debug(
f"{self} - (Send Privileged Command) - {command} "
+ f"with args {parameters}"
if parameters
else ""
)
# create the command
cmd = {"command": command, **kwargs}
if parameters:
cmd["parameter"] = parameters
# send the command
data = await self._send_bytes(json.dumps(cmd).encode("utf-8"))
if data == b"Socket connect failed: Connection refused\n":
if not ignore_errors:
raise APIError(data.decode("utf-8"))
return {}
data = self._load_api_data(data)
# check for if the user wants to allow errors to return
validation = self._validate_command_output(data)
if not validation[0]:
if not ignore_errors:
# validate the command succeeded
raise APIError(validation[1])
if allow_warning:
logging.warning(
f"{self.ip}: API Command Error: {command}: {validation[1]}"
)
logging.debug(f"{self} - (Send Command) - Received data.")
return data
# Privileged command handler, only used by whatsminers, defined here for consistency.
async def send_privileged_command(self, *args, **kwargs) -> dict:
return await self.send_command(*args, **kwargs)
async def multicommand(self, *commands: str, allow_warning: bool = True) -> dict:
"""Creates and sends multiple commands as one command to the miner.
Parameters:
*commands: The commands to send as a multicommand to the miner.
allow_warning: A boolean to supress APIWarnings.
"""
while True:
# make sure we can actually run each command, otherwise they will fail
commands = self._check_commands(*commands)
# standard multicommand format is "command1+command2"
# standard format doesn't work for X19
command = "+".join(commands)
try:
data = await self.send_command(command, allow_warning=allow_warning)
except APIError as e:
# try to identify the error
if e.message is not None:
if ":" in e.message:
err_command = e.message.split(":")[0]
if err_command in commands:
commands.remove(err_command)
continue
return {command: [{}] for command in commands}
logging.debug(f"{self} - (Multicommand) - Received data")
data["multicommand"] = True
return data
async def _handle_multicommand(self, command: str, allow_warning: bool = True):
try:
data = await self.send_command(command, allow_warning=allow_warning)
if not "+" in command:
return {command: [data]}
return data
except APIError:
if "+" in command:
return {command: [{}] for command in command.split("+")}
return {command: [{}]}
@property
def commands(self) -> list:
return self.get_commands()
def get_commands(self) -> list:
"""Get a list of command accessible to a specific type of API on the miner.
Returns:
A list of all API commands that the miner supports.
"""
return [
func
for func in
# each function in self
dir(self)
if not func == "commands"
if callable(getattr(self, func)) and
# no __ or _ methods
not func.startswith("__") and not func.startswith("_") and
# remove all functions that are in this base class
func
not in [
func
for func in dir(BaseMinerRPCAPI)
if callable(getattr(BaseMinerRPCAPI, func))
]
]
def _check_commands(self, *commands):
allowed_commands = self.commands
return_commands = []
for command in commands:
if command in allowed_commands:
return_commands.append(command)
else:
warnings.warn(
f"""Removing incorrect command: {command}
If you are sure you want to use this command please use API.send_command("{command}", ignore_errors=True) instead.""",
APIWarning,
)
return return_commands
async def _send_bytes(
self,
data: bytes,
timeout: int = 100,
) -> bytes:
logging.debug(f"{self} - ([Hidden] Send Bytes) - Sending")
try:
# get reader and writer streams
reader, writer = await asyncio.open_connection(str(self.ip), self.port)
# handle OSError 121
except OSError as e:
if e.errno == 121:
logging.warning(
f"{self} - ([Hidden] Send Bytes) - Semaphore timeout expired."
)
return b"{}"
# send the command
logging.debug(f"{self} - ([Hidden] Send Bytes) - Writing")
writer.write(data)
logging.debug(f"{self} - ([Hidden] Send Bytes) - Draining")
await writer.drain()
try:
# TO address a situation where a whatsminer has an unknown PW -AND-
# Fix for stupid whatsminer bug, reboot/restart seem to not load properly in the loop
# have to receive, save the data, check if there is more data by reading with a short timeout
# append that data if there is more, and then onto the main loop.
# the password timeout might need to be longer than 1, but it seems to work for now.
ret_data = await asyncio.wait_for(reader.read(1), timeout=1)
except asyncio.TimeoutError:
return b"{}"
try:
ret_data += await asyncio.wait_for(reader.read(4096), timeout=timeout)
except ConnectionAbortedError:
return b"{}"
# loop to receive all the data
logging.debug(f"{self} - ([Hidden] Send Bytes) - Receiving")
try:
while True:
try:
d = await asyncio.wait_for(reader.read(4096), timeout=timeout)
if not d:
break
ret_data += d
except (asyncio.CancelledError, asyncio.TimeoutError) as e:
raise e
except (asyncio.CancelledError, asyncio.TimeoutError) as e:
raise e
except Exception as e:
logging.warning(f"{self} - ([Hidden] Send Bytes) - API Command Error {e}")
# close the connection
logging.debug(f"{self} - ([Hidden] Send Bytes) - Closing")
writer.close()
await writer.wait_closed()
return ret_data
@staticmethod
def _validate_command_output(data: dict) -> tuple:
# check if the data returned is correct or an error
# if status isn't a key, it is a multicommand
if "STATUS" not in data.keys():
for key in data.keys():
# make sure not to try to turn id into a dict
if not key == "id":
# make sure they succeeded
if "STATUS" in data[key][0].keys():
if data[key][0]["STATUS"][0]["STATUS"] not in ["S", "I"]:
# this is an error
return False, f"{key}: " + data[key][0]["STATUS"][0]["Msg"]
elif "id" not in data.keys():
if isinstance(data["STATUS"], list):
if data["STATUS"][0].get("STATUS", None) in ["S", "I"]:
return True, None
else:
return False, data["STATUS"][0]["Msg"]
elif isinstance(data["STATUS"], dict):
# new style X19 command
if data["STATUS"]["STATUS"] not in ["S", "I"]:
return False, data["STATUS"]["Msg"]
return True, None
if data["STATUS"] not in ["S", "I"]:
return False, data["Msg"]
else:
# make sure the command succeeded
if isinstance(data["STATUS"], str):
if data["STATUS"] in ["RESTART"]:
return True, None
elif isinstance(data["STATUS"], dict):
if data["STATUS"].get("STATUS") in ["S", "I"]:
return True, None
elif data["STATUS"][0]["STATUS"] not in ("S", "I"):
# this is an error
if data["STATUS"][0]["STATUS"] not in ("S", "I"):
return False, data["STATUS"][0]["Msg"]
return True, None
@staticmethod
def _load_api_data(data: bytes) -> dict:
# some json from the API returns with a null byte (\x00) on the end
if data.endswith(b"\x00"):
# handle the null byte
str_data = data.decode("utf-8")[:-1]
else:
# no null byte
str_data = data.decode("utf-8")
# fix an error with a btminer return having an extra comma that breaks json.loads()
str_data = str_data.replace(",}", "}")
# fix an error with a btminer return having a newline that breaks json.loads()
str_data = str_data.replace("\n", "")
# fix an error with a bmminer return not having a specific comma that breaks json.loads()
str_data = str_data.replace("}{", "},{")
# fix an error with a bmminer return having a specific comma that breaks json.loads()
str_data = str_data.replace("[,{", "[{")
# fix an error with a btminer return having a missing comma. (2023-01-06 version)
str_data = str_data.replace('""temp0', '","temp0')
# fix an error with Avalonminers returning inf and nan
str_data = str_data.replace("info", "1nfo")
str_data = str_data.replace("inf", "0")
str_data = str_data.replace("1nfo", "info")
str_data = str_data.replace("nan", "0")
# fix whatever this garbage from avalonminers is `,"id":1}`
if str_data.startswith(","):
str_data = f"{{{str_data[1:]}"
# try to fix an error with overflowing the receive buffer
# this can happen in cases such as bugged btminers returning arbitrary length error info with 100s of errors.
if not str_data.endswith("}"):
str_data = ",".join(str_data.split(",")[:-1]) + "}"
# fix a really nasty bug with whatsminer API v2.0.4 where they return a list structured like a dict
if re.search(r"\"error_code\":\[\".+\"\]", str_data):
str_data = str_data.replace("[", "{").replace("]", "}")
# parse the json
try:
parsed_data = json.loads(str_data)
except json.decoder.JSONDecodeError as e:
raise APIError(f"Decode Error {e}: {str_data}")
return parsed_data

674
pyasic/rpc/bfgminer.py Normal file
View File

@@ -0,0 +1,674 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import asyncio
import logging
from pyasic.rpc import APIError, BaseMinerRPCAPI
class BFGMinerRPCAPI(BaseMinerRPCAPI):
"""An abstraction of the BFGMiner API.
Each method corresponds to an API command in BFGMiner.
[BFGMiner API documentation](https://github.com/luke-jr/bfgminer/blob/bfgminer/README.RPC)
This class abstracts use of the BFGMiner API, as well as the
methods for sending commands to it. The self.send_command()
function handles sending a command to the miner asynchronously, and
as such is the base for many of the functions in this class, which
rely on it to send the command for them.
Parameters:
ip: The IP of the miner to reference the API on.
port: The port to reference the API on. Default is 4028.
"""
def __init__(self, ip: str, api_ver: str = "0.0.0", port: int = 4028):
super().__init__(ip, port)
self.api_ver = api_ver
async def multicommand(self, *commands: str, allow_warning: bool = True) -> dict:
# make sure we can actually run each command, otherwise they will fail
commands = self._check_commands(*commands)
# standard multicommand format is "command1+command2"
# doesn't work for S19 which uses the backup _x19_multicommand
command = "+".join(commands)
try:
data = await self.send_command(command, allow_warning=allow_warning)
except APIError:
logging.debug(f"{self} - (Multicommand) - Handling X19 multicommand.")
data = await self._x19_multicommand(*command.split("+"))
data["multicommand"] = True
return data
async def _x19_multicommand(self, *commands) -> dict:
tasks = []
# send all commands individually
for cmd in commands:
tasks.append(
asyncio.create_task(self._handle_multicommand(cmd, allow_warning=True))
)
all_data = await asyncio.gather(*tasks)
data = {}
for item in all_data:
data.update(item)
return data
async def version(self) -> dict:
"""Get miner version info.
<details>
<summary>Expand</summary>
Returns:
Miner version information.
</details>
"""
return await self.send_command("version")
async def config(self) -> dict:
"""Get some basic configuration info.
<details>
<summary>Expand</summary>
Returns:
## Some miner configuration information:
* ASC Count <- the number of ASCs
* PGA Count <- the number of PGAs
* Pool Count <- the number of Pools
* Strategy <- the current pool strategy
* Log Interval <- the interval of logging
* Device Code <- list of compiled device drivers
* OS <- the current operating system
* Failover-Only <- failover-only setting
* Scan Time <- scan-time setting
* Queue <- queue setting
* Expiry <- expiry setting
</details>
"""
return await self.send_command("config")
async def summary(self) -> dict:
"""Get the status summary of the miner.
<details>
<summary>Expand</summary>
Returns:
The status summary of the miner.
</details>
"""
return await self.send_command("summary")
async def pools(self) -> dict:
"""Get pool information.
<details>
<summary>Expand</summary>
Returns:
Miner pool information.
</details>
"""
return await self.send_command("pools")
async def devs(self) -> dict:
"""Get data on each PGA/ASC with their details.
<details>
<summary>Expand</summary>
Returns:
Data on each PGA/ASC with their details.
</details>
"""
return await self.send_command("devs")
async def procs(self) -> dict:
"""Get data on each processor with their details.
<details>
<summary>Expand</summary>
Returns:
Data on each processor with their details.
</details>
"""
return await self.send_command("procs")
async def devscan(self, info: str = "") -> dict:
"""Get data on each processor with their details.
<details>
<summary>Expand</summary>
Parameters:
info: Info to scan for device by.
Returns:
Data on each processor with their details.
</details>
"""
return await self.send_command("devscan", parameters=info)
async def pga(self, n: int) -> dict:
"""Get data from PGA n.
<details>
<summary>Expand</summary>
Parameters:
n: The PGA number to get data from.
Returns:
Data on the PGA n.
</details>
"""
return await self.send_command("pga", parameters=n)
async def proc(self, n: int = 0) -> dict:
"""Get data processor n.
<details>
<summary>Expand</summary>
Parameters:
n: The processor to get data on.
Returns:
Data on processor n.
</details>
"""
return await self.send_command("proc", parameters=n)
async def pgacount(self) -> dict:
"""Get data fon all PGAs.
<details>
<summary>Expand</summary>
Returns:
Data on the PGAs connected.
</details>
"""
return await self.send_command("pgacount")
async def proccount(self) -> dict:
"""Get data fon all processors.
<details>
<summary>Expand</summary>
Returns:
Data on the processors connected.
</details>
"""
return await self.send_command("proccount")
async def switchpool(self, n: int) -> dict:
"""Switch pools to pool n.
<details>
<summary>Expand</summary>
Parameters:
n: The pool to switch to.
Returns:
A confirmation of switching to pool n.
</details>
"""
return await self.send_command("switchpool", parameters=n)
async def enablepool(self, n: int) -> dict:
"""Enable pool n.
<details>
<summary>Expand</summary>
Parameters:
n: The pool to enable.
Returns:
A confirmation of enabling pool n.
</details>
"""
return await self.send_command("enablepool", parameters=n)
async def addpool(self, url: str, username: str, password: str) -> dict:
"""Add a pool to the miner.
<details>
<summary>Expand</summary>
Parameters:
url: The URL of the new pool to add.
username: The users username on the new pool.
password: The worker password on the new pool.
Returns:
A confirmation of adding the pool.
</details>
"""
return await self.send_command(
"addpool", parameters=f"{url},{username},{password}"
)
async def poolpriority(self, *n: int) -> dict:
"""Set pool priority.
<details>
<summary>Expand</summary>
Parameters:
*n: Pools in order of priority.
Returns:
A confirmation of setting pool priority.
</details>
"""
pools = f"{','.join([str(item) for item in n])}"
return await self.send_command("poolpriority", parameters=pools)
async def poolquota(self, n: int, q: int) -> dict:
"""Set pool quota.
<details>
<summary>Expand</summary>
Parameters:
n: Pool number to set quota on.
q: Quota to set the pool to.
Returns:
A confirmation of setting pool quota.
</details>
"""
return await self.send_command("poolquota", parameters=f"{n},{q}")
async def disablepool(self, n: int) -> dict:
"""Disable a pool.
<details>
<summary>Expand</summary>
Parameters:
n: Pool to disable.
Returns:
A confirmation of diabling the pool.
</details>
"""
return await self.send_command("disablepool", parameters=n)
async def removepool(self, n: int) -> dict:
"""Remove a pool.
<details>
<summary>Expand</summary>
Parameters:
n: Pool to remove.
Returns:
A confirmation of removing the pool.
</details>
"""
return await self.send_command("removepool", parameters=n)
async def save(self, filename: str = None) -> dict:
"""Save the config.
<details>
<summary>Expand</summary>
Parameters:
filename: Filename to save the config as.
Returns:
A confirmation of saving the config.
</details>
"""
if filename:
return await self.send_command("save", parameters=filename)
else:
return await self.send_command("save")
async def quit(self) -> dict:
"""Quit CGMiner.
<details>
<summary>Expand</summary>
Returns:
A single "BYE" before CGMiner quits.
</details>
"""
return await self.send_command("quit")
async def notify(self) -> dict:
"""Notify the user of past errors.
<details>
<summary>Expand</summary>
Returns:
The last status and count of each devices problem(s).
</details>
"""
return await self.send_command("notify")
async def privileged(self) -> dict:
"""Check if you have privileged access.
<details>
<summary>Expand</summary>
Returns:
The STATUS section with an error if you have no privileged access, or success if you have privileged access.
</details>
"""
return await self.send_command("privileged")
async def pgaenable(self, n: int) -> dict:
"""Enable PGA n.
<details>
<summary>Expand</summary>
Parameters:
n: The PGA to enable.
Returns:
A confirmation of enabling PGA n.
</details>
"""
return await self.send_command("pgaenable", parameters=n)
async def pgadisable(self, n: int) -> dict:
"""Disable PGA n.
<details>
<summary>Expand</summary>
Parameters:
n: The PGA to disable.
Returns:
A confirmation of disabling PGA n.
</details>
"""
return await self.send_command("pgadisable", parameters=n)
async def pgarestart(self, n: int) -> dict:
"""Restart PGA n.
<details>
<summary>Expand</summary>
Parameters:
n: The PGA to restart.
Returns:
A confirmation of restarting PGA n.
</details>
"""
return await self.send_command("pgadisable", parameters=n)
async def pgaidentify(self, n: int) -> dict:
"""Identify PGA n.
<details>
<summary>Expand</summary>
Parameters:
n: The PGA to identify.
Returns:
A confirmation of identifying PGA n.
</details>
"""
return await self.send_command("pgaidentify", parameters=n)
async def procenable(self, n: int) -> dict:
"""Enable processor n.
<details>
<summary>Expand</summary>
Parameters:
n: The processor to enable.
Returns:
A confirmation of enabling processor n.
</details>
"""
return await self.send_command("procenable", parameters=n)
async def procdisable(self, n: int) -> dict:
"""Disable processor n.
<details>
<summary>Expand</summary>
Parameters:
n: The processor to disable.
Returns:
A confirmation of disabling processor n.
</details>
"""
return await self.send_command("procdisable", parameters=n)
async def procrestart(self, n: int) -> dict:
"""Restart processor n.
<details>
<summary>Expand</summary>
Parameters:
n: The processor to restart.
Returns:
A confirmation of restarting processor n.
</details>
"""
return await self.send_command("procdisable", parameters=n)
async def procidentify(self, n: int) -> dict:
"""Identify processor n.
<details>
<summary>Expand</summary>
Parameters:
n: The processor to identify.
Returns:
A confirmation of identifying processor n.
</details>
"""
return await self.send_command("procidentify", parameters=n)
async def devdetails(self) -> dict:
"""Get data on all devices with their static details.
<details>
<summary>Expand</summary>
Returns:
Data on all devices with their static details.
</details>
"""
return await self.send_command("devdetails")
async def restart(self) -> dict:
"""Restart CGMiner using the API.
<details>
<summary>Expand</summary>
Returns:
A reply informing of the restart.
</details>
"""
return await self.send_command("restart")
async def stats(self) -> dict:
"""Get stats of each device/pool with more than 1 getwork.
<details>
<summary>Expand</summary>
Returns:
Stats of each device/pool with more than 1 getwork.
</details>
"""
return await self.send_command("stats")
async def check(self, command: str) -> dict:
"""Check if the command command exists in CGMiner.
<details>
<summary>Expand</summary>
Parameters:
command: The command to check.
Returns:
## Information about a command:
* Exists (Y/N) <- the command exists in this version
* Access (Y/N) <- you have access to use the command
</details>
"""
return await self.send_command("check", parameters=command)
async def failover_only(self, failover: bool) -> dict:
"""Set failover-only.
<details>
<summary>Expand</summary>
Parameters:
failover: What to set failover-only to.
Returns:
Confirmation of setting failover-only.
</details>
"""
return await self.send_command("failover-only", parameters=failover)
async def coin(self) -> dict:
"""Get information on the current coin.
<details>
<summary>Expand</summary>
Returns:
## Information about the current coin being mined:
* Hash Method <- the hashing algorithm
* Current Block Time <- blocktime as a float, 0 means none
* Current Block Hash <- the hash of the current block, blank means none
* LP <- whether LP is in use on at least 1 pool
* Network Difficulty: the current network difficulty
</details>
"""
return await self.send_command("coin")
async def debug(self, setting: str) -> dict:
"""Set a debug setting.
<details>
<summary>Expand</summary>
Parameters:
setting: Which setting to switch to.
## Options are:
* Silent
* Quiet
* Verbose
* Debug
* RPCProto
* PerDevice
* WorkTime
* Normal
Returns:
Data on which debug setting was enabled or disabled.
</details>
"""
return await self.send_command("debug", parameters=setting)
async def setconfig(self, name: str, n: int) -> dict:
"""Set config of name to value n.
<details>
<summary>Expand</summary>
Parameters:
name: The name of the config setting to set.
## Options are:
* queue
* scantime
* expiry
n: The value to set the 'name' setting to.
Returns:
The results of setting config of name to n.
</details>
"""
return await self.send_command("setconfig", parameters=f"{name},{n}")
async def pgaset(self, n: int, opt: str, val: int = None) -> dict:
"""Set PGA option opt to val on PGA n.
<details>
<summary>Expand</summary>
Options:
```
MMQ -
opt: clock
val: 2 - 250 (multiple of 2)
XBS -
opt: clock
val: 2 - 250 (multiple of 2)
```
Parameters:
n: The PGA to set the options on.
opt: The option to set. Setting this to 'help' returns a help message.
val: The value to set the option to.
Returns:
Confirmation of setting PGA n with opt[,val].
</details>
"""
if val:
return await self.send_command("pgaset", parameters=f"{n},{opt},{val}")
else:
return await self.send_command("pgaset", parameters=f"{n},{opt}")
async def pprocset(self, n: int, opt: str, val: int = None) -> dict:
"""Set processor option opt to val on processor n.
<details>
<summary>Expand</summary>
Options:
```
MMQ -
opt: clock
val: 2 - 250 (multiple of 2)
XBS -
opt: clock
val: 2 - 250 (multiple of 2)
```
Parameters:
n: The PGA to set the options on.
opt: The option to set. Setting this to 'help' returns a help message.
val: The value to set the option to.
Returns:
Confirmation of setting PGA n with opt[,val].
</details>
"""
if val:
return await self.send_command("pgaset", parameters=f"{n},{opt},{val}")
else:
return await self.send_command("pgaset", parameters=f"{n},{opt}")
async def zero(self, which: str, summary: bool) -> dict:
"""Zero a device.
<details>
<summary>Expand</summary>
Parameters:
which: Which device to zero. Setting this to 'all' zeros all devices. Setting this to 'bestshare' zeros only the bestshare values for each pool and global.
summary: Whether or not to show a full summary.
Returns:
the STATUS section with info on the zero and optional summary.
</details>
"""
return await self.send_command("zero", parameters=f"{which},{summary}")

731
pyasic/rpc/bmminer.py Normal file
View File

@@ -0,0 +1,731 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import asyncio
import logging
from pyasic.rpc import APIError, BaseMinerRPCAPI
class BMMinerRPCAPI(BaseMinerRPCAPI):
"""An abstraction of the BMMiner API.
Each method corresponds to an API command in BMMiner.
[BMMiner API documentation](https://github.com/jameshilliard/bmminer/blob/master/API-README)
This class abstracts use of the BMMiner API, as well as the
methods for sending commands to it. The `self.send_command()`
function handles sending a command to the miner asynchronously, and
as such is the base for many of the functions in this class, which
rely on it to send the command for them.
Parameters:
ip: The IP of the miner to reference the API on.
port: The port to reference the API on. Default is 4028.
"""
def __init__(self, ip: str, api_ver: str = "0.0.0", port: int = 4028) -> None:
super().__init__(ip, port=port)
self.api_ver = api_ver
async def multicommand(self, *commands: str, allow_warning: bool = True) -> dict:
# make sure we can actually run each command, otherwise they will fail
commands = self._check_commands(*commands)
# standard multicommand format is "command1+command2"
# doesn't work for S19 which uses the backup _x19_multicommand
command = "+".join(commands)
try:
data = await self.send_command(command, allow_warning=allow_warning)
except APIError:
logging.debug(f"{self} - (Multicommand) - Handling X19 multicommand.")
data = await self._x19_multicommand(
*command.split("+"), allow_warning=allow_warning
)
data["multicommand"] = True
return data
async def _x19_multicommand(self, *commands, allow_warning: bool = True) -> dict:
tasks = []
# send all commands individually
for cmd in commands:
tasks.append(
asyncio.create_task(self._handle_multicommand(cmd, allow_warning=True))
)
all_data = await asyncio.gather(*tasks)
data = {}
for item in all_data:
data.update(item)
return data
async def version(self) -> dict:
"""Get miner version info.
<details>
<summary>Expand</summary>
Returns:
Miner version information.
</details>
"""
return await self.send_command("version")
async def config(self) -> dict:
"""Get some basic configuration info.
<details>
<summary>Expand</summary>
Returns:
## Some miner configuration information:
* ASC Count <- the number of ASCs
* PGA Count <- the number of PGAs
* Pool Count <- the number of Pools
* Strategy <- the current pool strategy
* Log Interval <- the interval of logging
* Device Code <- list of compiled device drivers
* OS <- the current operating system
* Failover-Only <- failover-only setting
* Scan Time <- scan-time setting
* Queue <- queue setting
* Expiry <- expiry setting
</details>
"""
return await self.send_command("config")
async def summary(self) -> dict:
"""Get the status summary of the miner.
<details>
<summary>Expand</summary>
Returns:
The status summary of the miner.
</details>
"""
return await self.send_command("summary")
async def pools(self) -> dict:
"""Get pool information.
<details>
<summary>Expand</summary>
Returns:
Miner pool information.
</details>
"""
return await self.send_command("pools")
async def devs(self) -> dict:
"""Get data on each PGA/ASC with their details.
<details>
<summary>Expand</summary>
Returns:
Data on each PGA/ASC with their details.
</details>
"""
return await self.send_command("devs")
async def edevs(self, old: bool = False) -> dict:
"""Get data on each PGA/ASC with their details, ignoring blacklisted and zombie devices.
<details>
<summary>Expand</summary>
Parameters:
old: Include zombie devices that became zombies less than 'old' seconds ago
Returns:
Data on each PGA/ASC with their details.
</details>
"""
if old:
return await self.send_command("edevs", parameters=old)
else:
return await self.send_command("edevs")
async def pga(self, n: int) -> dict:
"""Get data from PGA n.
<details>
<summary>Expand</summary>
Parameters:
n: The PGA number to get data from.
Returns:
Data on the PGA n.
</details>
"""
return await self.send_command("pga", parameters=n)
async def pgacount(self) -> dict:
"""Get data fon all PGAs.
<details>
<summary>Expand</summary>
Returns:
Data on the PGAs connected.
</details>
"""
return await self.send_command("pgacount")
async def switchpool(self, n: int) -> dict:
"""Switch pools to pool n.
<details>
<summary>Expand</summary>
Parameters:
n: The pool to switch to.
Returns:
A confirmation of switching to pool n.
</details>
"""
return await self.send_command("switchpool", parameters=n)
async def enablepool(self, n: int) -> dict:
"""Enable pool n.
<details>
<summary>Expand</summary>
Parameters:
n: The pool to enable.
Returns:
A confirmation of enabling pool n.
</details>
"""
return await self.send_command("enablepool", parameters=n)
async def addpool(self, url: str, username: str, password: str) -> dict:
"""Add a pool to the miner.
<details>
<summary>Expand</summary>
Parameters:
url: The URL of the new pool to add.
username: The users username on the new pool.
password: The worker password on the new pool.
Returns:
A confirmation of adding the pool.
</details>
"""
return await self.send_command(
"addpool", parameters=f"{url},{username},{password}"
)
async def poolpriority(self, *n: int) -> dict:
"""Set pool priority.
<details>
<summary>Expand</summary>
Parameters:
*n: Pools in order of priority.
Returns:
A confirmation of setting pool priority.
</details>
"""
pools = f"{','.join([str(item) for item in n])}"
return await self.send_command("poolpriority", parameters=pools)
async def poolquota(self, n: int, q: int) -> dict:
"""Set pool quota.
<details>
<summary>Expand</summary>
Parameters:
n: Pool number to set quota on.
q: Quota to set the pool to.
Returns:
A confirmation of setting pool quota.
</details>
"""
return await self.send_command("poolquota", parameters=f"{n},{q}")
async def disablepool(self, n: int) -> dict:
"""Disable a pool.
<details>
<summary>Expand</summary>
Parameters:
n: Pool to disable.
Returns:
A confirmation of diabling the pool.
</details>
"""
return await self.send_command("disablepool", parameters=n)
async def removepool(self, n: int) -> dict:
"""Remove a pool.
<details>
<summary>Expand</summary>
Parameters:
n: Pool to remove.
Returns:
A confirmation of removing the pool.
</details>
"""
return await self.send_command("removepool", parameters=n)
async def save(self, filename: str = None) -> dict:
"""Save the config.
<details>
<summary>Expand</summary>
Parameters:
filename: Filename to save the config as.
Returns:
A confirmation of saving the config.
</details>
"""
if filename:
return await self.send_command("save", parameters=filename)
else:
return await self.send_command("save")
async def quit(self) -> dict:
"""Quit BMMiner.
<details>
<summary>Expand</summary>
Returns:
A single "BYE" before BMMiner quits.
</details>
"""
return await self.send_command("quit")
async def notify(self) -> dict:
"""Notify the user of past errors.
<details>
<summary>Expand</summary>
Returns:
The last status and count of each devices problem(s).
</details>
"""
return await self.send_command("notify")
async def privileged(self) -> dict:
"""Check if you have privileged access.
<details>
<summary>Expand</summary>
Returns:
The STATUS section with an error if you have no privileged access, or success if you have privileged access.
</details>
"""
return await self.send_command("privileged")
async def pgaenable(self, n: int) -> dict:
"""Enable PGA n.
<details>
<summary>Expand</summary>
Parameters:
n: The PGA to enable.
Returns:
A confirmation of enabling PGA n.
</details>
"""
return await self.send_command("pgaenable", parameters=n)
async def pgadisable(self, n: int) -> dict:
"""Disable PGA n.
<details>
<summary>Expand</summary>
Parameters:
n: The PGA to disable.
Returns:
A confirmation of disabling PGA n.
</details>
"""
return await self.send_command("pgadisable", parameters=n)
async def pgaidentify(self, n: int) -> dict:
"""Identify PGA n.
<details>
<summary>Expand</summary>
Parameters:
n: The PGA to identify.
Returns:
A confirmation of identifying PGA n.
</details>
"""
return await self.send_command("pgaidentify", parameters=n)
async def devdetails(self) -> dict:
"""Get data on all devices with their static details.
<details>
<summary>Expand</summary>
Returns:
Data on all devices with their static details.
</details>
"""
return await self.send_command("devdetails")
async def restart(self) -> dict:
"""Restart BMMiner using the API.
<details>
<summary>Expand</summary>
Returns:
A reply informing of the restart.
</details>
"""
return await self.send_command("restart")
async def stats(self) -> dict:
"""Get stats of each device/pool with more than 1 getwork.
<details>
<summary>Expand</summary>
Returns:
Stats of each device/pool with more than 1 getwork.
</details>
"""
return await self.send_command("stats")
async def estats(self, old: bool = False) -> dict:
"""Get stats of each device/pool with more than 1 getwork, ignoring zombie devices.
<details>
<summary>Expand</summary>
Parameters:
old: Include zombie devices that became zombies less than 'old' seconds ago.
Returns:
Stats of each device/pool with more than 1 getwork, ignoring zombie devices.
</details>
"""
if old:
return await self.send_command("estats", parameters=old)
else:
return await self.send_command("estats")
async def check(self, command: str) -> dict:
"""Check if the command command exists in BMMiner.
<details>
<summary>Expand</summary>
Parameters:
command: The command to check.
Returns:
## Information about a command:
* Exists (Y/N) <- the command exists in this version
* Access (Y/N) <- you have access to use the command
</details>
"""
return await self.send_command("check", parameters=command)
async def failover_only(self, failover: bool) -> dict:
"""Set failover-only.
<details>
<summary>Expand</summary>
Parameters:
failover: What to set failover-only to.
Returns:
Confirmation of setting failover-only.
</details>
"""
return await self.send_command("failover-only", parameters=failover)
async def coin(self) -> dict:
"""Get information on the current coin.
<details>
<summary>Expand</summary>
Returns:
## Information about the current coin being mined:
* Hash Method <- the hashing algorithm
* Current Block Time <- blocktime as a float, 0 means none
* Current Block Hash <- the hash of the current block, blank means none
* LP <- whether LP is in use on at least 1 pool
* Network Difficulty: the current network difficulty
</details>
"""
return await self.send_command("coin")
async def debug(self, setting: str) -> dict:
"""Set a debug setting.
<details>
<summary>Expand</summary>
Parameters:
setting: Which setting to switch to.
## Options are:
* Silent
* Quiet
* Verbose
* Debug
* RPCProto
* PerDevice
* WorkTime
* Normal
Returns:
Data on which debug setting was enabled or disabled.
</details>
"""
return await self.send_command("debug", parameters=setting)
async def setconfig(self, name: str, n: int) -> dict:
"""Set config of name to value n.
<details>
<summary>Expand</summary>
Parameters:
name: The name of the config setting to set.
## Options are:
* queue
* scantime
* expiry
n: The value to set the 'name' setting to.
Returns:
The results of setting config of name to n.
</details>
"""
return await self.send_command("setconfig", parameters=f"{name},{n}")
async def usbstats(self) -> dict:
"""Get stats of all USB devices except ztex.
<details>
<summary>Expand</summary>
Returns:
The stats of all USB devices except ztex.
</details>
"""
return await self.send_command("usbstats")
async def pgaset(self, n: int, opt: str, val: int = None) -> dict:
"""Set PGA option opt to val on PGA n.
<details>
<summary>Expand</summary>
Options:
```
MMQ -
opt: clock
val: 160 - 230 (multiple of 2)
CMR -
opt: clock
val: 100 - 220
```
Parameters:
n: The PGA to set the options on.
opt: The option to set. Setting this to 'help' returns a help message.
val: The value to set the option to.
Returns:
Confirmation of setting PGA n with opt[,val].
</details>
"""
if val:
return await self.send_command("pgaset", parameters=f"{n},{opt},{val}")
else:
return await self.send_command("pgaset", parameters=f"{n},{opt}")
async def zero(self, which: str, summary: bool) -> dict:
"""Zero a device.
<details>
<summary>Expand</summary>
Parameters:
which: Which device to zero. Setting this to 'all' zeros all devices. Setting this to 'bestshare' zeros only the bestshare values for each pool and global.
summary: Whether or not to show a full summary.
Returns:
the STATUS section with info on the zero and optional summary.
</details>
"""
return await self.send_command("zero", parameters=f"{which},{summary}")
async def hotplug(self, n: int) -> dict:
"""Enable hotplug.
<details>
<summary>Expand</summary>
Parameters:
n: The device number to set hotplug on.
Returns:
Information on hotplug status.
</details>
"""
return await self.send_command("hotplug", parameters=n)
async def asc(self, n: int) -> dict:
"""Get data for ASC device n.
<details>
<summary>Expand</summary>
Parameters:
n: The device to get data for.
Returns:
The data for ASC device n.
</details>
"""
return await self.send_command("asc", parameters=n)
async def ascenable(self, n: int) -> dict:
"""Enable ASC device n.
<details>
<summary>Expand</summary>
Parameters:
n: The device to enable.
Returns:
Confirmation of enabling ASC device n.
</details>
"""
return await self.send_command("ascenable", parameters=n)
async def ascdisable(self, n: int) -> dict:
"""Disable ASC device n.
<details>
<summary>Expand</summary>
Parameters:
n: The device to disable.
Returns:
Confirmation of disabling ASC device n.
</details>
"""
return await self.send_command("ascdisable", parameters=n)
async def ascidentify(self, n: int) -> dict:
"""Identify ASC device n.
<details>
<summary>Expand</summary>
Parameters:
n: The device to identify.
Returns:
Confirmation of identifying ASC device n.
</details>
"""
return await self.send_command("ascidentify", parameters=n)
async def asccount(self) -> dict:
"""Get data on the number of ASC devices and their info.
<details>
<summary>Expand</summary>
Returns:
Data on all ASC devices.
</details>
"""
return await self.send_command("asccount")
async def ascset(self, n: int, opt: str, val: int = None) -> dict:
"""Set ASC n option opt to value val.
<details>
<summary>Expand</summary>
Sets an option on the ASC n to a value. Allowed options are:
```
AVA+BTB -
opt: freq
val: 256 - 1024 (chip frequency)
BTB -
opt: millivolts
val: 1000 - 1400 (core voltage)
MBA -
opt: reset
val: 0 - # of chips (reset a chip)
opt: freq
val: 0 - # of chips, 100 - 1400 (chip frequency)
opt: ledcount
val: 0 - 100 (chip count for LED)
opt: ledlimit
val: 0 - 200 (LED off below GH/s)
opt: spidelay
val: 0 - 9999 (SPI per I/O delay)
opt: spireset
val: i or s, 0 - 9999 (SPI regular reset)
opt: spisleep
val: 0 - 9999 (SPI reset sleep in ms)
BMA -
opt: volt
val: 0 - 9
opt: clock
val: 0 - 15
```
Parameters:
n: The ASC to set the options on.
opt: The option to set. Setting this to 'help' returns a help message.
val: The value to set the option to.
Returns:
Confirmation of setting option opt to value val.
</details>
"""
if val:
return await self.send_command("ascset", parameters=f"{n},{opt},{val}")
else:
return await self.send_command("ascset", parameters=f"{n},{opt}")
async def lcd(self) -> dict:
"""Get a general all-in-one status summary of the miner.
<details>
<summary>Expand</summary>
Returns:
An all-in-one status summary of the miner.
</details>
"""
return await self.send_command("lcd")
async def lockstats(self) -> dict:
"""Write lockstats to STDERR.
<details>
<summary>Expand</summary>
Returns:
The result of writing the lock stats to STDERR.
</details>
"""
return await self.send_command("lockstats")

278
pyasic/rpc/bosminer.py Normal file
View File

@@ -0,0 +1,278 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.rpc import BaseMinerRPCAPI
class BOSMinerRPCAPI(BaseMinerRPCAPI):
"""An abstraction of the BOSMiner API.
Each method corresponds to an API command in BOSMiner.
[BOSMiner API documentation](https://docs.braiins.com/os/plus-en/Development/1_api.html)
This class abstracts use of the BOSMiner API, as well as the
methods for sending commands to it. The `self.send_command()`
function handles sending a command to the miner asynchronously, and
as such is the base for many of the functions in this class, which
rely on it to send the command for them.
Parameters:
ip: The IP of the miner to reference the API on.
port: The port to reference the API on. Default is 4028.
"""
def __init__(self, ip: str, api_ver: str = "0.0.0", port: int = 4028) -> None:
super().__init__(ip, port=port)
self.api_ver = api_ver
async def asccount(self) -> dict:
"""Get data on the number of ASC devices and their info.
<details>
<summary>Expand</summary>
Returns:
Data on all ASC devices.
</details>
"""
return await self.send_command("asccount")
async def asc(self, n: int) -> dict:
"""Get data for ASC device n.
<details>
<summary>Expand</summary>
Parameters:
n: The device to get data for.
Returns:
The data for ASC device n.
</details>
"""
return await self.send_command("asc", parameters=n)
async def devdetails(self) -> dict:
"""Get data on all devices with their static details.
<details>
<summary>Expand</summary>
Returns:
Data on all devices with their static details.
</details>
"""
return await self.send_command("devdetails")
async def devs(self) -> dict:
"""Get data on each PGA/ASC with their details.
<details>
<summary>Expand</summary>
Returns:
Data on each PGA/ASC with their details.
</details>
"""
return await self.send_command("devs")
async def edevs(self, old: bool = False) -> dict:
"""Get data on each PGA/ASC with their details, ignoring blacklisted and zombie devices.
<details>
<summary>Expand</summary>
Parameters:
old: Include zombie devices that became zombies less than 'old' seconds ago
Returns:
Data on each PGA/ASC with their details.
</details>
"""
if old:
return await self.send_command("edevs", parameters="old")
else:
return await self.send_command("edevs")
async def pools(self) -> dict:
"""Get pool information.
<details>
<summary>Expand</summary>
Returns:
Miner pool information.
</details>
"""
return await self.send_command("pools")
async def summary(self) -> dict:
"""Get the status summary of the miner.
<details>
<summary>Expand</summary>
Returns:
The status summary of the miner.
</details>
"""
return await self.send_command("summary")
async def stats(self) -> dict:
"""Get stats of each device/pool with more than 1 getwork.
<details>
<summary>Expand</summary>
Returns:
Stats of each device/pool with more than 1 getwork.
</details>
"""
return await self.send_command("stats")
async def version(self) -> dict:
"""Get miner version info.
<details>
<summary>Expand</summary>
Returns:
Miner version information.
</details>
"""
return await self.send_command("version")
async def estats(self, old: bool = False) -> dict:
"""Get stats of each device/pool with more than 1 getwork, ignoring zombie devices.
<details>
<summary>Expand</summary>
Parameters:
old: Include zombie devices that became zombies less than 'old' seconds ago.
Returns:
Stats of each device/pool with more than 1 getwork, ignoring zombie devices.
</details>
"""
if old:
return await self.send_command("estats", parameters=old)
else:
return await self.send_command("estats")
async def check(self, command: str) -> dict:
"""Check if the command `command` exists in BOSMiner.
<details>
<summary>Expand</summary>
Parameters:
command: The command to check.
Returns:
## Information about a command:
* Exists (Y/N) <- the command exists in this version
* Access (Y/N) <- you have access to use the command
</details>
"""
return await self.send_command("check", parameters=command)
async def coin(self) -> dict:
"""Get information on the current coin.
<details>
<summary>Expand</summary>
Returns:
## Information about the current coin being mined:
* Hash Method <- the hashing algorithm
* Current Block Time <- blocktime as a float, 0 means none
* Current Block Hash <- the hash of the current block, blank means none
* LP <- whether LP is in use on at least 1 pool
* Network Difficulty: the current network difficulty
</details>
"""
return await self.send_command("coin")
async def lcd(self) -> dict:
"""Get a general all-in-one status summary of the miner.
<details>
<summary>Expand</summary>
Returns:
An all-in-one status summary of the miner.
</details>
"""
return await self.send_command("lcd")
async def fans(self) -> dict:
"""Get fan data.
<details>
<summary>Expand</summary>
Returns:
Data on the fans of the miner.
</details>
"""
return await self.send_command("fans")
async def tempctrl(self) -> dict:
"""Get temperature control data.
<details>
<summary>Expand</summary>
Returns:
Data about the temp control settings of the miner.
</details>
"""
return await self.send_command("tempctrl")
async def temps(self) -> dict:
"""Get temperature data.
<details>
<summary>Expand</summary>
Returns:
Data on the temps of the miner.
</details>
"""
return await self.send_command("temps")
async def tunerstatus(self) -> dict:
"""Get tuner status data
<details>
<summary>Expand</summary>
Returns:
Data on the status of autotuning.
</details>
"""
return await self.send_command("tunerstatus")
async def pause(self) -> dict:
"""Pause mining.
<details>
<summary>Expand</summary>
Returns:
Confirmation of pausing mining.
</details>
"""
return await self.send_command("pause")
async def resume(self) -> dict:
"""Resume mining.
<details>
<summary>Expand</summary>
Returns:
Confirmation of resuming mining.
</details>
"""
return await self.send_command("resume")

1042
pyasic/rpc/btminer.py Normal file

File diff suppressed because it is too large Load Diff

729
pyasic/rpc/cgminer.py Normal file
View File

@@ -0,0 +1,729 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import asyncio
import logging
from pyasic.rpc import APIError, BaseMinerRPCAPI
class CGMinerRPCAPI(BaseMinerRPCAPI):
"""An abstraction of the CGMiner API.
Each method corresponds to an API command in GGMiner.
[CGMiner API documentation](https://github.com/ckolivas/cgminer/blob/master/API-README)
This class abstracts use of the CGMiner API, as well as the
methods for sending commands to it. The self.send_command()
function handles sending a command to the miner asynchronously, and
as such is the base for many of the functions in this class, which
rely on it to send the command for them.
Parameters:
ip: The IP of the miner to reference the API on.
port: The port to reference the API on. Default is 4028.
"""
def __init__(self, ip: str, api_ver: str = "0.0.0", port: int = 4028):
super().__init__(ip, port)
self.api_ver = api_ver
async def multicommand(self, *commands: str, allow_warning: bool = True) -> dict:
# make sure we can actually run each command, otherwise they will fail
commands = self._check_commands(*commands)
# standard multicommand format is "command1+command2"
# doesn't work for S19 which uses the backup _x19_multicommand
command = "+".join(commands)
try:
data = await self.send_command(command, allow_warning=allow_warning)
except APIError:
logging.debug(f"{self} - (Multicommand) - Handling X19 multicommand.")
data = await self._x19_multicommand(*command.split("+"))
data["multicommand"] = True
return data
async def _x19_multicommand(self, *commands) -> dict:
tasks = []
# send all commands individually
for cmd in commands:
tasks.append(
asyncio.create_task(self._handle_multicommand(cmd, allow_warning=True))
)
all_data = await asyncio.gather(*tasks)
data = {}
for item in all_data:
data.update(item)
return data
async def version(self) -> dict:
"""Get miner version info.
<details>
<summary>Expand</summary>
Returns:
Miner version information.
</details>
"""
return await self.send_command("version")
async def config(self) -> dict:
"""Get some basic configuration info.
<details>
<summary>Expand</summary>
Returns:
## Some miner configuration information:
* ASC Count <- the number of ASCs
* PGA Count <- the number of PGAs
* Pool Count <- the number of Pools
* Strategy <- the current pool strategy
* Log Interval <- the interval of logging
* Device Code <- list of compiled device drivers
* OS <- the current operating system
* Failover-Only <- failover-only setting
* Scan Time <- scan-time setting
* Queue <- queue setting
* Expiry <- expiry setting
</details>
"""
return await self.send_command("config")
async def summary(self) -> dict:
"""Get the status summary of the miner.
<details>
<summary>Expand</summary>
Returns:
The status summary of the miner.
</details>
"""
return await self.send_command("summary")
async def pools(self) -> dict:
"""Get pool information.
<details>
<summary>Expand</summary>
Returns:
Miner pool information.
</details>
"""
return await self.send_command("pools")
async def devs(self) -> dict:
"""Get data on each PGA/ASC with their details.
<details>
<summary>Expand</summary>
Returns:
Data on each PGA/ASC with their details.
</details>
"""
return await self.send_command("devs")
async def edevs(self, old: bool = False) -> dict:
"""Get data on each PGA/ASC with their details, ignoring blacklisted and zombie devices.
<details>
<summary>Expand</summary>
Parameters:
old: Include zombie devices that became zombies less than 'old' seconds ago
Returns:
Data on each PGA/ASC with their details.
</details>
"""
if old:
return await self.send_command("edevs", parameters=old)
else:
return await self.send_command("edevs")
async def pga(self, n: int) -> dict:
"""Get data from PGA n.
<details>
<summary>Expand</summary>
Parameters:
n: The PGA number to get data from.
Returns:
Data on the PGA n.
</details>
"""
return await self.send_command("pga", parameters=n)
async def pgacount(self) -> dict:
"""Get data fon all PGAs.
<details>
<summary>Expand</summary>
Returns:
Data on the PGAs connected.
</details>
"""
return await self.send_command("pgacount")
async def switchpool(self, n: int) -> dict:
"""Switch pools to pool n.
<details>
<summary>Expand</summary>
Parameters:
n: The pool to switch to.
Returns:
A confirmation of switching to pool n.
</details>
"""
return await self.send_command("switchpool", parameters=n)
async def enablepool(self, n: int) -> dict:
"""Enable pool n.
<details>
<summary>Expand</summary>
Parameters:
n: The pool to enable.
Returns:
A confirmation of enabling pool n.
</details>
"""
return await self.send_command("enablepool", parameters=n)
async def addpool(self, url: str, username: str, password: str) -> dict:
"""Add a pool to the miner.
<details>
<summary>Expand</summary>
Parameters:
url: The URL of the new pool to add.
username: The users username on the new pool.
password: The worker password on the new pool.
Returns:
A confirmation of adding the pool.
</details>
"""
return await self.send_command(
"addpool", parameters=f"{url},{username},{password}"
)
async def poolpriority(self, *n: int) -> dict:
"""Set pool priority.
<details>
<summary>Expand</summary>
Parameters:
*n: Pools in order of priority.
Returns:
A confirmation of setting pool priority.
</details>
"""
pools = f"{','.join([str(item) for item in n])}"
return await self.send_command("poolpriority", parameters=pools)
async def poolquota(self, n: int, q: int) -> dict:
"""Set pool quota.
<details>
<summary>Expand</summary>
Parameters:
n: Pool number to set quota on.
q: Quota to set the pool to.
Returns:
A confirmation of setting pool quota.
</details>
"""
return await self.send_command("poolquota", parameters=f"{n},{q}")
async def disablepool(self, n: int) -> dict:
"""Disable a pool.
<details>
<summary>Expand</summary>
Parameters:
n: Pool to disable.
Returns:
A confirmation of diabling the pool.
</details>
"""
return await self.send_command("disablepool", parameters=n)
async def removepool(self, n: int) -> dict:
"""Remove a pool.
<details>
<summary>Expand</summary>
Parameters:
n: Pool to remove.
Returns:
A confirmation of removing the pool.
</details>
"""
return await self.send_command("removepool", parameters=n)
async def save(self, filename: str = None) -> dict:
"""Save the config.
<details>
<summary>Expand</summary>
Parameters:
filename: Filename to save the config as.
Returns:
A confirmation of saving the config.
</details>
"""
if filename:
return await self.send_command("save", parameters=filename)
else:
return await self.send_command("save")
async def quit(self) -> dict:
"""Quit CGMiner.
<details>
<summary>Expand</summary>
Returns:
A single "BYE" before CGMiner quits.
</details>
"""
return await self.send_command("quit")
async def notify(self) -> dict:
"""Notify the user of past errors.
<details>
<summary>Expand</summary>
Returns:
The last status and count of each devices problem(s).
</details>
"""
return await self.send_command("notify")
async def privileged(self) -> dict:
"""Check if you have privileged access.
<details>
<summary>Expand</summary>
Returns:
The STATUS section with an error if you have no privileged access, or success if you have privileged access.
</details>
"""
return await self.send_command("privileged")
async def pgaenable(self, n: int) -> dict:
"""Enable PGA n.
<details>
<summary>Expand</summary>
Parameters:
n: The PGA to enable.
Returns:
A confirmation of enabling PGA n.
</details>
"""
return await self.send_command("pgaenable", parameters=n)
async def pgadisable(self, n: int) -> dict:
"""Disable PGA n.
<details>
<summary>Expand</summary>
Parameters:
n: The PGA to disable.
Returns:
A confirmation of disabling PGA n.
</details>
"""
return await self.send_command("pgadisable", parameters=n)
async def pgaidentify(self, n: int) -> dict:
"""Identify PGA n.
<details>
<summary>Expand</summary>
Parameters:
n: The PGA to identify.
Returns:
A confirmation of identifying PGA n.
</details>
"""
return await self.send_command("pgaidentify", parameters=n)
async def devdetails(self) -> dict:
"""Get data on all devices with their static details.
<details>
<summary>Expand</summary>
Returns:
Data on all devices with their static details.
</details>
"""
return await self.send_command("devdetails")
async def restart(self) -> dict:
"""Restart CGMiner using the API.
<details>
<summary>Expand</summary>
Returns:
A reply informing of the restart.
</details>
"""
return await self.send_command("restart")
async def stats(self) -> dict:
"""Get stats of each device/pool with more than 1 getwork.
<details>
<summary>Expand</summary>
Returns:
Stats of each device/pool with more than 1 getwork.
</details>
"""
return await self.send_command("stats")
async def estats(self, old: bool = False) -> dict:
"""Get stats of each device/pool with more than 1 getwork, ignoring zombie devices.
<details>
<summary>Expand</summary>
Parameters:
old: Include zombie devices that became zombies less than 'old' seconds ago.
Returns:
Stats of each device/pool with more than 1 getwork, ignoring zombie devices.
</details>
"""
if old:
return await self.send_command("estats", parameters=old)
else:
return await self.send_command("estats")
async def check(self, command: str) -> dict:
"""Check if the command command exists in CGMiner.
<details>
<summary>Expand</summary>
Parameters:
command: The command to check.
Returns:
## Information about a command:
* Exists (Y/N) <- the command exists in this version
* Access (Y/N) <- you have access to use the command
</details>
"""
return await self.send_command("check", parameters=command)
async def failover_only(self, failover: bool) -> dict:
"""Set failover-only.
<details>
<summary>Expand</summary>
Parameters:
failover: What to set failover-only to.
Returns:
Confirmation of setting failover-only.
</details>
"""
return await self.send_command("failover-only", parameters=failover)
async def coin(self) -> dict:
"""Get information on the current coin.
<details>
<summary>Expand</summary>
Returns:
## Information about the current coin being mined:
* Hash Method <- the hashing algorithm
* Current Block Time <- blocktime as a float, 0 means none
* Current Block Hash <- the hash of the current block, blank means none
* LP <- whether LP is in use on at least 1 pool
* Network Difficulty: the current network difficulty
</details>
"""
return await self.send_command("coin")
async def debug(self, setting: str) -> dict:
"""Set a debug setting.
<details>
<summary>Expand</summary>
Parameters:
setting: Which setting to switch to.
## Options are:
* Silent
* Quiet
* Verbose
* Debug
* RPCProto
* PerDevice
* WorkTime
* Normal
Returns:
Data on which debug setting was enabled or disabled.
</details>
"""
return await self.send_command("debug", parameters=setting)
async def setconfig(self, name: str, n: int) -> dict:
"""Set config of name to value n.
<details>
<summary>Expand</summary>
Parameters:
name: The name of the config setting to set.
## Options are:
* queue
* scantime
* expiry
n: The value to set the 'name' setting to.
Returns:
The results of setting config of name to n.
</details>
"""
return await self.send_command("setconfig", parameters=f"{name},{n}")
async def usbstats(self) -> dict:
"""Get stats of all USB devices except ztex.
<details>
<summary>Expand</summary>
Returns:
The stats of all USB devices except ztex.
</details>
"""
return await self.send_command("usbstats")
async def pgaset(self, n: int, opt: str, val: int = None) -> dict:
"""Set PGA option opt to val on PGA n.
<details>
<summary>Expand</summary>
Options:
```
MMQ -
opt: clock
val: 160 - 230 (multiple of 2)
CMR -
opt: clock
val: 100 - 220
```
Parameters:
n: The PGA to set the options on.
opt: The option to set. Setting this to 'help' returns a help message.
val: The value to set the option to.
Returns:
Confirmation of setting PGA n with opt[,val].
</details>
"""
if val:
return await self.send_command("pgaset", parameters=f"{n},{opt},{val}")
else:
return await self.send_command("pgaset", parameters=f"{n},{opt}")
async def zero(self, which: str, summary: bool) -> dict:
"""Zero a device.
<details>
<summary>Expand</summary>
Parameters:
which: Which device to zero. Setting this to 'all' zeros all devices. Setting this to 'bestshare' zeros only the bestshare values for each pool and global.
summary: Whether or not to show a full summary.
Returns:
the STATUS section with info on the zero and optional summary.
</details>
"""
return await self.send_command("zero", parameters=f"{which},{summary}")
async def hotplug(self, n: int) -> dict:
"""Enable hotplug.
<details>
<summary>Expand</summary>
Parameters:
n: The device number to set hotplug on.
Returns:
Information on hotplug status.
</details>
"""
return await self.send_command("hotplug", parameters=n)
async def asc(self, n: int) -> dict:
"""Get data for ASC device n.
<details>
<summary>Expand</summary>
Parameters:
n: The device to get data for.
Returns:
The data for ASC device n.
</details>
"""
return await self.send_command("asc", parameters=n)
async def ascenable(self, n: int) -> dict:
"""Enable ASC device n.
<details>
<summary>Expand</summary>
Parameters:
n: The device to enable.
Returns:
Confirmation of enabling ASC device n.
</details>
"""
return await self.send_command("ascenable", parameters=n)
async def ascdisable(self, n: int) -> dict:
"""Disable ASC device n.
<details>
<summary>Expand</summary>
Parameters:
n: The device to disable.
Returns:
Confirmation of disabling ASC device n.
</details>
"""
return await self.send_command("ascdisable", parameters=n)
async def ascidentify(self, n: int) -> dict:
"""Identify ASC device n.
<details>
<summary>Expand</summary>
Parameters:
n: The device to identify.
Returns:
Confirmation of identifying ASC device n.
</details>
"""
return await self.send_command("ascidentify", parameters=n)
async def asccount(self) -> dict:
"""Get data on the number of ASC devices and their info.
<details>
<summary>Expand</summary>
Returns:
Data on all ASC devices.
</details>
"""
return await self.send_command("asccount")
async def ascset(self, n: int, opt: str, val: int = None) -> dict:
"""Set ASC n option opt to value val.
<details>
<summary>Expand</summary>
Sets an option on the ASC n to a value. Allowed options are:
```
AVA+BTB -
opt: freq
val: 256 - 1024 (chip frequency)
BTB -
opt: millivolts
val: 1000 - 1400 (core voltage)
MBA -
opt: reset
val: 0 - # of chips (reset a chip)
opt: freq
val: 0 - # of chips, 100 - 1400 (chip frequency)
opt: ledcount
val: 0 - 100 (chip count for LED)
opt: ledlimit
val: 0 - 200 (LED off below GH/s)
opt: spidelay
val: 0 - 9999 (SPI per I/O delay)
opt: spireset
val: i or s, 0 - 9999 (SPI regular reset)
opt: spisleep
val: 0 - 9999 (SPI reset sleep in ms)
BMA -
opt: volt
val: 0 - 9
opt: clock
val: 0 - 15
```
Parameters:
n: The ASC to set the options on.
opt: The option to set. Setting this to 'help' returns a help message.
val: The value to set the option to.
Returns:
Confirmation of setting option opt to value val.
</details>
"""
if val:
return await self.send_command("ascset", parameters=f"{n},{opt},{val}")
else:
return await self.send_command("ascset", parameters=f"{n},{opt}")
async def lcd(self) -> dict:
"""Get a general all-in-one status summary of the miner.
<details>
<summary>Expand</summary>
Returns:
An all-in-one status summary of the miner.
</details>
"""
return await self.send_command("lcd")
async def lockstats(self) -> dict:
"""Write lockstats to STDERR.
<details>
<summary>Expand</summary>
Returns:
The result of writing the lock stats to STDERR.
</details>
"""
return await self.send_command("lockstats")

759
pyasic/rpc/luxminer.py Normal file
View File

@@ -0,0 +1,759 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from typing import Literal
from pyasic.rpc import BaseMinerRPCAPI
class LUXMinerRPCAPI(BaseMinerRPCAPI):
"""An abstraction of the LUXMiner API.
Each method corresponds to an API command in LUXMiner.
[LUXMiner API documentation](https://docs.firmware.luxor.tech/API/intro)
This class abstracts use of the LUXMiner API, as well as the
methods for sending commands to it. The `self.send_command()`
function handles sending a command to the miner asynchronously, and
as such is the base for many of the functions in this class, which
rely on it to send the command for them.
Parameters:
ip: The IP of the miner to reference the API on.
port: The port to reference the API on. Default is 4028.
"""
def __init__(self, ip: str, api_ver: str = "0.0.0", port: int = 4028) -> None:
super().__init__(ip, port=port)
self.api_ver = api_ver
async def addgroup(self, name: str, quota: int) -> dict:
"""Add a pool group.
<details>
<summary>Expand</summary>
Parameters:
name: The group name.
quota: The group quota.
Returns:
Confirmation of adding a pool group.
</details>
"""
return await self.send_command("addgroup", parameters=f"{name},{quota}")
async def addpool(
self, url: str, user: str, pwd: str = "", group_id: str = None
) -> dict:
"""Add a pool.
<details>
<summary>Expand</summary>
Parameters:
url: The pool url.
user: The pool username.
pwd: The pool password.
group_id: The group ID to use.
Returns:
Confirmation of adding a pool.
</details>
"""
pool_data = [url, user, pwd]
if group_id is not None:
pool_data.append(group_id)
return await self.send_command("addpool", parameters=",".join(pool_data))
async def asc(self, n: int) -> dict:
"""Get data for ASC device n.
<details>
<summary>Expand</summary>
Parameters:
n: The device to get data for.
Returns:
The data for ASC device n.
</details>
"""
return await self.send_command("asc", parameters=n)
async def asccount(self) -> dict:
"""Get data on the number of ASC devices and their info.
<details>
<summary>Expand</summary>
Returns:
Data on all ASC devices.
</details>
"""
return await self.send_command("asccount")
async def check(self, command: str) -> dict:
"""Check if the command `command` exists in LUXMiner.
<details>
<summary>Expand</summary>
Parameters:
command: The command to check.
Returns:
## Information about a command:
* Exists (Y/N) <- the command exists in this version
* Access (Y/N) <- you have access to use the command
</details>
"""
return await self.send_command("check", parameters=command)
async def coin(self) -> dict:
"""Get information on the current coin.
<details>
<summary>Expand</summary>
Returns:
## Information about the current coin being mined:
* Hash Method <- the hashing algorithm
* Current Block Time <- blocktime as a float, 0 means none
* Current Block Hash <- the hash of the current block, blank means none
* LP <- whether LP is in use on at least 1 pool
* Network Difficulty: the current network difficulty
</details>
"""
return await self.send_command("coin")
async def config(self) -> dict:
"""Get some basic configuration info.
<details>
<summary>Expand</summary>
Returns:
Miner configuration information.
</details>
"""
return await self.send_command("config")
async def curtail(self, session_id: str) -> dict:
"""Put the miner into sleep mode. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
Returns:
A confirmation of putting the miner to sleep.
</details>
"""
return await self.send_command("curtail", parameters=session_id)
async def devdetails(self) -> dict:
"""Get data on all devices with their static details.
<details>
<summary>Expand</summary>
Returns:
Data on all devices with their static details.
</details>
"""
return await self.send_command("devdetails")
async def devs(self) -> dict:
"""Get data on each PGA/ASC with their details.
<details>
<summary>Expand</summary>
Returns:
Data on each PGA/ASC with their details.
</details>
"""
return await self.send_command("devs")
async def disablepool(self, n: int) -> dict:
"""Disable a pool.
<details>
<summary>Expand</summary>
Parameters:
n: Pool to disable.
Returns:
A confirmation of diabling the pool.
</details>
"""
return await self.send_command("disablepool", parameters=n)
async def edevs(self) -> dict:
"""Alias for devs"""
return await self.devs()
async def enablepool(self, n: int) -> dict:
"""Enable pool n.
<details>
<summary>Expand</summary>
Parameters:
n: The pool to enable.
Returns:
A confirmation of enabling pool n.
</details>
"""
return await self.send_command("enablepool", parameters=n)
async def estats(self) -> dict:
"""Alias for stats"""
return await self.stats()
async def fans(self) -> dict:
"""Get fan data.
<details>
<summary>Expand</summary>
Returns:
Data on the fans of the miner.
</details>
"""
return await self.send_command("fans")
async def fanset(self, session_id: str, speed: int, min_fans: int = None) -> dict:
"""Set fan control. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
speed: The fan speed to set. Use -1 to set automatically.
min_fans: The minimum number of fans to use. Optional.
Returns:
A confirmation of setting fan control values.
</details>
"""
fanset_data = [str(session_id), str(speed)]
if min_fans is not None:
fanset_data.append(str(min_fans))
return await self.send_command("fanset", parameters=",".join(fanset_data))
async def frequencyget(self, board_n: int, chip_n: int = None) -> dict:
"""Get frequency data for a board and chips.
<details>
<summary>Expand</summary>
Parameters:
board_n: The board number to get frequency info from.
chip_n: The chip number to get frequency info from. Optional.
Returns:
Board and/or chip frequency values.
</details>
"""
frequencyget_data = [str(board_n)]
if chip_n is not None:
frequencyget_data.append(str(chip_n))
return await self.send_command(
"frequencyget", parameters=",".join(frequencyget_data)
)
async def frequencyset(self, session_id: str, board_n: int, freq: int) -> dict:
"""Set frequency. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
board_n: The board number to set frequency on.
freq: The frequency to set.
Returns:
A confirmation of setting frequency values.
</details>
"""
return await self.send_command(
"frequencyset", parameters=f"{session_id},{board_n},{freq}"
)
async def frequencystop(self, session_id: str, board_n: int) -> dict:
"""Stop set frequency. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
board_n: The board number to set frequency on.
Returns:
A confirmation of stopping frequencyset value.
</details>
"""
return await self.send_command(
"frequencystop", parameters=f"{session_id},{board_n}"
)
async def groupquota(self, group_n: int, quota: int) -> dict:
"""Set a group's quota.
<details>
<summary>Expand</summary>
Parameters:
group_n: The group number to set quota on.
quota: The quota to use.
Returns:
A confirmation of setting quota value.
</details>
"""
return await self.send_command("groupquota", parameters=f"{group_n},{quota}")
async def groups(self) -> dict:
"""Get pool group data.
<details>
<summary>Expand</summary>
Returns:
Data on the pool groups on the miner.
</details>
"""
return await self.send_command("groups")
async def healthchipget(self, board_n: int, chip_n: int = None) -> dict:
"""Get chip health.
<details>
<summary>Expand</summary>
Parameters:
board_n: The board number to get chip health of.
chip_n: The chip number to get chip health of. Optional.
Returns:
Chip health data.
</details>
"""
healthchipget_data = [str(board_n)]
if chip_n is not None:
healthchipget_data.append(str(chip_n))
return await self.send_command(
"healthchipget", parameters=",".join(healthchipget_data)
)
async def healthchipset(
self, session_id: str, board_n: int, chip_n: int = None
) -> dict:
"""Select the next chip to have its health checked. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
board_n: The board number to next get chip health of.
chip_n: The chip number to next get chip health of. Optional.
Returns:
Confirmation of selecting the next health check chip.
</details>
"""
healthchipset_data = [session_id, str(board_n)]
if chip_n is not None:
healthchipset_data.append(str(chip_n))
return await self.send_command(
"healthchipset", parameters=",".join(healthchipset_data)
)
async def healthctrl(self) -> dict:
"""Get health check config.
<details>
<summary>Expand</summary>
Returns:
Health check config.
</details>
"""
return await self.send_command("healthctrl")
async def healthctrlset(
self, session_id: str, num_readings: int, amplified_factor: float
) -> dict:
"""Set health control config. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
num_readings: The minimum number of readings for evaluation.
amplified_factor: Performance factor of the evaluation.
Returns:
A confirmation of setting health control config.
</details>
"""
return await self.send_command(
"healthctrlset",
parameters=f"{session_id},{num_readings},{amplified_factor}",
)
async def kill(self) -> dict:
"""Forced session kill. Use logoff instead.
<details>
<summary>Expand</summary>
Returns:
A confirmation of killing the active session.
</details>
"""
return await self.send_command("kill")
async def lcd(self) -> dict:
"""Get a general all-in-one status summary of the miner. Always zeros on LUXMiner.
<details>
<summary>Expand</summary>
Returns:
An all-in-one status summary of the miner.
</details>
"""
return await self.send_command("lcd")
async def ledset(
self,
session_id: str,
color: Literal["red"],
state: Literal["on", "off", "blink"],
) -> dict:
"""Set led. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
color: The color LED to set. Can be "red".
state: The state to set the LED to. Can be "on", "off", or "blink".
Returns:
A confirmation of setting LED.
</details>
"""
return await self.send_command(
"ledset", parameters=f"{session_id},{color},{state}"
)
async def limits(self) -> dict:
"""Get max and min values of config parameters.
<details>
<summary>Expand</summary>
Returns:
Data on max and min values of config parameters.
</details>
"""
return await self.send_command("limits")
async def logoff(self, session_id: str) -> dict:
"""Log off of a session. Requires a session id from an active session.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
Returns:
Confirmation of logging off a session.
</details>
"""
return await self.send_command("logoff", parameters=session_id)
async def logon(self) -> dict:
"""Get or create a session.
<details>
<summary>Expand</summary>
Returns:
The Session ID to be used.
</details>
"""
return await self.send_command("logon")
async def pools(self) -> dict:
"""Get pool information.
<details>
<summary>Expand</summary>
Returns:
Miner pool information.
</details>
"""
return await self.send_command("pools")
async def power(self) -> dict:
"""Get the estimated power usage in watts.
<details>
<summary>Expand</summary>
Returns:
Estimated power usage in watts.
</details>
"""
return await self.send_command("power")
async def profiles(self) -> dict:
"""Get the available profiles.
<details>
<summary>Expand</summary>
Returns:
Data on available profiles.
</details>
"""
return await self.send_command("profiles")
async def profileset(self, session_id: str, board_n: int, profile: str) -> dict:
"""Set active profile for a board. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
board_n: The board to set the profile on.
profile: The profile name to use.
Returns:
A confirmation of setting the profile on board_n.
</details>
"""
return await self.send_command(
"profileset", parameters=f"{session_id},{board_n},{profile}"
)
async def reboot(self, session_id: str, board_n: int, delay_s: int = None) -> dict:
"""Reboot a board. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
board_n: The board to reboot.
delay_s: The number of seconds to delay until startup. If it is 0, the board will just stop. Optional.
Returns:
A confirmation of rebooting board_n.
</details>
"""
reboot_data = [session_id, str(board_n)]
if delay_s is not None:
reboot_data.append(str(delay_s))
return await self.send_command("reboot", parameters=",".join(reboot_data))
async def rebootdevice(self, session_id: str) -> dict:
"""Reboot the miner. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
Returns:
A confirmation of rebooting the miner.
</details>
"""
return await self.send_command("rebootdevice", parameters=session_id)
async def removegroup(self, group_id: str) -> dict:
"""Remove a pool group.
<details>
<summary>Expand</summary>
Parameters:
group_id: Group id to remove.
Returns:
A confirmation of removing the pool group.
</details>
"""
return await self.send_command("removegroup", parameters=group_id)
async def resetminer(self, session_id: str) -> dict:
"""Restart the mining process. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
Returns:
A confirmation of restarting the mining process.
</details>
"""
return await self.send_command("resetminer", parameters=session_id)
async def removepool(self, pool_id: int) -> dict:
"""Remove a pool.
<details>
<summary>Expand</summary>
Parameters:
pool_id: Pool to remove.
Returns:
A confirmation of removing the pool.
</details>
"""
return await self.send_command("removepool", parameters=str(pool_id))
async def session(self) -> dict:
"""Get the current session.
<details>
<summary>Expand</summary>
Returns:
Data on the current session.
</details>
"""
return await self.send_command("session")
async def tempctrlset(self, target: int, hot: int, dangerous: int) -> dict:
"""Set temp control values.
<details>
<summary>Expand</summary>
Parameters:
target: Target temp.
hot: Hot temp.
dangerous: Dangerous temp.
Returns:
A confirmation of setting the temp control config.
</details>
"""
return await self.send_command(
"tempctrlset", parameters=f"{target},{hot},{dangerous}"
)
async def stats(self) -> dict:
"""Get stats of each device/pool with more than 1 getwork.
<details>
<summary>Expand</summary>
Returns:
Stats of each device/pool with more than 1 getwork.
</details>
"""
return await self.send_command("stats")
async def summary(self) -> dict:
"""Get the status summary of the miner.
<details>
<summary>Expand</summary>
Returns:
The status summary of the miner.
</details>
"""
return await self.send_command("summary")
async def switchpool(self, pool_id: int) -> dict:
"""Switch to a pool.
<details>
<summary>Expand</summary>
Parameters:
pool_id: Pool to switch to.
Returns:
A confirmation of switching to the pool.
</details>
"""
return await self.send_command("switchpool", parameters=str(pool_id))
async def tempctrl(self) -> dict:
"""Get temperature control data.
<details>
<summary>Expand</summary>
Returns:
Data about the temp control settings of the miner.
</details>
"""
return await self.send_command("tempctrl")
async def temps(self) -> dict:
"""Get temperature data.
<details>
<summary>Expand</summary>
Returns:
Data on the temps of the miner.
</details>
"""
return await self.send_command("temps")
async def version(self) -> dict:
"""Get miner version info.
<details>
<summary>Expand</summary>
Returns:
Miner version information.
</details>
"""
return await self.send_command("version")
async def voltageget(self, board_n: int) -> dict:
"""Get voltage data for a board.
<details>
<summary>Expand</summary>
Parameters:
board_n: The board number to get voltage info from.
Returns:
Board voltage values.
</details>
"""
return await self.send_command("frequencyget", parameters=str(board_n))
async def voltageset(self, session_id: str, board_n: int, voltage: float) -> dict:
"""Set voltage values.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
board_n: The board to set the voltage on.
voltage: The voltage to use.
Returns:
A confirmation of setting the voltage.
</details>
"""
return await self.send_command(
"voltageset", parameters=f"{session_id},{board_n},{voltage}"
)
async def wakeup(self, session_id: str) -> dict:
"""Take the miner out of sleep mode. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
Returns:
A confirmation of resuming mining.
</details>
"""
return await self.send_command("wakeup", parameters=session_id)

97
pyasic/rpc/unknown.py Normal file
View File

@@ -0,0 +1,97 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.rpc import BaseMinerRPCAPI
class UnknownRPCAPI(BaseMinerRPCAPI):
"""An abstraction of an API for a miner which is unknown.
This class is designed to try to be an intersection of as many miner APIs
and API commands as possible (API ⋂ API), to ensure that it can be used
with as many APIs as possible.
"""
def __init__(self, ip, api_ver: str = "0.0.0", port: int = 4028):
super().__init__(ip, port)
self.api_ver = api_ver
async def asccount(self) -> dict:
return await self.send_command("asccount")
async def asc(self, n: int) -> dict:
return await self.send_command("asc", parameters=n)
async def devdetails(self) -> dict:
return await self.send_command("devdetails")
async def devs(self) -> dict:
return await self.send_command("devs")
async def edevs(self, old: bool = False) -> dict:
if old:
return await self.send_command("edevs", parameters="old")
else:
return await self.send_command("edevs")
async def pools(self) -> dict:
return await self.send_command("pools")
async def summary(self) -> dict:
return await self.send_command("summary")
async def stats(self) -> dict:
return await self.send_command("stats")
async def version(self) -> dict:
return await self.send_command("version")
async def estats(self) -> dict:
return await self.send_command("estats")
async def check(self) -> dict:
return await self.send_command("check")
async def coin(self) -> dict:
return await self.send_command("coin")
async def lcd(self) -> dict:
return await self.send_command("lcd")
async def switchpool(self, n: int) -> dict:
# BOS has not implemented this yet, they will in the future
raise NotImplementedError
# return await self.send_command("switchpool", parameters=n)
async def enablepool(self, n: int) -> dict:
# BOS has not implemented this yet, they will in the future
raise NotImplementedError
# return await self.send_command("enablepool", parameters=n)
async def disablepool(self, n: int) -> dict:
# BOS has not implemented this yet, they will in the future
raise NotImplementedError
# return await self.send_command("disablepool", parameters=n)
async def addpool(self, url: str, username: str, password: str) -> dict:
# BOS has not implemented this yet, they will in the future
raise NotImplementedError
# return await self.send_command("addpool", parameters=f"{url},{username},{password}")
async def removepool(self, n: int) -> dict:
# BOS has not implemented this yet, they will in the future
raise NotImplementedError
# return await self.send_command("removepool", parameters=n)