* changed over to package format and removed tools, added poetry

* reformat into miner_interface project

* add dist to .gitignore

* update readme and finish reformatting

* Added couple missing imports. (#13)

* change name to pyasic

Co-authored-by: upstreamdata <brett@upstreamdata.ca>
Co-authored-by: Mika Impola <mika@impola.fi>
This commit is contained in:
UpstreamData
2022-07-07 07:57:34 -06:00
committed by GitHub
parent 5261b00aad
commit dcc3e07998
267 changed files with 851 additions and 6693 deletions

214
pyasic/API/__init__.py Normal file
View File

@@ -0,0 +1,214 @@
import asyncio
import json
import ipaddress
import warnings
import logging
class APIError(Exception):
def __init__(self, *args):
if args:
self.message = args[0]
else:
self.message = None
def __str__(self):
if self.message:
return f"{self.message}"
else:
return "Incorrect API parameters."
class APIWarning(Warning):
def __init__(self, *args):
if args:
self.message = args[0]
else:
self.message = None
def __str__(self):
if self.message:
return f"{self.message}"
else:
return "Incorrect API parameters."
class BaseMinerAPI:
def __init__(self, ip: str, port: int = 4028) -> None:
# api port, should be 4028
self.port = port
# ip address of the miner
self.ip = ipaddress.ip_address(ip)
def get_commands(self) -> list:
"""Get a list of command accessible to a specific type of API on the miner."""
return [
func
for func in
# each function in self
dir(self)
if callable(getattr(self, func)) and
# no __ methods
not func.startswith("__") and
# remove all functions that are in this base class
func
not in [
func
for func in dir(BaseMinerAPI)
if callable(getattr(BaseMinerAPI, func))
]
]
async def multicommand(
self, *commands: str, ignore_x19_error: bool = False
) -> dict:
"""Creates and sends multiple commands as one command to the miner."""
logging.debug(f"{self.ip}: Sending multicommand: {[*commands]}")
# split the commands into a proper list
user_commands = [*commands]
allowed_commands = self.get_commands()
# make sure we can actually run the command, otherwise it will fail
commands = [command for command in user_commands if command in allowed_commands]
for item in list(set(user_commands) - set(commands)):
warnings.warn(
f"""Removing incorrect command: {item}
If you are sure you want to use this command please use API.send_command("{item}", ignore_errors=True) instead.""",
APIWarning,
)
# standard multicommand format is "command1+command2"
# doesnt work for S19 which is dealt with in the send command function
command = "+".join(commands)
data = None
try:
data = await self.send_command(command, x19_command=ignore_x19_error)
except APIError:
try:
data = {}
# S19 handler, try again
for cmd in command.split("+"):
data[cmd] = []
data[cmd].append(await self.send_command(cmd))
except APIError as e:
raise APIError(e)
except Exception as e:
logging.warning(f"{self.ip}: API Multicommand Error: {e}")
if data:
logging.debug(f"{self.ip}: Received multicommand data.")
return data
async def send_command(
self,
command: str or bytes,
parameters: str or int or bool = None,
ignore_errors: bool = False,
x19_command: bool = False,
) -> dict:
"""Send an API command to the miner and return the result."""
try:
# get reader and writer streams
reader, writer = await asyncio.open_connection(str(self.ip), self.port)
# handle OSError 121
except OSError as e:
if e.winerror == "121":
logging.warning("Semaphore Timeout has Expired.")
return {}
# create the command
cmd = {"command": command}
if parameters is not None:
cmd["parameter"] = parameters
# send the command
writer.write(json.dumps(cmd).encode("utf-8"))
await writer.drain()
# instantiate data
data = b""
# loop to receive all the data
try:
while True:
d = await reader.read(4096)
if not d:
break
data += d
except Exception as e:
logging.warning(f"{self.ip}: API Command Error: {e}")
data = self.load_api_data(data)
# close the connection
writer.close()
await writer.wait_closed()
# check for if the user wants to allow errors to return
if not ignore_errors:
# validate the command succeeded
validation = self.validate_command_output(data)
if not validation[0]:
if not x19_command:
logging.warning(f"{self.ip}: API Command Error: {validation[1]}")
raise APIError(validation[1])
return data
@staticmethod
def validate_command_output(data: dict) -> tuple:
"""Check if the returned command output is correctly formatted."""
# check if the data returned is correct or an error
# if status isn't a key, it is a multicommand
if "STATUS" not in data.keys():
for key in data.keys():
# make sure not to try to turn id into a dict
if not key == "id":
# make sure they succeeded
if "STATUS" in data[key][0].keys():
if data[key][0]["STATUS"][0]["STATUS"] not in ["S", "I"]:
# this is an error
return False, f"{key}: " + data[key][0]["STATUS"][0]["Msg"]
elif "id" not in data.keys():
if data["STATUS"] not in ["S", "I"]:
return False, data["Msg"]
else:
# make sure the command succeeded
if type(data["STATUS"]) == str:
if data["STATUS"] in ["RESTART"]:
return True, None
elif data["STATUS"][0]["STATUS"] not in ("S", "I"):
# this is an error
if data["STATUS"][0]["STATUS"] not in ("S", "I"):
return False, data["STATUS"][0]["Msg"]
return True, None
@staticmethod
def load_api_data(data: bytes) -> dict:
"""Convert API data from JSON to dict"""
str_data = None
try:
# some json from the API returns with a null byte (\x00) on the end
if data.endswith(b"\x00"):
# handle the null byte
str_data = data.decode("utf-8")[:-1]
else:
# no null byte
str_data = data.decode("utf-8")
# fix an error with a btminer return having an extra comma that breaks json.loads()
str_data = str_data.replace(",}", "}")
# fix an error with a btminer return having a newline that breaks json.loads()
str_data = str_data.replace("\n", "")
# fix an error with a bmminer return not having a specific comma that breaks json.loads()
str_data = str_data.replace("}{", "},{")
# fix an error with a bmminer return having a specific comma that breaks json.loads()
str_data = str_data.replace("[,{", "[{")
# fix an error with Avalonminers returning inf and nan
str_data = str_data.replace("inf", "0")
str_data = str_data.replace("nan", "0")
# fix whatever this garbage from avalonminers is `,"id":1}`
if str_data.startswith(","):
str_data = f"{{{str_data[1:]}"
# parse the json
parsed_data = json.loads(str_data)
# handle bad json
except json.decoder.JSONDecodeError as e:
raise APIError(f"Decode Error {e}: {str_data}")
return parsed_data

491
pyasic/API/bmminer.py Normal file
View File

@@ -0,0 +1,491 @@
from pyasic.API import BaseMinerAPI
class BMMinerAPI(BaseMinerAPI):
"""An abstraction of the BMMiner API.
Each method corresponds to an API command in BMMiner.
BMMiner API documentation:
https://github.com/jameshilliard/bmminer/blob/master/API-README
This class abstracts use of the BMMiner API, as well as the
methods for sending commands to it. The self.send_command()
function handles sending a command to the miner asynchronously, and
as such is the base for many of the functions in this class, which
rely on it to send the command for them.
:param ip: The IP of the miner to reference the API on.
:param port: The port to reference the API on. Default is 4028.
"""
def __init__(self, ip: str, port: int = 4028) -> None:
super().__init__(ip, port)
async def version(self) -> dict:
"""Get miner version info.
:return: Miner version information.
"""
return await self.send_command("version")
async def config(self) -> dict:
"""Get some basic configuration info.
:return: Some miner configuration information:
ASC Count <- the number of ASCs
PGA Count <- the number of PGAs
Pool Count <- the number of Pools
Strategy <- the current pool strategy
Log Interval <- the interval of logging
Device Code <- list of compiled device drivers
OS <- the current operating system
Failover-Only <- failover-only setting
Scan Time <- scan-time setting
Queue <- queue setting
Expiry <- expiry setting
"""
return await self.send_command("config")
async def summary(self) -> dict:
"""Get the status summary of the miner.
:return: The status summary of the miner.
"""
return await self.send_command("summary")
async def pools(self) -> dict:
"""Get pool information.
:return: Miner pool information.
"""
return await self.send_command("pools")
async def devs(self) -> dict:
"""Get data on each PGA/ASC with their details.
:return: Data on each PGA/ASC with their details.
"""
return await self.send_command("devs")
async def edevs(self, old: bool = False) -> dict:
"""Get data on each PGA/ASC with their details, ignoring
blacklisted and zombie devices.
:param old: Include zombie devices that became zombies less
than 'old' seconds ago
:return: Data on each PGA/ASC with their details.
"""
if old:
return await self.send_command("edevs", parameters=old)
else:
return await self.send_command("edevs")
async def pga(self, n: int) -> dict:
"""Get data from PGA n.
:param n: The PGA number to get data from.
:return: Data on the PGA n.
"""
return await self.send_command("pga", parameters=n)
async def pgacount(self) -> dict:
"""Get data fon all PGAs.
:return: Data on the PGAs connected.
"""
return await self.send_command("pgacount")
async def switchpool(self, n: int) -> dict:
"""Switch pools to pool n.
:param n: The pool to switch to.
:return: A confirmation of switching to pool n.
"""
return await self.send_command("switchpool", parameters=n)
async def enablepool(self, n: int) -> dict:
"""Enable pool n.
:param n: The pool to enable.
:return: A confirmation of enabling pool n.
"""
return await self.send_command("enablepool", parameters=n)
async def addpool(self, url: str, username: str, password: str) -> dict:
"""Add a pool to the miner.
:param url: The URL of the new pool to add.
:param username: The users username on the new pool.
:param password: The worker password on the new pool.
:return: A confirmation of adding the pool.
"""
return await self.send_command(
"addpool", parameters=f"{url},{username},{password}"
)
async def poolpriority(self, *n: int) -> dict:
"""Set pool priority.
:param n: Pools in order of priority.
:return: A confirmation of setting pool priority.
"""
pools = f"{','.join([str(item) for item in n])}"
return await self.send_command("poolpriority", parameters=pools)
async def poolquota(self, n: int, q: int) -> dict:
"""Set pool quota.
:param n: Pool number to set quota on.
:param q: Quota to set the pool to.
:return: A confirmation of setting pool quota.
"""
return await self.send_command("poolquota", parameters=f"{n},{q}")
async def disablepool(self, n: int) -> dict:
"""Disable a pool.
:param n: Pool to disable.
:return: A confirmation of diabling the pool.
"""
return await self.send_command("disablepool", parameters=n)
async def removepool(self, n: int) -> dict:
"""Remove a pool.
:param n: Pool to remove.
:return: A confirmation of removing the pool.
"""
return await self.send_command("removepool", parameters=n)
async def save(self, filename: str = None) -> dict:
"""Save the config.
:param filename: Filename to save the config as.
:return: A confirmation of saving the config.
"""
if filename:
return await self.send_command("save", parameters=filename)
else:
return await self.send_command("save")
async def quit(self) -> dict:
"""Quit BMMiner.
:return: A single "BYE" before BMMiner quits.
"""
return await self.send_command("quit")
async def notify(self) -> dict:
"""Notify the user of past errors.
:return: The last status and count of each devices problem(s).
"""
return await self.send_command("notify")
async def privileged(self) -> dict:
"""Check if you have privileged access.
:return: The STATUS section with an error if you have no
privileged access, or success if you have privileged access.
"""
return await self.send_command("privileged")
async def pgaenable(self, n: int) -> dict:
"""Enable PGA n.
:param n: The PGA to enable.
:return: A confirmation of enabling PGA n.
"""
return await self.send_command("pgaenable", parameters=n)
async def pgadisable(self, n: int) -> dict:
"""Disable PGA n.
:param n: The PGA to disable.
:return: A confirmation of disabling PGA n.
"""
return await self.send_command("pgadisable", parameters=n)
async def pgaidentify(self, n: int) -> dict:
"""Identify PGA n.
:param n: The PGA to identify.
:return: A confirmation of identifying PGA n.
"""
return await self.send_command("pgaidentify", parameters=n)
async def devdetails(self) -> dict:
"""Get data on all devices with their static details.
:return: Data on all devices with their static details.
"""
return await self.send_command("devdetails")
async def restart(self) -> dict:
"""Restart BMMiner using the API.
:return: A reply informing of the restart.
"""
return await self.send_command("restart")
async def stats(self) -> dict:
"""Get stats of each device/pool with more than 1 getwork.
:return: Stats of each device/pool with more than 1 getwork.
"""
return await self.send_command("stats")
async def estats(self, old: bool = False) -> dict:
"""Get stats of each device/pool with more than 1 getwork,
ignoring zombie devices.
:param old: Include zombie devices that became zombies less
than 'old' seconds ago.
:return: Stats of each device/pool with more than 1 getwork,
ignoring zombie devices.
"""
if old:
return await self.send_command("estats", parameters=old)
else:
return await self.send_command("estats")
async def check(self, command: str) -> dict:
"""Check if the command command exists in BMMiner.
:param command: The command to check.
:return: Information about a command:
Exists (Y/N) <- the command exists in this version
Access (Y/N) <- you have access to use the command
"""
return await self.send_command("check", parameters=command)
async def failover_only(self, failover: bool) -> dict:
"""Set failover-only.
:param failover: What to set failover-only to.
:return: Confirmation of setting failover-only.
"""
return await self.send_command("failover-only", parameters=failover)
async def coin(self) -> dict:
"""Get information on the current coin.
:return: Information about the current coin being mined:
Hash Method <- the hashing algorithm
Current Block Time <- blocktime as a float, 0 means none
Current Block Hash <- the hash of the current block, blank
means none
LP <- whether LP is in use on at least 1 pool
Network Difficulty: the current network difficulty
"""
return await self.send_command("coin")
async def debug(self, setting: str) -> dict:
"""Set a debug setting.
:param setting: Which setting to switch to. Options are:
Silent,
Quiet,
Verbose,
Debug,
RPCProto,
PerDevice,
WorkTime,
Normal.
:return: Data on which debug setting was enabled or disabled.
"""
return await self.send_command("debug", parameters=setting)
async def setconfig(self, name: str, n: int) -> dict:
"""Set config of name to value n.
:param name: The name of the config setting to set. Options are:
queue,
scantime,
expiry.
:param n: The value to set the 'name' setting to.
:return: The results of setting config of name to n.
"""
return await self.send_command("setconfig", parameters=f"{name},{n}")
async def usbstats(self) -> dict:
"""Get stats of all USB devices except ztex.
:return: The stats of all USB devices except ztex.
"""
return await self.send_command("usbstats")
async def pgaset(self, n: int, opt: str, val: int = None) -> dict:
"""Set PGA option opt to val on PGA n.
Options:
MMQ -
opt: clock
val: 160 - 230 (multiple of 2)
CMR -
opt: clock
val: 100 - 220
:param n: The PGA to set the options on.
:param opt: The option to set. Setting this to 'help'
returns a help message.
:param val: The value to set the option to.
:return: Confirmation of setting PGA n with opt[,val].
"""
if val:
return await self.send_command("pgaset", parameters=f"{n},{opt},{val}")
else:
return await self.send_command("pgaset", parameters=f"{n},{opt}")
async def zero(self, which: str, summary: bool) -> dict:
"""Zero a device.
:param which: Which device to zero.
Setting this to 'all' zeros all devices.
Setting this to 'bestshare' zeros only the bestshare values
for each pool and global.
:param summary: Whether or not to show a full summary.
:return: the STATUS section with info on the zero and optional
summary.
"""
return await self.send_command("zero", parameters=f"{which},{summary}")
async def hotplug(self, n: int) -> dict:
"""Enable hotplug.
:param n: The device number to set hotplug on.
:return: Information on hotplug status.
"""
return await self.send_command("hotplug", parameters=n)
async def asc(self, n: int) -> dict:
"""Get data for ASC device n.
:param n: The device to get data for.
:return: The data for ASC device n.
"""
return await self.send_command("asc", parameters=n)
async def ascenable(self, n: int) -> dict:
"""Enable ASC device n.
:param n: The device to enable.
:return: Confirmation of enabling ASC device n.
"""
return await self.send_command("ascenable", parameters=n)
async def ascdisable(self, n: int) -> dict:
"""Disable ASC device n.
:param n: The device to disable.
:return: Confirmation of disabling ASC device n.
"""
return await self.send_command("ascdisable", parameters=n)
async def ascidentify(self, n: int) -> dict:
"""Identify ASC device n.
:param n: The device to identify.
:return: Confirmation of identifying ASC device n.
"""
return await self.send_command("ascidentify", parameters=n)
async def asccount(self) -> dict:
"""Get data on the number of ASC devices and their info.
:return: Data on all ASC devices.
"""
return await self.send_command("asccount")
async def ascset(self, n: int, opt: str, val: int = None) -> dict:
"""Set ASC n option opt to value val.
Sets an option on the ASC n to a value. Allowed options are:
AVA+BTB -
opt: freq
val: 256 - 1024 (chip frequency)
BTB -
opt: millivolts
val: 1000 - 1400 (core voltage)
MBA -
opt: reset
val: 0 - # of chips (reset a chip)
opt: freq
val: 0 - # of chips, 100 - 1400 (chip frequency)
opt: ledcount
val: 0 - 100 (chip count for LED)
opt: ledlimit
val: 0 - 200 (LED off below GH/s)
opt: spidelay
val: 0 - 9999 (SPI per I/O delay)
opt: spireset
val: i or s, 0 - 9999 (SPI regular reset)
opt: spisleep
val: 0 - 9999 (SPI reset sleep in ms)
BMA -
opt: volt
val: 0 - 9
opt: clock
val: 0 - 15
:param n: The ASC to set the options on.
:param opt: The option to set. Setting this to 'help' returns a
help message.
:param val: The value to set the option to.
:return: Confirmation of setting option opt to value val.
"""
if val:
return await self.send_command("ascset", parameters=f"{n},{opt},{val}")
else:
return await self.send_command("ascset", parameters=f"{n},{opt}")
async def lcd(self) -> dict:
"""Get a general all-in-one status summary of the miner.
:return: An all-in-one status summary of the miner.
"""
return await self.send_command("lcd")
async def lockstats(self) -> dict:
"""Write lockstats to STDERR.
:return: The result of writing the lock stats to STDERR.
"""
return await self.send_command("lockstats")

208
pyasic/API/bosminer.py Normal file
View File

@@ -0,0 +1,208 @@
from pyasic.API import BaseMinerAPI
class BOSMinerAPI(BaseMinerAPI):
"""An abstraction of the BOSMiner API.
Each method corresponds to an API command in BOSMiner.
BOSMiner API documentation:
https://docs.braiins.com/os/plus-en/Development/1_api.html
This class abstracts use of the BOSMiner API, as well as the
methods for sending commands to it. The self.send_command()
function handles sending a command to the miner asynchronously, and
as such is the base for many of the functions in this class, which
rely on it to send the command for them.
:param ip: The IP of the miner to reference the API on.
:param port: The port to reference the API on. Default is 4028.
"""
def __init__(self, ip, port=4028):
super().__init__(ip, port)
async def asccount(self) -> dict:
"""Get data on the number of ASC devices and their info.
:return: Data on all ASC devices.
"""
return await self.send_command("asccount")
async def asc(self, n: int) -> dict:
"""Get data for ASC device n.
:param n: The device to get data for.
:return: The data for ASC device n.
"""
return await self.send_command("asc", parameters=n)
async def devdetails(self) -> dict:
"""Get data on all devices with their static details.
:return: Data on all devices with their static details.
"""
return await self.send_command("devdetails")
async def devs(self) -> dict:
"""Get data on each PGA/ASC with their details.
:return: Data on each PGA/ASC with their details.
"""
return await self.send_command("devs")
async def edevs(self, old: bool = False) -> dict:
"""Get data on each PGA/ASC with their details, ignoring
blacklisted and zombie devices.
:param old: Include zombie devices that became zombies less
than 'old' seconds ago
:return: Data on each PGA/ASC with their details.
"""
if old:
return await self.send_command("edevs", parameters="old")
else:
return await self.send_command("edevs")
async def pools(self) -> dict:
"""Get pool information.
:return: Miner pool information.
"""
return await self.send_command("pools")
async def summary(self) -> dict:
"""Get the status summary of the miner.
:return: The status summary of the miner.
"""
return await self.send_command("summary")
async def stats(self) -> dict:
"""Get stats of each device/pool with more than 1 getwork.
:return: Stats of each device/pool with more than 1 getwork.
"""
return await self.send_command("stats")
async def version(self) -> dict:
"""Get miner version info.
:return: Miner version information.
"""
return await self.send_command("version")
async def estats(self, old: bool = False) -> dict:
"""Get stats of each device/pool with more than 1 getwork,
ignoring zombie devices.
:param old: Include zombie devices that became zombies less
than 'old' seconds ago.
:return: Stats of each device/pool with more than 1 getwork,
ignoring zombie devices.
"""
if old:
return await self.send_command("estats", parameters=old)
else:
return await self.send_command("estats")
async def check(self, command: str) -> dict:
"""Check if the command command exists in BOSMiner.
:param command: The command to check.
:return: Information about a command:
Exists (Y/N) <- the command exists in this version
Access (Y/N) <- you have access to use the command
"""
return await self.send_command("check", parameters=command)
async def coin(self) -> dict:
"""Get information on the current coin.
:return: Information about the current coin being mined:
Hash Method <- the hashing algorithm
Current Block Time <- blocktime as a float, 0 means none
Current Block Hash <- the hash of the current block, blank
means none
LP <- whether LP is in use on at least 1 pool
Network Difficulty: the current network difficulty
"""
return await self.send_command("coin")
async def lcd(self) -> dict:
"""Get a general all-in-one status summary of the miner.
:return: An all-in-one status summary of the miner.
"""
return await self.send_command("lcd")
async def switchpool(self, n: int) -> dict:
# BOS has not implemented this yet, they will in the future
raise NotImplementedError
# return await self.send_command("switchpool", parameters=n)
async def enablepool(self, n: int) -> dict:
# BOS has not implemented this yet, they will in the future
raise NotImplementedError
# return await self.send_command("enablepool", parameters=n)
async def disablepool(self, n: int) -> dict:
# BOS has not implemented this yet, they will in the future
raise NotImplementedError
# return await self.send_command("disablepool", parameters=n)
async def addpool(self, url: str, username: str, password: str) -> dict:
# BOS has not implemented this yet, they will in the future
raise NotImplementedError
# return await self.send_command("addpool", parameters=f"{url},{username},{password}")
async def removepool(self, n: int) -> dict:
# BOS has not implemented this yet, they will in the future
raise NotImplementedError
# return await self.send_command("removepool", parameters=n)
async def fans(self) -> dict:
"""Get fan data.
:return: Data on the fans of the miner.
"""
return await self.send_command("fans")
async def tempctrl(self) -> dict:
"""Get temperature control data.
:return: Data about the temp control settings of the miner.
"""
return await self.send_command("tempctrl")
async def temps(self) -> dict:
"""Get temperature data.
:return: Data on the temps of the miner.
"""
return await self.send_command("temps")
async def tunerstatus(self) -> dict:
"""Get tuner status data
:return: Data on the status of autotuning.
"""
return await self.send_command("tunerstatus")
async def pause(self) -> dict:
"""Pause mining.
:return: Confirmation of pausing mining.
"""
return await self.send_command("pause")
async def resume(self) -> dict:
"""Resume mining.
:return: Confirmation of resuming mining.
"""
return await self.send_command("resume")

705
pyasic/API/btminer.py Normal file
View File

@@ -0,0 +1,705 @@
import asyncio
import re
import json
import hashlib
import binascii
import base64
import logging
from passlib.handlers.md5_crypt import md5_crypt
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from pyasic.API import BaseMinerAPI, APIError
from pyasic.settings import WHATSMINER_PWD
### IMPORTANT ###
# you need to change the password of the miners using the Whatsminer
# tool, then you can set them back to admin with this tool, but they
# must be changed to something else and set back to admin with this
# or the privileged API will not work using admin as the password. If
# you change the password, you can pass that to the this class as pwd,
# or add it as the Whatsminer_pwd in the settings.toml file.
def _crypt(word: str, salt: str) -> str:
"""Encrypts a word with a salt, using a standard salt format.
Encrypts a word using a salt with the format
'\s*\$(\d+)\$([\w\./]*)\$'. If this format is not used, a
ValueError is raised.
:param word: The word to be encrypted.
:param salt: The salt to encrypt the word.
:return: An MD5 hash of the word with the salt.
"""
# compile a standard format for the salt
standard_salt = re.compile("\s*\$(\d+)\$([\w\./]*)\$")
# check if the salt matches
match = standard_salt.match(salt)
# if the matching fails, the salt is incorrect
if not match:
raise ValueError("Salt format is not correct.")
# save the matched salt in a new variable
new_salt = match.group(2)
# encrypt the word with the salt using md5
result = md5_crypt.hash(word, salt=new_salt)
return result
def _add_to_16(string: str) -> bytes:
"""Add null bytes to a string until the length is a multiple 16
:param string: The string to lengthen to a multiple of 16 and
encode.
:return: The input string as bytes with a multiple of 16 as the
length.
"""
while len(string) % 16 != 0:
string += "\0"
return str.encode(string) # return bytes
def parse_btminer_priviledge_data(token_data: dict, data: dict):
"""Parses data returned from the BTMiner privileged API.
Parses data from the BTMiner privileged API using the the token
from the API in an AES format.
:param token_data: The token information from self.get_token().
:param data: The data to parse, returned from the API.
:return: A decoded dict version of the privileged command output.
"""
# get the encoded data from the dict
enc_data = data["enc"]
# get the aes key from the token data
aeskey = hashlib.sha256(token_data["host_passwd_md5"].encode()).hexdigest()
# unhexlify the aes key
aeskey = binascii.unhexlify(aeskey.encode())
# create the required decryptor
aes = Cipher(algorithms.AES(aeskey), modes.ECB())
decryptor = aes.decryptor()
# decode the message with the decryptor
ret_msg = json.loads(
decryptor.update(base64.decodebytes(bytes(enc_data, encoding="utf8")))
.rstrip(b"\0")
.decode("utf8")
)
return ret_msg
def create_privileged_cmd(token_data: dict, command: dict) -> bytes:
"""Create a privileged command to send to the BTMiner API.
Creates a privileged command using the token from the API and the
command as a dict of {'command': cmd}, with cmd being any command
that the miner API accepts.
:param token_data: The token information from self.get_token().
:param command: The command to turn into a privileged command.
:return: The encrypted privileged command to be sent to the miner.
"""
# add token to command
command["token"] = token_data["host_sign"]
# encode host_passwd data and get hexdigest
aeskey = hashlib.sha256(token_data["host_passwd_md5"].encode()).hexdigest()
# unhexlify the encoded host_passwd
aeskey = binascii.unhexlify(aeskey.encode())
# create a new AES key
aes = Cipher(algorithms.AES(aeskey), modes.ECB())
encryptor = aes.encryptor()
# dump the command to json
api_json_str = json.dumps(command)
# encode the json command with the aes key
api_json_str_enc = (
base64.encodebytes(encryptor.update(_add_to_16(api_json_str)))
.decode("utf-8")
.replace("\n", "")
)
# label the data as being encoded
data_enc = {"enc": 1, "data": api_json_str_enc}
# dump the labeled data to json
api_packet_str = json.dumps(data_enc)
return api_packet_str.encode("utf-8")
class BTMinerAPI(BaseMinerAPI):
"""An abstraction of the API for MicroBT Whatsminers, BTMiner.
Each method corresponds to an API command in BMMiner.
This class abstracts use of the BTMiner API, as well as the
methods for sending commands to it. The self.send_command()
function handles sending a command to the miner asynchronously, and
as such is the base for many of the functions in this class, which
rely on it to send the command for them.
All privileged commands for BTMiner's API require that you change
the password of the miners using the Whatsminer tool, and it can be
changed back to admin with this tool after. Set the new password
either by passing it to the __init__ method, or changing it in
settings.toml.
Additionally, the API commands for the privileged API must be
encoded using a token from the miner, all privileged commands do
this automatically for you and will decode the output to look like
a normal output from a miner API.
:param ip: The IP of the miner to reference the API on.
:param port: The port to reference the API on. Default is 4028.
:param pwd: The admin password of the miner. Default is admin.
"""
def __init__(self, ip, port=4028, pwd: str = WHATSMINER_PWD):
super().__init__(ip, port)
self.admin_pwd = pwd
self.current_token = None
async def send_command(
self,
command: str or bytes,
parameters: str or int or bool = None,
ignore_errors: bool = False,
**kwargs,
) -> dict:
"""Send a command to the miner API.
Send a command using an asynchronous connection, load the data,
parse encoded data if needed, and return the result.
:param command: The command to send to the miner.
:param parameters: Parameters to pass to the command.
:param ignore_errors: Ignore the E (Error) status code from the
API.
:return: The data received from the API after sending the
command.
"""
# check if command is a string
# if its bytes its encoded and needs to be sent raw
if isinstance(command, str):
# if it is a string, put it into the standard command format
command = json.dumps({"command": command}).encode("utf-8")
try:
# get reader and writer streams
reader, writer = await asyncio.open_connection(str(self.ip), self.port)
# handle OSError 121
except OSError as e:
if e.winerror == "121":
print("Semaphore Timeout has Expired.")
return {}
# send the command
writer.write(command)
await writer.drain()
# instantiate data
data = b""
# loop to receive all the data
try:
while True:
d = await reader.read(4096)
if not d:
break
data += d
except Exception as e:
logging.info(f"{str(self.ip)}: {e}")
data = self.load_api_data(data)
# close the connection
writer.close()
await writer.wait_closed()
# check if the returned data is encoded
if "enc" in data.keys():
# try to parse the encoded data
try:
data = parse_btminer_priviledge_data(self.current_token, data)
except Exception as e:
logging.info(f"{str(self.ip)}: {e}")
if not ignore_errors:
# if it fails to validate, it is likely an error
validation = self.validate_command_output(data)
if not validation[0]:
raise APIError(validation[1])
# return the parsed json as a dict
return data
async def get_token(self):
"""Gets token information from the API.
:return: An encoded token and md5 password, which are used
for the privileged API.
"""
# get the token
data = await self.send_command("get_token")
# encrypt the admin password with the salt
pwd = _crypt(self.admin_pwd, "$1$" + data["Msg"]["salt"] + "$")
pwd = pwd.split("$")
# take the 4th item from the pwd split
host_passwd_md5 = pwd[3]
# encrypt the pwd with the time and new salt
tmp = _crypt(pwd[3] + data["Msg"]["time"], "$1$" + data["Msg"]["newsalt"] + "$")
tmp = tmp.split("$")
# take the 4th item from the encrypted pwd split
host_sign = tmp[3]
# set the current token
self.current_token = {
"host_sign": host_sign,
"host_passwd_md5": host_passwd_md5,
}
return self.current_token
#### PRIVILEGED COMMANDS ####
# Please read the top of this file to learn
# how to configure the Whatsminer API to
# use these commands.
async def update_pools(
self,
pool_1: str,
worker_1: str,
passwd_1: str,
pool_2: str = None,
worker_2: str = None,
passwd_2: str = None,
pool_3: str = None,
worker_3: str = None,
passwd_3: str = None,
):
"""Update the pools of the miner using the API.
Update the pools of the miner using the API, only works after
changing the password of the miner using the Whatsminer tool.
:param pool_1: The URL to update pool 1 to.
:param worker_1: The worker name for pool 1 to update to.
:param passwd_1: The password for pool 1 to update to.
:param pool_2: The URL to update pool 2 to.
:param worker_2: The worker name for pool 2 to update to.
:param passwd_2: The password for pool 2 to update to.
:param pool_3: The URL to update pool 3 to.
:param worker_3: The worker name for pool 3 to update to.
:param passwd_3: The password for pool 3 to update to.
:return: A dict from the API to confirm the pools were updated.
"""
# get the token and password from the miner
token_data = await self.get_token()
# parse pool data
if not pool_1:
raise APIError("No pools set.")
elif pool_2 and pool_3:
command = {
"cmd": "update_pools",
"pool1": pool_1,
"worker1": worker_1,
"passwd1": passwd_1,
"pool2": pool_2,
"worker2": worker_2,
"passwd2": passwd_2,
"pool3": pool_3,
"worker3": worker_3,
"passwd3": passwd_3,
}
elif pool_2:
command = {
"cmd": "update_pools",
"pool1": pool_1,
"worker1": worker_1,
"passwd1": passwd_1,
"pool2": pool_2,
"worker2": worker_2,
"passwd2": passwd_2,
}
else:
command = {
"cmd": "update_pools",
"pool1": pool_1,
"worker1": worker_1,
"passwd1": passwd_1,
}
# encode the command with the token data
enc_command = create_privileged_cmd(token_data, command)
# send the command
return await self.send_command(enc_command)
async def restart(self):
"""Restart BTMiner using the API.
Restart BTMiner using the API, only works after changing
the password of the miner using the Whatsminer tool.
:return: A reply informing of the restart.
"""
command = {"cmd": "restart_btminer"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def power_off(self, respbefore: bool = True):
"""Power off the miner using the API.
Power off the miner using the API, only works after changing
the password of the miner using the Whatsminer tool.
:param respbefore: Whether to respond before powering off.
:return: A reply informing of the status of powering off.
"""
if respbefore:
command = {"cmd": "power_off", "respbefore": "true"}
else:
command = {"cmd": "power_off", "respbefore": "false"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def power_on(self):
"""Power on the miner using the API.
Power on the miner using the API, only works after changing
the password of the miner using the Whatsminer tool.
:return: A reply informing of the status of powering on.
"""
command = {"cmd": "power_on"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def reset_led(self):
"""Reset the LED on the miner using the API.
Reset the LED on the miner using the API, only works after
changing the password of the miner using the Whatsminer tool.
:return: A reply informing of the status of resetting the LED.
"""
command = {"cmd": "set_led", "param": "auto"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def set_led(
self,
color: str = "red",
period: int = 2000,
duration: int = 1000,
start: int = 0,
):
"""Set the LED on the miner using the API.
Set the LED on the miner using the API, only works after
changing the password of the miner using the Whatsminer tool.
:param color: The LED color to set, either 'red' or 'green'.
:param period: The flash cycle in ms.
:param duration: LED on time in the cycle in ms.
:param start: LED on time offset in the cycle in ms.
:return: A reply informing of the status of setting the LED.
"""
command = {
"cmd": "set_led",
"color": color,
"period": period,
"duration": duration,
"start": start,
}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def set_low_power(self):
"""Set low power mode on the miner using the API.
Set low power mode on the miner using the API, only works after
changing the password of the miner using the Whatsminer tool.
:return: A reply informing of the status of setting low power
mode.
"""
command = {"cmd": "set_low_power"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def update_firmware(self): # noqa - static
# to be determined if this will be added later
# requires a file stream in bytes
return NotImplementedError
async def reboot(self):
"""Reboot the miner using the API.
:return: A reply informing of the status of the reboot.
"""
command = {"cmd": "reboot"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def factory_reset(self):
"""Reset the miner to factory defaults.
:return: A reply informing of the status of the reset.
"""
command = {"cmd": "factory_reset"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def update_pwd(self, old_pwd: str, new_pwd: str):
"""Update the admin user's password.
Update the admin user's password, only works after changing the
password of the miner using the Whatsminer tool. New password
has a max length of 8 bytes, using letters, numbers, and
underscores.
:param old_pwd: The old admin password.
:param new_pwd: The new password to set.
:return: A reply informing of the status of setting the
password.
"""
# check if password length is greater than 8 bytes
if len(new_pwd.encode("utf-8")) > 8:
return APIError(
f"New password too long, the max length is 8. "
f"Password size: {len(new_pwd.encode('utf-8'))}"
)
command = {"cmd": "update_pwd", "old": old_pwd, "new": new_pwd}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def set_target_freq(self, percent: int):
"""Update the target frequency.
Update the target frequency, only works after changing the
password of the miner using the Whatsminer tool. The new
frequency must be between -10% and 100%.
:param percent: The frequency % to set.
:return: A reply informing of the status of setting the
frequency.
"""
if not -10 < percent < 100:
return APIError(
f"Frequency % is outside of the allowed "
f"range. Please set a % between -10 and "
f"100"
)
command = {"cmd": "set_target_freq", "percent": str(percent)}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def enable_fast_boot(self):
"""Turn on fast boot.
Turn on fast boot, only works after changing the password of
the miner using the Whatsminer tool.
:return: A reply informing of the status of enabling fast boot.
"""
command = {"cmd": "enable_btminer_fast_boot"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def disable_fast_boot(self):
"""Turn off fast boot.
Turn off fast boot, only works after changing the password of
the miner using the Whatsminer tool.
:return: A reply informing of the status of disabling fast boot.
"""
command = {"cmd": "disable_btminer_fast_boot"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def enable_web_pools(self):
"""Turn on web pool updates.
Turn on web pool updates, only works after changing the
password of the miner using the Whatsminer tool.
:return: A reply informing of the status of enabling web pools.
"""
command = {"cmd": "enable_web_pools"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def disable_web_pools(self):
"""Turn off web pool updates.
Turn off web pool updates, only works after changing the
password of the miner using the Whatsminer tool.
:return: A reply informing of the status of disabling web
pools.
"""
command = {"cmd": "disable_web_pools"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def set_hostname(self, hostname: str):
"""Set the hostname of the miner.
Set the hostname of the miner, only works after changing the
password of the miner using the Whatsminer tool.
:param hostname: The new hostname to use.
:return: A reply informing of the status of setting the
hostname.
"""
command = {"cmd": "set_hostname", "hostname": hostname}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def set_power_pct(self, percent: int):
"""Set the power percentage of the miner.
Set the power percentage of the miner, only works after changing
the password of the miner using the Whatsminer tool.
:param percent: The power percentage to set.
:return: A reply informing of the status of setting the
power percentage.
"""
if not 0 < percent < 100:
return APIError(
f"Power PCT % is outside of the allowed "
f"range. Please set a % between 0 and "
f"100"
)
command = {"cmd": "set_power_pct", "percent": str(percent)}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def pre_power_on(self, complete: bool, msg: str):
"""Configure or check status of pre power on.
Configure or check status of pre power on, only works after
changing the password of the miner using the Whatsminer tool.
:param complete: check whether pre power on is complete.
:param msg: the message to check.
"wait for adjust temp" or
"adjust complete" or
"adjust continue"
:return: A reply informing of the status of pre power on.
"""
if not msg == "wait for adjust temp" or "adjust complete" or "adjust continue":
return APIError(
"Message is incorrect, please choose one of "
'["wait for adjust temp", '
'"adjust complete", '
'"adjust continue"]'
)
if complete:
complete = "true"
else:
complete = "false"
command = {"cmd": "pre_power_on", "complete": complete, "msg": msg}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
#### END privileged COMMANDS ####
async def summary(self):
"""Get the summary status from the miner.
:return: Summary status of the miner.
"""
return await self.send_command("summary")
async def pools(self):
"""Get the pool status from the miner.
:return: Pool status of the miner.
"""
return await self.send_command("pools")
async def devs(self):
"""Get data on each PGA/ASC with their details.
:return: Data on each PGA/ASC with their details.
"""
return await self.send_command("devs")
async def edevs(self):
"""Get data on each PGA/ASC with their details, ignoring
blacklisted and zombie devices.
:return: Data on each PGA/ASC with their details.
"""
return await self.send_command("edevs")
async def devdetails(self):
"""Get data on all devices with their static details.
:return: Data on all devices with their static details.
"""
return await self.send_command("devdetails")
async def get_psu(self):
"""Get data on the PSU and power information.
:return: Data on the PSU and power information.
"""
return await self.send_command("get_psu")
async def version(self):
"""Get version data for the miner.
Get version data for the miner. This calls another function,
self.get_version(), but is named version to stay consistent
with the other miner APIs.
:return: Version data for the miner.
"""
return await self.get_version()
async def get_version(self):
"""Get version data for the miner.
:return: Version data for the miner.
"""
return await self.send_command("get_version")
async def status(self):
"""Get BTMiner status and firmware version.
:return: BTMiner status and firmware version.
"""
return await self.send_command("status")
async def get_miner_info(self):
"""Get general miner info.
:return: General miner info.
"""
return await self.send_command("get_miner_info")

487
pyasic/API/cgminer.py Normal file
View File

@@ -0,0 +1,487 @@
from pyasic.API import BaseMinerAPI
class CGMinerAPI(BaseMinerAPI):
"""An abstraction of the CGMiner API.
Each method corresponds to an API command in GGMiner.
CGMiner API documentation:
https://github.com/ckolivas/cgminer/blob/master/API-README
This class abstracts use of the CGMiner API, as well as the
methods for sending commands to it. The self.send_command()
function handles sending a command to the miner asynchronously, and
as such is the base for many of the functions in this class, which
rely on it to send the command for them.
:param ip: The IP of the miner to reference the API on.
:param port: The port to reference the API on. Default is 4028.
"""
def __init__(self, ip, port=4028):
super().__init__(ip, port)
async def version(self) -> dict:
"""Get miner version info.
:return: Miner version information.
"""
return await self.send_command("version")
async def config(self) -> dict:
"""Get some basic configuration info.
:return: Some miner configuration information:
ASC Count <- the number of ASCs
PGA Count <- the number of PGAs
Pool Count <- the number of Pools
Strategy <- the current pool strategy
Log Interval <- the interval of logging
Device Code <- list of compiled device drivers
OS <- the current operating system
"""
return await self.send_command("config")
async def summary(self) -> dict:
"""Get the status summary of the miner.
:return: The status summary of the miner.
"""
return await self.send_command("summary")
async def pools(self) -> dict:
"""Get pool information.
:return: Miner pool information.
"""
return await self.send_command("pools")
async def devs(self) -> dict:
"""Get data on each PGA/ASC with their details.
:return: Data on each PGA/ASC with their details.
"""
return await self.send_command("devs")
async def edevs(self, old: bool = False) -> dict:
"""Get data on each PGA/ASC with their details, ignoring
blacklisted and zombie devices.
:param old: Include zombie devices that became zombies less
than 'old' seconds ago
:return: Data on each PGA/ASC with their details.
"""
if old:
return await self.send_command("edevs", parameters="old")
else:
return await self.send_command("edevs")
async def pga(self, n: int) -> dict:
"""Get data from PGA n.
:param n: The PGA number to get data from.
:return: Data on the PGA n.
"""
return await self.send_command("pga", parameters=n)
async def pgacount(self) -> dict:
"""Get data fon all PGAs.
:return: Data on the PGAs connected.
"""
return await self.send_command("pgacount")
async def switchpool(self, n: int) -> dict:
"""Switch pools to pool n.
:param n: The pool to switch to.
:return: A confirmation of switching to pool n.
"""
return await self.send_command("switchpool", parameters=n)
async def enablepool(self, n: int) -> dict:
"""Enable pool n.
:param n: The pool to enable.
:return: A confirmation of enabling pool n.
"""
return await self.send_command("enablepool", parameters=n)
async def addpool(self, url: str, username: str, password: str) -> dict:
"""Add a pool to the miner.
:param url: The URL of the new pool to add.
:param username: The users username on the new pool.
:param password: The worker password on the new pool.
:return: A confirmation of adding the pool.
"""
return await self.send_command(
"addpool", parameters=f"{url},{username},{password}"
)
async def poolpriority(self, *n: int) -> dict:
"""Set pool priority.
:param n: Pools in order of priority.
:return: A confirmation of setting pool priority.
"""
pools = f"{','.join([str(item) for item in n])}"
return await self.send_command("poolpriority", parameters=pools)
async def poolquota(self, n: int, q: int) -> dict:
"""Set pool quota.
:param n: Pool number to set quota on.
:param q: Quota to set the pool to.
:return: A confirmation of setting pool quota.
"""
return await self.send_command("poolquota", parameters=f"{n},{q}")
async def disablepool(self, n: int) -> dict:
"""Disable a pool.
:param n: Pool to disable.
:return: A confirmation of diabling the pool.
"""
return await self.send_command("disablepool", parameters=n)
async def removepool(self, n: int) -> dict:
"""Remove a pool.
:param n: Pool to remove.
:return: A confirmation of removing the pool.
"""
return await self.send_command("removepool", parameters=n)
async def save(self, filename: str = None) -> dict:
"""Save the config.
:param filename: Filename to save the config as.
:return: A confirmation of saving the config.
"""
if filename:
return await self.send_command("save", parameters=filename)
else:
return await self.send_command("save")
async def quit(self) -> dict:
"""Quit CGMiner.
:return: A single "BYE" before CGMiner quits.
"""
return await self.send_command("quit")
async def notify(self) -> dict:
"""Notify the user of past errors.
:return: The last status and count of each devices problem(s).
"""
return await self.send_command("notify")
async def privileged(self) -> dict:
"""Check if you have privileged access.
:return: The STATUS section with an error if you have no
privileged access, or success if you have privileged access.
"""
return await self.send_command("privileged")
async def pgaenable(self, n: int) -> dict:
"""Enable PGA n.
:param n: The PGA to enable.
:return: A confirmation of enabling PGA n.
"""
return await self.send_command("pgaenable", parameters=n)
async def pgadisable(self, n: int) -> dict:
"""Disable PGA n.
:param n: The PGA to disable.
:return: A confirmation of disabling PGA n.
"""
return await self.send_command("pgadisable", parameters=n)
async def pgaidentify(self, n: int) -> dict:
"""Identify PGA n.
:param n: The PGA to identify.
:return: A confirmation of identifying PGA n.
"""
return await self.send_command("pgaidentify", parameters=n)
async def devdetails(self) -> dict:
"""Get data on all devices with their static details.
:return: Data on all devices with their static details.
"""
return await self.send_command("devdetails")
async def restart(self) -> dict:
"""Restart CGMiner using the API.
:return: A reply informing of the restart.
"""
return await self.send_command("restart")
async def stats(self) -> dict:
"""Get stats of each device/pool with more than 1 getwork.
:return: Stats of each device/pool with more than 1 getwork.
"""
return await self.send_command("stats")
async def estats(self, old: bool = False) -> dict:
"""Get stats of each device/pool with more than 1 getwork,
ignoring zombie devices.
:param old: Include zombie devices that became zombies less
than 'old' seconds ago.
:return: Stats of each device/pool with more than 1 getwork,
ignoring zombie devices.
"""
if old:
return await self.send_command("estats", parameters=old)
else:
return await self.send_command("estats")
async def check(self, command: str) -> dict:
"""Check if the command command exists in CGMiner.
:param command: The command to check.
:return: Information about a command:
Exists (Y/N) <- the command exists in this version
Access (Y/N) <- you have access to use the command
"""
return await self.send_command("check", parameters=command)
async def failover_only(self, failover: bool) -> dict:
"""Set failover-only.
:param failover: What to set failover-only to.
:return: Confirmation of setting failover-only.
"""
return await self.send_command("failover-only", parameters=failover)
async def coin(self) -> dict:
"""Get information on the current coin.
:return: Information about the current coin being mined:
Hash Method <- the hashing algorithm
Current Block Time <- blocktime as a float, 0 means none
Current Block Hash <- the hash of the current block, blank
means none
LP <- whether LP is in use on at least 1 pool
Network Difficulty: the current network difficulty
"""
return await self.send_command("coin")
async def debug(self, setting: str) -> dict:
"""Set a debug setting.
:param setting: Which setting to switch to. Options are:
Silent,
Quiet,
Verbose,
Debug,
RPCProto,
PerDevice,
WorkTime,
Normal.
:return: Data on which debug setting was enabled or disabled.
"""
return await self.send_command("debug", parameters=setting)
async def setconfig(self, name: str, n: int) -> dict:
"""Set config of name to value n.
:param name: The name of the config setting to set. Options are:
queue,
scantime,
expiry.
:param n: The value to set the 'name' setting to.
:return: The results of setting config of name to n.
"""
return await self.send_command("setconfig", parameters=f"{name},{n}")
async def usbstats(self) -> dict:
"""Get stats of all USB devices except ztex.
:return: The stats of all USB devices except ztex.
"""
return await self.send_command("usbstats")
async def pgaset(self, n: int, opt: str, val: int = None) -> dict:
"""Set PGA option opt to val on PGA n.
Options:
MMQ -
opt: clock
val: 160 - 230 (multiple of 2)
CMR -
opt: clock
val: 100 - 220
:param n: The PGA to set the options on.
:param opt: The option to set. Setting this to 'help'
returns a help message.
:param val: The value to set the option to.
:return: Confirmation of setting PGA n with opt[,val].
"""
if val:
return await self.send_command("pgaset", parameters=f"{n},{opt},{val}")
else:
return await self.send_command("pgaset", parameters=f"{n},{opt}")
async def zero(self, which: str, summary: bool) -> dict:
"""Zero a device.
:param which: Which device to zero.
Setting this to 'all' zeros all devices.
Setting this to 'bestshare' zeros only the bestshare values
for each pool and global.
:param summary: Whether or not to show a full summary.
:return: the STATUS section with info on the zero and optional
summary.
"""
return await self.send_command("zero", parameters=f"{which},{summary}")
async def hotplug(self, n: int) -> dict:
"""Enable hotplug.
:param n: The device number to set hotplug on.
:return: Information on hotplug status.
"""
return await self.send_command("hotplug", parameters=n)
async def asc(self, n: int) -> dict:
"""Get data for ASC device n.
:param n: The device to get data for.
:return: The data for ASC device n.
"""
return await self.send_command("asc", parameters=n)
async def ascenable(self, n: int) -> dict:
"""Enable ASC device n.
:param n: The device to enable.
:return: Confirmation of enabling ASC device n.
"""
return await self.send_command("ascenable", parameters=n)
async def ascdisable(self, n: int) -> dict:
"""Disable ASC device n.
:param n: The device to disable.
:return: Confirmation of disabling ASC device n.
"""
return await self.send_command("ascdisable", parameters=n)
async def ascidentify(self, n: int) -> dict:
"""Identify ASC device n.
:param n: The device to identify.
:return: Confirmation of identifying ASC device n.
"""
return await self.send_command("ascidentify", parameters=n)
async def asccount(self) -> dict:
"""Get data on the number of ASC devices and their info.
:return: Data on all ASC devices.
"""
return await self.send_command("asccount")
async def ascset(self, n: int, opt: str, val: int = None) -> dict:
"""Set ASC n option opt to value val.
Sets an option on the ASC n to a value. Allowed options are:
AVA+BTB -
opt: freq
val: 256 - 1024 (chip frequency)
BTB -
opt: millivolts
val: 1000 - 1400 (core voltage)
MBA -
opt: reset
val: 0 - # of chips (reset a chip)
opt: freq
val: 0 - # of chips, 100 - 1400 (chip frequency)
opt: ledcount
val: 0 - 100 (chip count for LED)
opt: ledlimit
val: 0 - 200 (LED off below GH/s)
opt: spidelay
val: 0 - 9999 (SPI per I/O delay)
opt: spireset
val: i or s, 0 - 9999 (SPI regular reset)
opt: spisleep
val: 0 - 9999 (SPI reset sleep in ms)
BMA -
opt: volt
val: 0 - 9
opt: clock
val: 0 - 15
:param n: The ASC to set the options on.
:param opt: The option to set. Setting this to 'help' returns a
help message.
:param val: The value to set the option to.
:return: Confirmation of setting option opt to value val.
"""
if val:
return await self.send_command("ascset", parameters=f"{n},{opt},{val}")
else:
return await self.send_command("ascset", parameters=f"{n},{opt}")
async def lcd(self) -> dict:
"""Get a general all-in-one status summary of the miner.
:return: An all-in-one status summary of the miner.
"""
return await self.send_command("lcd")
async def lockstats(self) -> dict:
"""Write lockstats to STDERR.
:return: The result of writing the lock stats to STDERR.
"""
return await self.send_command("lockstats")

80
pyasic/API/unknown.py Normal file
View File

@@ -0,0 +1,80 @@
from pyasic.API import BaseMinerAPI
class UnknownAPI(BaseMinerAPI):
"""An abstraction of an API for a miner which is unknown.
This class is designed to try to be a intersection of as many miner APIs
and API commands as possible (API ⋂ API), to ensure that it can be used
with as many APIs as possible.
"""
def __init__(self, ip, port=4028):
super().__init__(ip, port)
async def asccount(self) -> dict:
return await self.send_command("asccount")
async def asc(self, n: int) -> dict:
return await self.send_command("asc", parameters=n)
async def devdetails(self) -> dict:
return await self.send_command("devdetails")
async def devs(self) -> dict:
return await self.send_command("devs")
async def edevs(self, old: bool = False) -> dict:
if old:
return await self.send_command("edevs", parameters="old")
else:
return await self.send_command("edevs")
async def pools(self) -> dict:
return await self.send_command("pools")
async def summary(self) -> dict:
return await self.send_command("summary")
async def stats(self) -> dict:
return await self.send_command("stats")
async def version(self) -> dict:
return await self.send_command("version")
async def estats(self) -> dict:
return await self.send_command("estats")
async def check(self) -> dict:
return await self.send_command("check")
async def coin(self) -> dict:
return await self.send_command("coin")
async def lcd(self) -> dict:
return await self.send_command("lcd")
async def switchpool(self, n: int) -> dict:
# BOS has not implemented this yet, they will in the future
raise NotImplementedError
# return await self.send_command("switchpool", parameters=n)
async def enablepool(self, n: int) -> dict:
# BOS has not implemented this yet, they will in the future
raise NotImplementedError
# return await self.send_command("enablepool", parameters=n)
async def disablepool(self, n: int) -> dict:
# BOS has not implemented this yet, they will in the future
raise NotImplementedError
# return await self.send_command("disablepool", parameters=n)
async def addpool(self, url: str, username: str, password: str) -> dict:
# BOS has not implemented this yet, they will in the future
raise NotImplementedError
# return await self.send_command("addpool", parameters=f"{url},{username},{password}")
async def removepool(self, n: int) -> dict:
# BOS has not implemented this yet, they will in the future
raise NotImplementedError
# return await self.send_command("removepool", parameters=n)