refactor: move base classes to base.py in their directories, move data locations to miners.data, and rename types to models.

This commit is contained in:
UpstreamData
2024-01-25 14:26:53 -07:00
parent aa1d7c1b6f
commit dd4c087749
261 changed files with 779 additions and 741 deletions

View File

@@ -13,322 +13,11 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import asyncio
import ipaddress
import json
import logging
import re
import warnings
from typing import Union
from pyasic.errors import APIError, APIWarning
class BaseMinerRPCAPI:
def __init__(self, ip: str, port: int = 4028, api_ver: str = "0.0.0") -> None:
# api port, should be 4028
self.port = port
# ip address of the miner
self.ip = ipaddress.ip_address(ip)
# api version if known
self.api_ver = api_ver
self.pwd = None
def __new__(cls, *args, **kwargs):
if cls is BaseMinerRPCAPI:
raise TypeError(f"Only children of '{cls.__name__}' may be instantiated")
return object.__new__(cls)
def __repr__(self):
return f"{self.__class__.__name__}: {str(self.ip)}"
async def send_command(
self,
command: Union[str, bytes],
parameters: Union[str, int, bool] = None,
ignore_errors: bool = False,
allow_warning: bool = True,
**kwargs,
) -> dict:
"""Send an API command to the miner and return the result.
Parameters:
command: The command to sent to the miner.
parameters: Any additional parameters to be sent with the command.
ignore_errors: Whether to raise APIError when the command returns an error.
allow_warning: Whether to warn if the command fails.
Returns:
The return data from the API command parsed from JSON into a dict.
"""
logging.debug(
f"{self} - (Send Privileged Command) - {command} "
+ f"with args {parameters}"
if parameters
else ""
)
# create the command
cmd = {"command": command, **kwargs}
if parameters:
cmd["parameter"] = parameters
# send the command
data = await self._send_bytes(json.dumps(cmd).encode("utf-8"))
if data == b"Socket connect failed: Connection refused\n":
if not ignore_errors:
raise APIError(data.decode("utf-8"))
return {}
data = self._load_api_data(data)
# check for if the user wants to allow errors to return
validation = self._validate_command_output(data)
if not validation[0]:
if not ignore_errors:
# validate the command succeeded
raise APIError(validation[1])
if allow_warning:
logging.warning(
f"{self.ip}: API Command Error: {command}: {validation[1]}"
)
logging.debug(f"{self} - (Send Command) - Received data.")
return data
# Privileged command handler, only used by whatsminers, defined here for consistency.
async def send_privileged_command(self, *args, **kwargs) -> dict:
return await self.send_command(*args, **kwargs)
async def multicommand(self, *commands: str, allow_warning: bool = True) -> dict:
"""Creates and sends multiple commands as one command to the miner.
Parameters:
*commands: The commands to send as a multicommand to the miner.
allow_warning: A boolean to supress APIWarnings.
"""
# make sure we can actually run each command, otherwise they will fail
commands = self._check_commands(*commands)
# standard multicommand format is "command1+command2"
# doesn't work for S19 which uses the backup _send_split_multicommand
command = "+".join(commands)
try:
data = await self.send_command(command, allow_warning=allow_warning)
except APIError:
data = await self._send_split_multicommand(*commands)
data["multicommand"] = True
return data
async def _send_split_multicommand(
self, *commands, allow_warning: bool = True
) -> dict:
tasks = {}
# send all commands individually
for cmd in commands:
tasks[cmd] = asyncio.create_task(
self.send_command(cmd, allow_warning=allow_warning)
)
await asyncio.gather(*[tasks[cmd] for cmd in tasks], return_exceptions=True)
data = {}
for cmd in tasks:
try:
result = tasks[cmd].result()
if result is None or result == {}:
result = {}
data[cmd] = [result]
except APIError:
pass
return data
@property
def commands(self) -> list:
return self.get_commands()
def get_commands(self) -> list:
"""Get a list of command accessible to a specific type of API on the miner.
Returns:
A list of all API commands that the miner supports.
"""
return [
func
for func in
# each function in self
dir(self)
if not func == "commands"
if callable(getattr(self, func)) and
# no __ or _ methods
not func.startswith("__") and not func.startswith("_") and
# remove all functions that are in this base class
func
not in [
func
for func in dir(BaseMinerRPCAPI)
if callable(getattr(BaseMinerRPCAPI, func))
]
]
def _check_commands(self, *commands):
allowed_commands = self.commands
return_commands = []
for command in commands:
if command in allowed_commands:
return_commands.append(command)
else:
warnings.warn(
f"""Removing incorrect command: {command}
If you are sure you want to use this command please use API.send_command("{command}", ignore_errors=True) instead.""",
APIWarning,
)
return return_commands
async def _send_bytes(
self,
data: bytes,
timeout: int = 100,
) -> bytes:
logging.debug(f"{self} - ([Hidden] Send Bytes) - Sending")
try:
# get reader and writer streams
reader, writer = await asyncio.open_connection(str(self.ip), self.port)
# handle OSError 121
except OSError as e:
if e.errno == 121:
logging.warning(
f"{self} - ([Hidden] Send Bytes) - Semaphore timeout expired."
)
return b"{}"
# send the command
logging.debug(f"{self} - ([Hidden] Send Bytes) - Writing")
writer.write(data)
logging.debug(f"{self} - ([Hidden] Send Bytes) - Draining")
await writer.drain()
try:
# TO address a situation where a whatsminer has an unknown PW -AND-
# Fix for stupid whatsminer bug, reboot/restart seem to not load properly in the loop
# have to receive, save the data, check if there is more data by reading with a short timeout
# append that data if there is more, and then onto the main loop.
# the password timeout might need to be longer than 1, but it seems to work for now.
ret_data = await asyncio.wait_for(reader.read(1), timeout=1)
except asyncio.TimeoutError:
return b"{}"
try:
ret_data += await asyncio.wait_for(reader.read(4096), timeout=timeout)
except ConnectionAbortedError:
return b"{}"
# loop to receive all the data
logging.debug(f"{self} - ([Hidden] Send Bytes) - Receiving")
try:
while True:
try:
d = await asyncio.wait_for(reader.read(4096), timeout=timeout)
if not d:
break
ret_data += d
except (asyncio.CancelledError, asyncio.TimeoutError) as e:
raise e
except (asyncio.CancelledError, asyncio.TimeoutError) as e:
raise e
except Exception as e:
logging.warning(f"{self} - ([Hidden] Send Bytes) - API Command Error {e}")
# close the connection
logging.debug(f"{self} - ([Hidden] Send Bytes) - Closing")
writer.close()
await writer.wait_closed()
return ret_data
@staticmethod
def _validate_command_output(data: dict) -> tuple:
# check if the data returned is correct or an error
# if status isn't a key, it is a multicommand
if "STATUS" not in data.keys():
for key in data.keys():
# make sure not to try to turn id into a dict
if not key == "id":
# make sure they succeeded
if "STATUS" in data[key][0].keys():
if data[key][0]["STATUS"][0]["STATUS"] not in ["S", "I"]:
# this is an error
return False, f"{key}: " + data[key][0]["STATUS"][0]["Msg"]
elif "id" not in data.keys():
if isinstance(data["STATUS"], list):
if data["STATUS"][0].get("STATUS", None) in ["S", "I"]:
return True, None
else:
return False, data["STATUS"][0]["Msg"]
elif isinstance(data["STATUS"], dict):
# new style X19 command
if data["STATUS"]["STATUS"] not in ["S", "I"]:
return False, data["STATUS"]["Msg"]
return True, None
if data["STATUS"] not in ["S", "I"]:
return False, data["Msg"]
else:
# make sure the command succeeded
if isinstance(data["STATUS"], str):
if data["STATUS"] in ["RESTART"]:
return True, None
elif isinstance(data["STATUS"], dict):
if data["STATUS"].get("STATUS") in ["S", "I"]:
return True, None
elif data["STATUS"][0]["STATUS"] not in ("S", "I"):
# this is an error
if data["STATUS"][0]["STATUS"] not in ("S", "I"):
return False, data["STATUS"][0]["Msg"]
return True, None
@staticmethod
def _load_api_data(data: bytes) -> dict:
# some json from the API returns with a null byte (\x00) on the end
if data.endswith(b"\x00"):
# handle the null byte
str_data = data.decode("utf-8")[:-1]
else:
# no null byte
str_data = data.decode("utf-8")
# fix an error with a btminer return having an extra comma that breaks json.loads()
str_data = str_data.replace(",}", "}")
# fix an error with a btminer return having a newline that breaks json.loads()
str_data = str_data.replace("\n", "")
# fix an error with a bmminer return not having a specific comma that breaks json.loads()
str_data = str_data.replace("}{", "},{")
# fix an error with a bmminer return having a specific comma that breaks json.loads()
str_data = str_data.replace("[,{", "[{")
# fix an error with a btminer return having a missing comma. (2023-01-06 version)
str_data = str_data.replace('""temp0', '","temp0')
# fix an error with Avalonminers returning inf and nan
str_data = str_data.replace("info", "1nfo")
str_data = str_data.replace("inf", "0")
str_data = str_data.replace("1nfo", "info")
str_data = str_data.replace("nan", "0")
# fix whatever this garbage from avalonminers is `,"id":1}`
if str_data.startswith(","):
str_data = f"{{{str_data[1:]}"
# try to fix an error with overflowing the receive buffer
# this can happen in cases such as bugged btminers returning arbitrary length error info with 100s of errors.
if not str_data.endswith("}"):
str_data = ",".join(str_data.split(",")[:-1]) + "}"
# fix a really nasty bug with whatsminer API v2.0.4 where they return a list structured like a dict
if re.search(r"\"error_code\":\[\".+\"\]", str_data):
str_data = str_data.replace("[", "{").replace("]", "}")
# parse the json
try:
parsed_data = json.loads(str_data)
except json.decoder.JSONDecodeError as e:
raise APIError(f"Decode Error {e}: {str_data}")
return parsed_data
from .bfgminer import BFGMinerRPCAPI
from .bmminer import BMMinerRPCAPI
from .bosminer import BOSMinerRPCAPI
from .btminer import BTMinerRPCAPI
from .cgminer import CGMinerRPCAPI
from .gcminer import GCMinerRPCAPI
from .luxminer import LUXMinerRPCAPI
from .unknown import UnknownRPCAPI

