feature: Start refactor to new style of get_miner. Needs testing and stability fixes.

This commit is contained in:
UpstreamData
2023-06-05 09:03:49 -06:00
parent a2bb8b5d9b
commit d6a42238d0
3 changed files with 411 additions and 697 deletions

View File

@@ -14,16 +14,17 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
import asyncio
import ipaddress
import json
import logging
from typing import AsyncIterable, List, Tuple, Union
import re
from ipaddress import IPv4Address
from typing import Optional, Tuple, Union
import asyncssh
import httpx
from pyasic.errors import APIError
from pyasic.logger import logger
from pyasic.miners.backends import BOSMiner # noqa - Ignore _module import
from pyasic.miners.backends import CGMiner # noqa - Ignore _module import
from pyasic.miners.backends.bmminer import BMMiner # noqa - Ignore _module import
@@ -42,8 +43,10 @@ from pyasic.miners.kda import *
from pyasic.miners.ltc import *
from pyasic.miners.unknown import UnknownMiner
from pyasic.miners.zec import *
from pyasic.misc import Singleton
from pyasic.settings import PyasicSettings
TIMEOUT = 30
LOOPS = 1
MINER_CLASSES = {
"ANTMINER DR5": {
@@ -588,85 +591,259 @@ MINER_CLASSES = {
}
class MinerFactory(metaclass=Singleton):
"""A factory to handle identification and selection of the proper class of miner."""
def __init__(self) -> None:
self.miners = {}
async def get_miner_generator(
self, ips: List[Union[ipaddress.ip_address, str]]
) -> AsyncIterable[AnyMiner]:
"""
Get Miner objects from ip addresses using an async generator.
Returns an asynchronous generator containing Miners.
Parameters:
ips: a list of ip addresses to get miners for.
Returns:
An async iterable containing miners.
"""
# get the event loop
loop = asyncio.get_event_loop()
# create a list of tasks
scan_tasks = []
# for each miner IP that was passed in, add a task to get its class
for miner in ips:
scan_tasks.append(loop.create_task(self.get_miner(miner)))
# asynchronously run the tasks and return them as they complete
scanned = asyncio.as_completed(scan_tasks)
# loop through and yield the miners as they complete
for miner in scanned:
yield await miner
async def get_miner(self, ip: Union[ipaddress.ip_address, str]) -> AnyMiner:
"""Decide a miner type using the IP address of the miner.
Parameters:
ip: An `ipaddress.ip_address` or string of the IP to find the miner.
Returns:
A miner class.
"""
if isinstance(ip, str):
ip = ipaddress.ip_address(ip)
# check if the miner already exists in cache
if ip in self.miners:
return self.miners[ip]
# if everything fails, the miner is already set to unknown
model, api, ver, api_ver = None, None, None, None
# try to get the API multiple times based on retries
for i in range(PyasicSettings().miner_factory_get_version_retries):
# TODO: Implement caching and cache clearing.
# TODO: Add Canaan support back
# TODO: Improve consistency
class MinerFactory:
async def web_ping(self, ip: str):
tasks = [self._http_ping(ip), self._http_ping(ip, https=True)]
d = asyncio.as_completed(
tasks,
timeout=TIMEOUT,
)
for i in d:
try:
# get the API type, should be BOSMiner, CGMiner, BMMiner, BTMiner, or None
new_model, new_api, new_ver, new_api_ver = await asyncio.wait_for(
self._get_miner_type(ip), timeout=10
)
# keep track of the API and model we found first
if new_api and not api:
api = new_api
if new_model and not model:
model = new_model
if new_ver and not ver:
ver = new_ver
if new_api_ver and not api_ver:
api_ver = new_api_ver
# if we find the API and model, don't need to loop anymore
if api and model:
break
data = await i
if data[0] is not None:
if not "400 - Bad Request" in data[0]:
return data
except asyncio.TimeoutError:
logging.warning(f"{ip}: Get Miner Timed Out")
miner = self._select_miner_from_classes(ip, model, api, ver, api_ver)
pass
return None, False
# save the miner to the cache at its IP if its not unknown
if not isinstance(miner, UnknownMiner):
self.miners[ip] = miner
async def _http_ping(
self, ip: str, https: bool = False
) -> Tuple[Optional[str], bool]:
request = "GET / HTTP/1.1\r\nHost: pyasic\r\n\r\n"
if https:
request = "GET / HTTPS/1.1\r\nHost: pyasic\r\n\r\n"
try:
reader, writer = await asyncio.open_connection(str(ip), 80)
response = None
try:
writer.write(request.encode())
response = await reader.read()
except asyncio.CancelledError:
writer.close()
await writer.wait_closed()
if response is not None:
data = response.decode()
if data is not None and not data == "":
return data, True
else:
writer.close()
await writer.wait_closed()
data = response.decode()
if data is not None and not data == "":
return data, True
except OSError:
pass
return None, False
# return the miner
return miner
async def sock_ping(self, ip: str) -> [Optional[dict], bool]:
try:
data = await self.send_api_command(ip, "devdetails")
if data:
return data, True
except (asyncio.exceptions.TimeoutError, OSError, ConnectionError):
pass
return None, False
async def get_miner(self, ip: str):
try:
return await asyncio.wait_for(self._get_miner(ip), TIMEOUT)
except asyncio.TimeoutError:
return None
async def _get_miner(self, ip: str):
sock_data = None
web_data = None
for i in range(LOOPS):
web_result, sock_result = await asyncio.gather(
self.web_ping(ip), self.sock_ping(ip)
)
online = sock_result[1] or web_result[1]
if online:
web_data = web_result[0]
sock_data = sock_result[0]
break
if web_data:
if "401 Unauthorized" and 'realm="antMiner' in web_data:
# antminer branch
return await self.get_miner_antminer(ip)
if "307 Temporary Redirect" and 'location="https://' in web_data:
return await self.get_miner_whatsminer(ip)
if "Braiins OS" in web_data:
return "BOS+"
if "cloud-box" in web_data:
# goldshell branch
return await self.get_miner_goldshell(ip)
if sock_data:
if "bitmicro" in str(sock_data):
return await self.get_miner_whatsminer(ip, sock_data)
if "intchains_qomo" in str(sock_data):
return await self.get_miner_goldshell(ip)
return UnknownMiner(ip)
async def send_web_command(
self,
ip: Union[ipaddress.ip_address, str],
location: str,
auth: Optional[httpx.DigestAuth] = None,
) -> Optional[dict]:
async with httpx.AsyncClient(verify=False, timeout=TIMEOUT) as client:
try:
data = await client.get(
f"http://{str(ip)}{location}",
auth=auth,
timeout=TIMEOUT,
)
except httpx.HTTPError:
logger.info(f"{ip}: Web command timeout.")
return
if data is None:
return
try:
json_data = data.json()
except json.JSONDecodeError:
return
else:
return json_data
async def send_api_command(
self, ip: Union[ipaddress.ip_address, str], command: str
) -> Optional[dict]:
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(str(ip), 4028), timeout=TIMEOUT
)
except (ConnectionError, OSError):
return
cmd = {"command": command}
# send the command
writer.write(json.dumps(cmd).encode("utf-8"))
await writer.drain()
# instantiate data
data = b""
# loop to receive all the data
while True:
d = await reader.read(4096)
if not d:
break
data += d
writer.close()
await writer.wait_closed()
data = await self.fix_api_data(data)
data = json.loads(data)
return data
async def fix_api_data(self, data: bytes):
if data.endswith(b"\x00"):
str_data = data.decode("utf-8")[:-1]
else:
str_data = data.decode("utf-8")
# fix an error with a btminer return having an extra comma that breaks json.loads()
str_data = str_data.replace(",}", "}")
# fix an error with a btminer return having a newline that breaks json.loads()
str_data = str_data.replace("\n", "")
# fix an error with a bmminer return not having a specific comma that breaks json.loads()
str_data = str_data.replace("}{", "},{")
# fix an error with a bmminer return having a specific comma that breaks json.loads()
str_data = str_data.replace("[,{", "[{")
# fix an error with a btminer return having a missing comma. (2023-01-06 version)
str_data = str_data.replace('""temp0', '","temp0')
# fix an error with Avalonminers returning inf and nan
str_data = str_data.replace("info", "1nfo")
str_data = str_data.replace("inf", "0")
str_data = str_data.replace("1nfo", "info")
str_data = str_data.replace("nan", "0")
# fix whatever this garbage from avalonminers is `,"id":1}`
if str_data.startswith(","):
str_data = f"{{{str_data[1:]}"
# try to fix an error with overflowing the recieve buffer
# this can happen in cases such as bugged btminers returning arbitrary length error info with 100s of errors.
if not str_data.endswith("}"):
str_data = ",".join(str_data.split(",")[:-1]) + "}"
# fix a really nasty bug with whatsminer API v2.0.4 where they return a list structured like a dict
if re.search(r"\"error_code\":\[\".+\"\]", str_data):
str_data = str_data.replace("[", "{").replace("]", "}")
return str_data
async def get_miner_antminer(self, ip: str):
sock_json_data = await self.send_api_command(ip, "version")
try:
miner_type = sock_json_data["VERSION"][0]["Type"]
api_type = None
api_ver = sock_json_data["VERSION"][0]["API"]
keys_str = "_".join(sock_json_data["VERSION"][0].keys())
if "cgminer" in keys_str:
api_type = "CGMiner"
elif "bmminer" in keys_str:
api_type = "BMMiner"
return self._select_miner_from_classes(
ip=IPv4Address(ip),
model=miner_type.upper(),
api=api_type,
ver=None,
api_ver=api_ver,
)
except (TypeError, LookupError):
pass
# last resort, this is slow
auth = httpx.DigestAuth("root", "root")
web_json_data = await self.send_web_command(
ip, "/cgi-bin/get_system_info.cgi", auth=auth
)
if not web_json_data:
return UnknownMiner(ip)
if web_json_data.get("minertype") is not None:
miner_type = web_json_data["minertype"].upper()
api_type = None
if "cgminer" in "_".join(web_json_data.keys()):
api_type = "CGMiner"
elif "bmminer" in "_".join(web_json_data.keys()):
api_type = "BMMiner"
return self._select_miner_from_classes(
IPv4Address(ip), miner_type, api_type, None
)
async def get_miner_goldshell(self, ip: str):
json_data = await self.send_web_command(ip, "/mcb/status")
if json_data.get("model") is not None:
miner_type = json_data["model"].replace("-", " ").upper()
return self._select_miner_from_classes(
IPv4Address(ip), miner_type, None, None
)
async def get_miner_whatsminer(self, ip: str, json_data: Optional[dict] = None):
if not json_data:
try:
json_data = await self.send_api_command(ip, "devdetails")
except (asyncio.exceptions.TimeoutError, OSError, ConnectionError):
return None
try:
miner_type, submodel = json_data["DEVDETAILS"][0]["Model"].split("V")
return self._select_miner_from_classes(
IPv4Address(ip), miner_type, submodel, None
)
except LookupError:
return None
@staticmethod
def _select_miner_from_classes(
@@ -714,467 +891,5 @@ class MinerFactory(metaclass=Singleton):
return miner
def clear_cached_miners(self) -> None:
"""Clear the miner factory cache."""
# empty out self.miners
self.miners = {}
async def _get_miner_type(
self, ip: Union[ipaddress.ip_address, str]
) -> Tuple[Union[str, None], Union[str, None], Union[str, None], Union[str, None]]:
model, api, ver, api_ver = None, None, None, None
try:
devdetails, version = await self.__get_devdetails_and_version(ip)
except APIError as e:
# catch APIError and let the factory know we cant get data
logging.warning(f"{ip}: API Command Error: {e}")
return None, None, None, None
except OSError or ConnectionRefusedError:
# miner refused connection on API port, we wont be able to get data this way
# try ssh
try:
_model = await self.__get_model_from_graphql(ip)
if not _model:
_model = await self.__get_model_from_ssh(ip)
if _model:
model = _model
api = "BOSMiner+"
except asyncssh.misc.PermissionDenied:
try:
data = await self.__get_system_info_from_web(ip)
if not data.get("success"):
_model = await self.__get_dragonmint_version_from_web(ip)
if _model:
model = _model
if "minertype" in data:
model = data["minertype"].upper()
if "bmminer" in "\t".join(data):
api = "BMMiner"
except Exception as e:
logging.debug(f"Unable to get miner - {e}")
return model, api, ver, api_ver
# if we have devdetails, we can get model data from there
if devdetails:
for _devdetails_key in ["Model", "Driver"]:
try:
if devdetails.get("DEVDETAILS"):
model = devdetails["DEVDETAILS"][0][_devdetails_key].upper()
if "NOPIC" in model:
# bos, weird model
if model == "ANTMINER S19J88NOPIC":
model = "ANTMINER S19J NOPIC"
else:
print(model)
if not model == "BITMICRO":
break
elif devdetails.get("DEVS"):
model = devdetails["DEVS"][0][_devdetails_key].upper()
if "QOMO" in model:
model = await self.__get_goldshell_model_from_web(ip)
except LookupError:
continue
try:
if devdetails[0]["STATUS"][0]["Msg"]:
model = await self.__get_model_from_graphql(ip)
if model:
api = "BOSMiner+"
except (KeyError, TypeError, ValueError, IndexError):
pass
try:
if not model:
# braiins OS bug check just in case
if "s9" in devdetails["STATUS"][0]["Description"]:
model = "ANTMINER S9"
if "s17" in version["STATUS"][0]["Description"]:
model = "ANTMINER S17"
except (KeyError, TypeError, ValueError, IndexError):
pass
try:
if not api:
if "boser" in version["STATUS"][0]["Description"]:
api = "BOSMiner+"
except (KeyError, TypeError, ValueError, IndexError):
pass
else:
try:
_model = await self.__get_model_from_graphql(ip)
if _model:
model = _model
api = "BOSMiner+"
except (KeyError, TypeError, ValueError, IndexError):
pass
# if we have version we can get API type from here
if version:
try:
if isinstance(version.get("Msg"), dict):
if "api_ver" in version["Msg"]:
api_ver = (
version["Msg"]["api_ver"]
.replace("whatsminer ", "")
.replace("v", "")
)
api = "BTMiner"
if version[0]["STATUS"][0]["Msg"]:
model = await self.__get_model_from_graphql(ip)
if model:
api = "BOSMiner+"
try:
api_ver = version[0]["VERSION"][0]["API"]
except (KeyError, TypeError, ValueError, IndexError):
pass
return model, api, ver, api_ver
except (KeyError, TypeError, ValueError, IndexError):
pass
if "VERSION" in version:
api_ver = version["VERSION"][0].get("API")
api_types = ["BMMiner", "CGMiner", "BTMiner"]
# check basic API types, BOSMiner needs a special check
for api_type in api_types:
if any(api_type in string for string in version["VERSION"][0]):
api = api_type
# check if there are any BOSMiner strings in any of the dict keys
if any("BOSminer" in string for string in version["VERSION"][0]):
api = "BOSMiner"
if version["VERSION"][0].get("BOSminer"):
if "plus" in version["VERSION"][0]["BOSminer"]:
api = "BOSMiner+"
if "BOSminer+" in version["VERSION"][0]:
api = "BOSMiner+"
if any("BOSer" in string for string in version["VERSION"][0]):
api = "BOSMiner+"
# check for avalonminers
for _version_key in ["PROD", "MODEL"]:
try:
_data = version["VERSION"][0][_version_key].split("-")
except KeyError:
continue
model = _data[0].upper()
if _version_key == "MODEL":
model = f"AVALONMINER {_data[0]}"
if len(_data) > 1:
ver = _data[1]
if version.get("Description") and (
"whatsminer" in version.get("Description")
):
api = "BTMiner"
# if we have no model from devdetails but have version, try to get it from there
if version and not model:
try:
model = version["VERSION"][0]["Type"].upper()
if "ANTMINER BHB" in model:
# def antminer, get from web
sysinfo = await self.__get_system_info_from_web(str(ip))
model = sysinfo["minertype"].upper()
if "VNISH" in model:
api = "VNish"
for split_point in [" BB", " XILINX", " (VNISH"]:
if split_point in model:
model = model.split(split_point)[0]
except KeyError:
pass
if not model:
stats = await self._send_api_command(str(ip), "stats")
if stats:
try:
_model = stats["STATS"][0]["Type"].upper()
except KeyError:
pass
else:
if "VNISH" in _model:
api = "VNish"
for split_point in [" BB", " XILINX", " (VNISH"]:
if split_point in _model:
_model = _model.split(split_point)[0]
if "PRO" in _model and " PRO" not in _model:
_model = _model.replace("PRO", " PRO")
model = _model
else:
_model = await self.__get_dragonmint_version_from_web(ip)
if _model:
model = _model
if model:
if "DRAGONMINT" in model or "A10" in model:
_model = await self.__get_dragonmint_version_from_web(ip)
if _model:
model = _model
if " HIVEON" in model:
# do hiveon check before whatsminer as HIVEON contains a V
model = model.split(" HIVEON")[0]
api = "Hiveon"
# whatsminer have a V in their version string (M20SV41), everything after it is ver
if "V" in model:
_ver = model.split("V")
if len(_ver) > 1:
ver = model.split("V")[1]
model = model.split("V")[0]
# don't need "Bitmain", just "ANTMINER XX" as model
if "BITMAIN " in model:
model = model.replace("BITMAIN ", "")
return model, api, ver, api_ver
async def __get_devdetails_and_version(
self, ip
) -> Tuple[Union[dict, None], Union[dict, None]]:
version = None
try:
# get device details and version data
data = await self._send_api_command(str(ip), "devdetails+version")
# validate success
validation = await self._validate_command(data)
if not validation[0]:
try:
if data["version"][0]["STATUS"][0]["Msg"] == "Disconnected":
return data["devdetails"], data["version"]
except (KeyError, TypeError):
pass
raise APIError(validation[1])
# copy each part of the main command to devdetails and version
devdetails = data["devdetails"][0]
version = data["version"][0]
if "STATUS" in version:
if len(version["STATUS"]) > 0:
if "Description" in version["STATUS"][0]:
if version["STATUS"][0]["Description"] == "btminer":
try:
new_version = await self._send_api_command(
str(ip), "get_version"
)
validation = await self._validate_command(new_version)
if validation[0]:
version = new_version
except Exception as e:
logging.warning(
f"([Hidden] Get Devdetails and Version) - Error {e}"
)
if "DEVDETAILS" in devdetails:
if len(devdetails["DEVDETAILS"]) > 0:
if devdetails["DEVDETAILS"][0].get("Driver") == "bitmicro":
try:
new_version = await self._send_api_command(
str(ip), "get_version"
)
validation = await self._validate_command(new_version)
if validation[0]:
version = new_version
except Exception as e:
logging.warning(
f"([Hidden] Get Devdetails and Version) - Error {e}"
)
return devdetails, version
except APIError:
# try devdetails and version separately (X19s mainly require this)
# get devdetails and validate
devdetails = await self._send_api_command(str(ip), "devdetails")
validation = await self._validate_command(devdetails)
if not validation[0]:
# if devdetails fails try version instead
devdetails = None
# get version and validate
version = await self._send_api_command(str(ip), "version")
validation = await self._validate_command(version)
if not validation[0]:
# finally try get_version (Whatsminers) and validate
version = await self._send_api_command(str(ip), "get_version")
validation = await self._validate_command(version)
# if this fails we raise an error to be caught below
if not validation[0]:
raise APIError(validation[1])
return devdetails, version
@staticmethod
async def __get_model_from_ssh(ip: ipaddress.ip_address) -> Union[str, None]:
model = None
try:
async with asyncssh.connect(
str(ip),
known_hosts=None,
username="root",
password="admin",
server_host_key_algs=["ssh-rsa"],
) as conn:
board_name = None
cmd = await conn.run("cat /tmp/sysinfo/board_name")
if cmd:
board_name = cmd.stdout.strip()
if board_name == "am1-s9":
model = "ANTMINER S9"
if board_name == "am2-s17":
model = "ANTMINER S17"
return model
except ConnectionRefusedError:
return None
@staticmethod
async def __get_model_from_graphql(ip: ipaddress.ip_address) -> Union[str, None]:
model = None
url = f"http://{ip}/graphql"
try:
async with httpx.AsyncClient() as client:
d = await client.post(
url, json={"query": "{bosminer {info{modelName}}}"}
)
if d.status_code == 200:
model = (d.json()["data"]["bosminer"]["info"]["modelName"]).upper()
return model
except httpx.HTTPError:
pass
@staticmethod
async def __get_system_info_from_web(ip) -> dict:
url = f"http://{ip}/cgi-bin/get_system_info.cgi"
auth = httpx.DigestAuth("root", "root")
try:
async with httpx.AsyncClient() as client:
data = await client.get(url, auth=auth)
if data.status_code == 200:
data = data.json()
return data
except httpx.HTTPError:
pass
@staticmethod
async def __get_goldshell_model_from_web(ip):
response = None
try:
async with httpx.AsyncClient() as client:
response = (
await client.get(
f"http://{ip}/mcb/status",
)
).json()
except httpx.HTTPError as e:
logging.info(e)
if response:
try:
model = response["model"]
if model:
return model.replace("-", " ").upper()
except KeyError:
pass
@staticmethod
async def __get_dragonmint_version_from_web(
ip: ipaddress.ip_address,
) -> Union[str, None]:
response = None
try:
async with httpx.AsyncClient() as client:
auth = (
await client.post(
f"http://{ip}/api/auth",
data={"username": "admin", "password": "admin"},
)
).json()["jwt"]
response = (
await client.post(
f"http://{ip}/api/type",
headers={"Authorization": "Bearer " + auth},
data={},
)
).json()
except httpx.HTTPError as e:
logging.info(e)
if response:
try:
return response["type"]
except KeyError:
pass
@staticmethod
async def _validate_command(data: dict) -> Tuple[bool, Union[str, None]]:
"""Check if the returned command output is correctly formatted."""
# check if the data returned is correct or an error
if not data or data == {}:
return False, "No API data."
# if status isn't a key, it is a multicommand
if "STATUS" not in data.keys():
for key in data.keys():
# make sure not to try to turn id into a dict
if not key == "id":
# make sure they succeeded
if "STATUS" in data[key][0].keys():
if data[key][0]["STATUS"][0]["STATUS"] not in ["S", "I"]:
# this is an error
return False, f"{key}: " + data[key][0]["STATUS"][0]["Msg"]
elif "id" not in data.keys():
if data["STATUS"] not in ["S", "I"]:
return False, data["Msg"]
else:
# make sure the command succeeded
if data["STATUS"][0]["STATUS"] not in ("S", "I"):
return False, data["STATUS"][0]["Msg"]
return True, None
@staticmethod
async def _send_api_command(
ip: Union[ipaddress.ip_address, str], command: str
) -> dict:
try:
# get reader and writer streams
reader, writer = await asyncio.open_connection(str(ip), 4028)
except OSError as e:
if e.errno in [10061, 22]:
raise e
logging.warning(f"{str(ip)} - Command {command}: {e}")
return {}
# create the command
cmd = {"command": command}
# send the command
writer.write(json.dumps(cmd).encode("utf-8"))
await writer.drain()
# instantiate data
data = b""
# loop to receive all the data
try:
while True:
d = await reader.read(4096)
if not d:
break
data += d
except Exception as e:
logging.debug(f"{str(ip)}: {e}")
try:
# some json from the API returns with a null byte (\x00) on the end
if data.endswith(b"\x00"):
# handle the null byte
str_data = data.decode("utf-8")[:-1]
else:
# no null byte
str_data = data.decode("utf-8")
# fix an error with a btminer return having an extra comma that breaks json.loads()
str_data = str_data.replace(",}", "}")
# fix an error with a btminer return having a newline that breaks json.loads()
str_data = str_data.replace("\n", "")
# fix an error with a bmminer return missing a specific comma that breaks json.loads()
str_data = str_data.replace("}{", "},{")
# parse the json
data = json.loads(str_data)
# handle bad json
except json.decoder.JSONDecodeError:
# raise APIError(f"Decode Error: {data}")
data = None
# close the connection
writer.close()
await writer.wait_closed()
return data
FACTORY = MinerFactory()

View File

@@ -16,7 +16,7 @@
import unittest
from tests.miners_tests import MinerFactoryTest, MinersTest
# from tests.miners_tests import MinerFactoryTest, MinersTest
from tests.network_tests import NetworkTest
if __name__ == "__main__":

View File

@@ -23,158 +23,157 @@ from pyasic.miners.backends import CGMiner # noqa
from pyasic.miners.base import BaseMiner
from pyasic.miners.miner_factory import MINER_CLASSES, MinerFactory
class MinersTest(unittest.TestCase):
def test_miner_model_creation(self):
warnings.filterwarnings("ignore")
for miner_model in MINER_CLASSES.keys():
for miner_api in MINER_CLASSES[miner_model].keys():
with self.subTest(
msg=f"Creation of miner using model={miner_model}, api={miner_api}",
miner_model=miner_model,
miner_api=miner_api,
):
miner = MINER_CLASSES[miner_model][miner_api]("127.0.0.1")
self.assertTrue(
isinstance(miner, MINER_CLASSES[miner_model][miner_api])
)
def test_miner_backend_backup_creation(self):
warnings.filterwarnings("ignore")
backends = [
list(
inspect.getmembers(
sys.modules[f"pyasic.miners.backends"], inspect.isclass
)
)
]
backends = [item for sublist in backends for item in sublist]
for backend in backends:
miner_class = backend[1]
with self.subTest(miner_class=miner_class):
miner = miner_class("127.0.0.1")
self.assertTrue(isinstance(miner, miner_class))
def test_miner_type_creation_failure(self):
warnings.filterwarnings("ignore")
backends = [
list(
inspect.getmembers(
sys.modules[f"pyasic.miners.{algo}._types"], inspect.isclass
)
)
for algo in ["btc", "zec", "ltc"]
]
backends = [item for sublist in backends for item in sublist]
for backend in backends:
miner_class = backend[1]
with self.subTest(miner_class=miner_class):
with self.assertRaises(TypeError):
miner_class("127.0.0.1")
with self.assertRaises(TypeError):
BaseMiner("127.0.0.1")
def test_miner_comparisons(self):
miner_1 = CGMiner("1.1.1.1")
miner_2 = CGMiner("2.2.2.2")
miner_3 = CGMiner("1.1.1.1")
self.assertEqual(miner_1, miner_3)
self.assertGreater(miner_2, miner_1)
self.assertLess(miner_3, miner_2)
class MinerFactoryTest(unittest.TestCase):
def test_miner_factory_creation(self):
warnings.filterwarnings("ignore")
self.assertDictEqual(MinerFactory().miners, {})
miner_factory = MinerFactory()
self.assertIs(MinerFactory(), miner_factory)
def test_get_miner_generator(self):
async def _coro():
gen = MinerFactory().get_miner_generator([])
miners = []
async for miner in gen:
miners.append(miner)
return miners
_miners = asyncio.run(_coro())
self.assertListEqual(_miners, [])
def test_miner_selection(self):
warnings.filterwarnings("ignore")
for miner_model in MINER_CLASSES.keys():
with self.subTest():
miner = MinerFactory()._select_miner_from_classes(
"127.0.0.1", miner_model, None, None
)
self.assertIsInstance(miner, BaseMiner)
for api in ["BOSMiner+", "BOSMiner", "CGMiner", "BTMiner", "BMMiner"]:
with self.subTest():
miner = MinerFactory()._select_miner_from_classes(
"127.0.0.1", None, api, None
)
self.assertIsInstance(miner, BaseMiner)
with self.subTest():
miner = MinerFactory()._select_miner_from_classes(
"127.0.0.1", "ANTMINER S17+", "Fake API", None
)
self.assertIsInstance(miner, BaseMiner)
with self.subTest():
miner = MinerFactory()._select_miner_from_classes(
"127.0.0.1", "M30S", "BTMiner", "G20"
)
self.assertIsInstance(miner, BaseMiner)
def test_validate_command(self):
bad_test_data_returns = [
{},
{
"cmd": [
{
"STATUS": [
{"STATUS": "E", "Msg": "Command failed for some reason."}
]
}
]
},
{"STATUS": "E", "Msg": "Command failed for some reason."},
{
"STATUS": [{"STATUS": "E", "Msg": "Command failed for some reason."}],
"id": 1,
},
]
for data in bad_test_data_returns:
with self.subTest():
async def _coro(miner_ret):
_data = await MinerFactory()._validate_command(miner_ret)
return _data
ret = asyncio.run(_coro(data))
self.assertFalse(ret[0])
good_test_data_returns = [
{
"STATUS": [{"STATUS": "S", "Msg": "Yay! Command succeeded."}],
"id": 1,
},
]
for data in good_test_data_returns:
with self.subTest():
async def _coro(miner_ret):
_data = await MinerFactory()._validate_command(miner_ret)
return _data
ret = asyncio.run(_coro(data))
self.assertTrue(ret[0])
# class MinersTest(unittest.TestCase):
# def test_miner_model_creation(self):
# warnings.filterwarnings("ignore")
# for miner_model in MINER_CLASSES.keys():
# for miner_api in MINER_CLASSES[miner_model].keys():
# with self.subTest(
# msg=f"Creation of miner using model={miner_model}, api={miner_api}",
# miner_model=miner_model,
# miner_api=miner_api,
# ):
# miner = MINER_CLASSES[miner_model][miner_api]("127.0.0.1")
# self.assertTrue(
# isinstance(miner, MINER_CLASSES[miner_model][miner_api])
# )
#
# def test_miner_backend_backup_creation(self):
# warnings.filterwarnings("ignore")
#
# backends = [
# list(
# inspect.getmembers(
# sys.modules[f"pyasic.miners.backends"], inspect.isclass
# )
# )
# ]
# backends = [item for sublist in backends for item in sublist]
# for backend in backends:
# miner_class = backend[1]
# with self.subTest(miner_class=miner_class):
# miner = miner_class("127.0.0.1")
# self.assertTrue(isinstance(miner, miner_class))
#
# def test_miner_type_creation_failure(self):
# warnings.filterwarnings("ignore")
#
# backends = [
# list(
# inspect.getmembers(
# sys.modules[f"pyasic.miners.{algo}._types"], inspect.isclass
# )
# )
# for algo in ["btc", "zec", "ltc"]
# ]
# backends = [item for sublist in backends for item in sublist]
# for backend in backends:
# miner_class = backend[1]
# with self.subTest(miner_class=miner_class):
# with self.assertRaises(TypeError):
# miner_class("127.0.0.1")
# with self.assertRaises(TypeError):
# BaseMiner("127.0.0.1")
#
# def test_miner_comparisons(self):
# miner_1 = CGMiner("1.1.1.1")
# miner_2 = CGMiner("2.2.2.2")
# miner_3 = CGMiner("1.1.1.1")
# self.assertEqual(miner_1, miner_3)
# self.assertGreater(miner_2, miner_1)
# self.assertLess(miner_3, miner_2)
#
#
# class MinerFactoryTest(unittest.TestCase):
# def test_miner_factory_creation(self):
# warnings.filterwarnings("ignore")
#
# self.assertDictEqual(MinerFactory().miners, {})
# miner_factory = MinerFactory()
# self.assertIs(MinerFactory(), miner_factory)
#
# def test_get_miner_generator(self):
# async def _coro():
# gen = MinerFactory().get_miner_generator([])
# miners = []
# async for miner in gen:
# miners.append(miner)
# return miners
#
# _miners = asyncio.run(_coro())
# self.assertListEqual(_miners, [])
#
# def test_miner_selection(self):
# warnings.filterwarnings("ignore")
#
# for miner_model in MINER_CLASSES.keys():
# with self.subTest():
# miner = MinerFactory()._select_miner_from_classes(
# "127.0.0.1", miner_model, None, None
# )
# self.assertIsInstance(miner, BaseMiner)
# for api in ["BOSMiner+", "BOSMiner", "CGMiner", "BTMiner", "BMMiner"]:
# with self.subTest():
# miner = MinerFactory()._select_miner_from_classes(
# "127.0.0.1", None, api, None
# )
# self.assertIsInstance(miner, BaseMiner)
#
# with self.subTest():
# miner = MinerFactory()._select_miner_from_classes(
# "127.0.0.1", "ANTMINER S17+", "Fake API", None
# )
# self.assertIsInstance(miner, BaseMiner)
#
# with self.subTest():
# miner = MinerFactory()._select_miner_from_classes(
# "127.0.0.1", "M30S", "BTMiner", "G20"
# )
# self.assertIsInstance(miner, BaseMiner)
#
# def test_validate_command(self):
# bad_test_data_returns = [
# {},
# {
# "cmd": [
# {
# "STATUS": [
# {"STATUS": "E", "Msg": "Command failed for some reason."}
# ]
# }
# ]
# },
# {"STATUS": "E", "Msg": "Command failed for some reason."},
# {
# "STATUS": [{"STATUS": "E", "Msg": "Command failed for some reason."}],
# "id": 1,
# },
# ]
# for data in bad_test_data_returns:
# with self.subTest():
#
# async def _coro(miner_ret):
# _data = await MinerFactory()._validate_command(miner_ret)
# return _data
#
# ret = asyncio.run(_coro(data))
# self.assertFalse(ret[0])
#
# good_test_data_returns = [
# {
# "STATUS": [{"STATUS": "S", "Msg": "Yay! Command succeeded."}],
# "id": 1,
# },
# ]
# for data in good_test_data_returns:
# with self.subTest():
#
# async def _coro(miner_ret):
# _data = await MinerFactory()._validate_command(miner_ret)
# return _data
#
# ret = asyncio.run(_coro(data))
# self.assertTrue(ret[0])
if __name__ == "__main__":