334
pyasic/rpc/base.py Normal file
View File

@@ -0,0 +1,334 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import asyncio
import ipaddress
import json
import logging
import re
import warnings
from typing import Union
from pyasic.errors import APIError, APIWarning
class BaseMinerRPCAPI:
def __init__(self, ip: str, port: int = 4028, api_ver: str = "0.0.0") -> None:
# api port, should be 4028
self.port = port
# ip address of the miner
self.ip = ipaddress.ip_address(ip)
# api version if known
self.api_ver = api_ver
self.pwd = None
def __new__(cls, *args, **kwargs):
if cls is BaseMinerRPCAPI:
raise TypeError(f"Only children of '{cls.__name__}' may be instantiated")
return object.__new__(cls)
def __repr__(self) -> str:
return f"{self.__class__.__name__}: {str(self.ip)}"
async def send_command(
self,
command: Union[str, bytes],
parameters: Union[str, int, bool] = None,
ignore_errors: bool = False,
allow_warning: bool = True,
**kwargs,
) -> dict:
"""Send an API command to the miner and return the result.
Parameters:
command: The command to sent to the miner.
parameters: Any additional parameters to be sent with the command.
ignore_errors: Whether to raise APIError when the command returns an error.
allow_warning: Whether to warn if the command fails.
Returns:
The return data from the API command parsed from JSON into a dict.
"""
logging.debug(
f"{self} - (Send Privileged Command) - {command} "
+ f"with args {parameters}"
if parameters
else ""
)
# create the command
cmd = {"command": command, **kwargs}
if parameters:
cmd["parameter"] = parameters
# send the command
data = await self._send_bytes(json.dumps(cmd).encode("utf-8"))
if data == b"Socket connect failed: Connection refused\n":
if not ignore_errors:
raise APIError(data.decode("utf-8"))
return {}
data = self._load_api_data(data)
# check for if the user wants to allow errors to return
validation = self._validate_command_output(data)
if not validation[0]:
if not ignore_errors:
# validate the command succeeded
raise APIError(validation[1])
if allow_warning:
logging.warning(
f"{self.ip}: API Command Error: {command}: {validation[1]}"
)
logging.debug(f"{self} - (Send Command) - Received data.")
return data
# Privileged command handler, only used by whatsminers, defined here for consistency.
async def send_privileged_command(self, *args, **kwargs) -> dict:
return await self.send_command(*args, **kwargs)
async def multicommand(self, *commands: str, allow_warning: bool = True) -> dict:
"""Creates and sends multiple commands as one command to the miner.
Parameters:
*commands: The commands to send as a multicommand to the miner.
allow_warning: A boolean to supress APIWarnings.
"""
# make sure we can actually run each command, otherwise they will fail
commands = self._check_commands(*commands)
# standard multicommand format is "command1+command2"
# doesn't work for S19 which uses the backup _send_split_multicommand
command = "+".join(commands)
try:
data = await self.send_command(command, allow_warning=allow_warning)
except APIError:
data = await self._send_split_multicommand(*commands)
data["multicommand"] = True
return data
async def _send_split_multicommand(
self, *commands, allow_warning: bool = True
) -> dict:
tasks = {}
# send all commands individually
for cmd in commands:
tasks[cmd] = asyncio.create_task(
self.send_command(cmd, allow_warning=allow_warning)
)
await asyncio.gather(*[tasks[cmd] for cmd in tasks], return_exceptions=True)
data = {}
for cmd in tasks:
try:
result = tasks[cmd].result()
if result is None or result == {}:
result = {}
data[cmd] = [result]
except APIError:
pass
return data
@property
def commands(self) -> list:
return self.get_commands()
def get_commands(self) -> list:
"""Get a list of command accessible to a specific type of API on the miner.
Returns:
A list of all API commands that the miner supports.
"""
return [
func
for func in
# each function in self
dir(self)
if not func == "commands"
if callable(getattr(self, func)) and
# no __ or _ methods
not func.startswith("__") and not func.startswith("_") and
# remove all functions that are in this base class
func
not in [
func
for func in dir(BaseMinerRPCAPI)
if callable(getattr(BaseMinerRPCAPI, func))
]
]
def _check_commands(self, *commands) -> list:
allowed_commands = self.commands
return_commands = []
for command in commands:
if command in allowed_commands:
return_commands.append(command)
else:
warnings.warn(
f"""Removing incorrect command: {command}
If you are sure you want to use this command please use API.send_command("{command}", ignore_errors=True) instead.""",
APIWarning,
)
return return_commands
async def _send_bytes(
self,
data: bytes,
timeout: int = 100,
) -> bytes:
logging.debug(f"{self} - ([Hidden] Send Bytes) - Sending")
try:
# get reader and writer streams
reader, writer = await asyncio.open_connection(str(self.ip), self.port)
# handle OSError 121
except OSError as e:
if e.errno == 121:
logging.warning(
f"{self} - ([Hidden] Send Bytes) - Semaphore timeout expired."
)
return b"{}"
# send the command
logging.debug(f"{self} - ([Hidden] Send Bytes) - Writing")
writer.write(data)
logging.debug(f"{self} - ([Hidden] Send Bytes) - Draining")
await writer.drain()
try:
# TO address a situation where a whatsminer has an unknown PW -AND-
# Fix for stupid whatsminer bug, reboot/restart seem to not load properly in the loop
# have to receive, save the data, check if there is more data by reading with a short timeout
# append that data if there is more, and then onto the main loop.
# the password timeout might need to be longer than 1, but it seems to work for now.
ret_data = await asyncio.wait_for(reader.read(1), timeout=1)
except asyncio.TimeoutError:
return b"{}"
try:
ret_data += await asyncio.wait_for(reader.read(4096), timeout=timeout)
except ConnectionAbortedError:
return b"{}"
# loop to receive all the data
logging.debug(f"{self} - ([Hidden] Send Bytes) - Receiving")
try:
while True:
try:
d = await asyncio.wait_for(reader.read(4096), timeout=timeout)
if not d:
break
ret_data += d
except (asyncio.CancelledError, asyncio.TimeoutError) as e:
raise e
except (asyncio.CancelledError, asyncio.TimeoutError) as e:
raise e
except Exception as e:
logging.warning(f"{self} - ([Hidden] Send Bytes) - API Command Error {e}")
# close the connection
logging.debug(f"{self} - ([Hidden] Send Bytes) - Closing")
writer.close()
await writer.wait_closed()
return ret_data
@staticmethod
def _validate_command_output(data: dict) -> tuple:
# check if the data returned is correct or an error
# if status isn't a key, it is a multicommand
if "STATUS" not in data.keys():
for key in data.keys():
# make sure not to try to turn id into a dict
if not key == "id":
# make sure they succeeded
if "STATUS" in data[key][0].keys():
if data[key][0]["STATUS"][0]["STATUS"] not in ["S", "I"]:
# this is an error
return False, f"{key}: " + data[key][0]["STATUS"][0]["Msg"]
elif "id" not in data.keys():
if isinstance(data["STATUS"], list):
if data["STATUS"][0].get("STATUS", None) in ["S", "I"]:
return True, None
else:
return False, data["STATUS"][0]["Msg"]
elif isinstance(data["STATUS"], dict):
# new style X19 command
if data["STATUS"]["STATUS"] not in ["S", "I"]:
return False, data["STATUS"]["Msg"]
return True, None
if data["STATUS"] not in ["S", "I"]:
return False, data["Msg"]
else:
# make sure the command succeeded
if isinstance(data["STATUS"], str):
if data["STATUS"] in ["RESTART"]:
return True, None
elif isinstance(data["STATUS"], dict):
if data["STATUS"].get("STATUS") in ["S", "I"]:
return True, None
elif data["STATUS"][0]["STATUS"] not in ("S", "I"):
# this is an error
if data["STATUS"][0]["STATUS"] not in ("S", "I"):
return False, data["STATUS"][0]["Msg"]
return True, None
@staticmethod
def _load_api_data(data: bytes) -> dict:
# some json from the API returns with a null byte (\x00) on the end
if data.endswith(b"\x00"):
# handle the null byte
str_data = data.decode("utf-8")[:-1]
else:
# no null byte
str_data = data.decode("utf-8")
# fix an error with a btminer return having an extra comma that breaks json.loads()
str_data = str_data.replace(",}", "}")
# fix an error with a btminer return having a newline that breaks json.loads()
str_data = str_data.replace("\n", "")
# fix an error with a bmminer return not having a specific comma that breaks json.loads()
str_data = str_data.replace("}{", "},{")
# fix an error with a bmminer return having a specific comma that breaks json.loads()
str_data = str_data.replace("[,{", "[{")
# fix an error with a btminer return having a missing comma. (2023-01-06 version)
str_data = str_data.replace('""temp0', '","temp0')
# fix an error with Avalonminers returning inf and nan
str_data = str_data.replace("info", "1nfo")
str_data = str_data.replace("inf", "0")
str_data = str_data.replace("1nfo", "info")
str_data = str_data.replace("nan", "0")
# fix whatever this garbage from avalonminers is `,"id":1}`
if str_data.startswith(","):
str_data = f"{{{str_data[1:]}"
# try to fix an error with overflowing the receive buffer
# this can happen in cases such as bugged btminers returning arbitrary length error info with 100s of errors.
if not str_data.endswith("}"):
str_data = ",".join(str_data.split(",")[:-1]) + "}"
# fix a really nasty bug with whatsminer API v2.0.4 where they return a list structured like a dict
if re.search(r"\"error_code\":\[\".+\"\]", str_data):
str_data = str_data.replace("[", "{").replace("]", "}")
# parse the json
try:
parsed_data = json.loads(str_data)
except json.decoder.JSONDecodeError as e:
raise APIError(f"Decode Error {e}: {str_data}")
return parsed_data

View File

@@ -14,7 +14,7 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.rpc import BaseMinerRPCAPI
from pyasic.rpc.base import BaseMinerRPCAPI
class BOSMinerRPCAPI(BaseMinerRPCAPI):

View File

@@ -30,7 +30,7 @@ from passlib.handlers.md5_crypt import md5_crypt
from pyasic import settings
from pyasic.errors import APIError
from pyasic.misc import api_min_version
from pyasic.rpc import BaseMinerRPCAPI
from pyasic.rpc.base import BaseMinerRPCAPI
### IMPORTANT ###
# you need to change the password of the miners using the Whatsminer
@@ -89,7 +89,7 @@ def _add_to_16(string: str) -> bytes:
return str.encode(string) # return bytes
def parse_btminer_priviledge_data(token_data: dict, data: dict):
def parse_btminer_priviledge_data(token_data: dict, data: dict) -> dict:
"""Parses data returned from the BTMiner privileged API.
Parses data from the BTMiner privileged API using the token
@@ -185,10 +185,10 @@ class BTMinerRPCAPI(BaseMinerRPCAPI):
ip: The IP of the miner to reference the API on.
"""
def __init__(self, ip: str, port: int = 4028, api_ver: str = "0.0.0"):
def __init__(self, ip: str, port: int = 4028, api_ver: str = "0.0.0") -> None:
super().__init__(ip, port, api_ver)
self.pwd = settings.get("default_whatsminer_rpc_password", "admin")
self.current_token = None
self.token = None
async def multicommand(self, *commands: str, allow_warning: bool = True) -> dict:
"""Creates and sends multiple commands as one command to the miner.
@@ -269,7 +269,7 @@ class BTMinerRPCAPI(BaseMinerRPCAPI):
data = self._load_api_data(data)
try:
data = parse_btminer_priviledge_data(self.current_token, data)
data = parse_btminer_priviledge_data(self.token, data)
except Exception as e:
logging.info(f"{str(self.ip)}: {e}")
@@ -292,11 +292,11 @@ class BTMinerRPCAPI(BaseMinerRPCAPI):
</details>
"""
logging.debug(f"{self} - (Get Token) - Getting token")
if self.current_token:
if self.current_token[
"timestamp"
] > datetime.datetime.now() - datetime.timedelta(minutes=30):
return self.current_token
if self.token:
if self.token["timestamp"] > datetime.datetime.now() - datetime.timedelta(
minutes=30
):
return self.token
# get the token
data = await self.send_command("get_token")
@@ -316,15 +316,13 @@ class BTMinerRPCAPI(BaseMinerRPCAPI):
host_sign = tmp[3]
# set the current token
self.current_token = {
self.token = {
"host_sign": host_sign,
"host_passwd_md5": host_passwd_md5,
"timestamp": datetime.datetime.now(),
}
logging.debug(
f"{self} - (Get Token) - Gathered token data: {self.current_token}"
)
return self.current_token
logging.debug(f"{self} - (Get Token) - Gathered token data: {self.token}")
return self.token
#### PRIVILEGED COMMANDS ####
# Please read the top of this file to learn

View File

@@ -14,7 +14,7 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.rpc import BaseMinerRPCAPI
from pyasic.rpc.base import BaseMinerRPCAPI
class CGMinerRPCAPI(BaseMinerRPCAPI):

View File

@@ -13,9 +13,8 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from typing import Literal
from pyasic.rpc import BaseMinerRPCAPI
from pyasic.rpc.base import BaseMinerRPCAPI
class GCMinerRPCAPI(BaseMinerRPCAPI):

View File

@@ -15,7 +15,7 @@
# ------------------------------------------------------------------------------
from typing import Literal
from pyasic.rpc import BaseMinerRPCAPI
from pyasic.rpc.base import BaseMinerRPCAPI
class LUXMinerRPCAPI(BaseMinerRPCAPI):