feature: refactor to aiohttp and fix a lot of bugs with factory. Still needs support for some miners.

This commit is contained in:
UpstreamData
2023-06-07 11:37:52 -06:00
parent d6a42238d0
commit c56686a18d
3 changed files with 330 additions and 139 deletions

View File

@@ -18,9 +18,9 @@ import ipaddress
from typing import Union from typing import Union
from pyasic.miners.base import AnyMiner, BaseMiner from pyasic.miners.base import AnyMiner, BaseMiner
from pyasic.miners.miner_factory import MinerFactory from pyasic.miners.miner_factory import miner_factory
# abstracted version of get miner that is easier to access # abstracted version of get miner that is easier to access
async def get_miner(ip: Union[ipaddress.ip_address, str]) -> AnyMiner: async def get_miner(ip: Union[ipaddress.ip_address, str]) -> AnyMiner:
return await MinerFactory().get_miner(ip) return await miner_factory.get_miner(ip)

View File

@@ -16,22 +16,18 @@
import asyncio import asyncio
import enum
import ipaddress import ipaddress
import json import json
import re import re
from ipaddress import IPv4Address from ipaddress import IPv4Address
from typing import Optional, Tuple, Union from typing import Callable, List, Optional, Tuple, Union
import httpx import aiohttp
from pyasic.logger import logger from pyasic.logger import logger
from pyasic.miners.backends import BOSMiner # noqa - Ignore _module import from pyasic.miners.backends import BFGMiner, BMMiner, BOSMiner, BTMiner, CGMiner, VNish
from pyasic.miners.backends import CGMiner # noqa - Ignore _module import from pyasic.miners.backends.bosminer_old import BOSMinerOld
from pyasic.miners.backends.bmminer import BMMiner # noqa - Ignore _module import
from pyasic.miners.backends.bosminer_old import ( # noqa - Ignore _module import
BOSMinerOld,
)
from pyasic.miners.backends.btminer import BTMiner # noqa - Ignore _module import
from pyasic.miners.base import AnyMiner from pyasic.miners.base import AnyMiner
from pyasic.miners.btc import * from pyasic.miners.btc import *
from pyasic.miners.ckb import * from pyasic.miners.ckb import *
@@ -44,9 +40,8 @@ from pyasic.miners.ltc import *
from pyasic.miners.unknown import UnknownMiner from pyasic.miners.unknown import UnknownMiner
from pyasic.miners.zec import * from pyasic.miners.zec import *
TIMEOUT = 30 TIMEOUT = 20
LOOPS = 1 RETRIES = 3
MINER_CLASSES = { MINER_CLASSES = {
"ANTMINER DR5": { "ANTMINER DR5": {
@@ -591,123 +586,230 @@ MINER_CLASSES = {
} }
# TODO: Implement caching and cache clearing. class MinerTypes(enum.Enum):
# TODO: Add Canaan support back ANTMINER = 0
# TODO: Improve consistency WHATSMINER = 1
class MinerFactory: AVALONMINER = 2
async def web_ping(self, ip: str): INNOSILICON = 3
tasks = [self._http_ping(ip), self._http_ping(ip, https=True)] GOLDSHELL = 4
d = asyncio.as_completed( BRAIINS_OS = 5
tasks, VNISH = 6
timeout=TIMEOUT, HIVEON = 7
)
for i in d:
try:
data = await i
if data[0] is not None:
if not "400 - Bad Request" in data[0]:
return data
except asyncio.TimeoutError:
pass
return None, False
async def _http_ping(
self, ip: str, https: bool = False async def concurrent_get_first_result(tasks: list, verification_func: Callable):
) -> Tuple[Optional[str], bool]: while True:
request = "GET / HTTP/1.1\r\nHost: pyasic\r\n\r\n" await asyncio.sleep(0)
if https: if len(tasks) == 0:
request = "GET / HTTPS/1.1\r\nHost: pyasic\r\n\r\n" return
for task in tasks:
if task.done():
try: try:
reader, writer = await asyncio.open_connection(str(ip), 80) result = await task
response = None
try:
writer.write(request.encode())
response = await reader.read()
except asyncio.CancelledError: except asyncio.CancelledError:
writer.close() for t in tasks:
await writer.wait_closed() t.cancel()
if response is not None: raise
data = response.decode()
if data is not None and not data == "":
return data, True
else: else:
writer.close() if not verification_func(result):
await writer.wait_closed() continue
data = response.decode() for t in tasks:
if data is not None and not data == "": t.cancel()
return data, True return result
except OSError:
pass
return None, False
async def sock_ping(self, ip: str) -> [Optional[dict], bool]:
class MinerFactory:
async def get_multiple_miners(self, ips: List[str], limit: int = 200):
tasks = []
results = []
semaphore = asyncio.Semaphore(limit)
for ip in ips:
tasks.append(asyncio.create_task(self.get_miner(ip)))
for task in tasks:
await semaphore.acquire()
try: try:
data = await self.send_api_command(ip, "devdetails") result = await task
if data: if result is not None:
return data, True results.append(result)
except (asyncio.exceptions.TimeoutError, OSError, ConnectionError): finally:
pass semaphore.release()
return None, False
return results
async def get_miner(self, ip: str): async def get_miner(self, ip: str):
miner_type = None
for _ in range(RETRIES):
task = asyncio.create_task(self._get_miner_type(ip))
try: try:
return await asyncio.wait_for(self._get_miner(ip), TIMEOUT) miner_type = await asyncio.wait_for(task, timeout=TIMEOUT)
except asyncio.TimeoutError: except asyncio.TimeoutError:
return None task.cancel()
else:
async def _get_miner(self, ip: str): if miner_type is not None:
sock_data = None
web_data = None
for i in range(LOOPS):
web_result, sock_result = await asyncio.gather(
self.web_ping(ip), self.sock_ping(ip)
)
online = sock_result[1] or web_result[1]
if online:
web_data = web_result[0]
sock_data = sock_result[0]
break break
if web_data: if miner_type is not None:
if "401 Unauthorized" and 'realm="antMiner' in web_data: if miner_type == MinerTypes.ANTMINER:
# antminer branch
return await self.get_miner_antminer(ip) return await self.get_miner_antminer(ip)
if "307 Temporary Redirect" and 'location="https://' in web_data: if miner_type == MinerTypes.WHATSMINER:
return await self.get_miner_whatsminer(ip) return await self.get_miner_whatsminer(ip)
if "Braiins OS" in web_data: if miner_type == MinerTypes.AVALONMINER:
return "BOS+" return await self.get_miner_avalonminer(ip)
if "cloud-box" in web_data: if miner_type == MinerTypes.INNOSILICON:
# goldshell branch return await self.get_miner_innosilicon(ip)
if miner_type == MinerTypes.GOLDSHELL:
return await self.get_miner_goldshell(ip) return await self.get_miner_goldshell(ip)
if miner_type == MinerTypes.BRAIINS_OS:
return await self.get_miner_braiins_os(ip)
if miner_type == MinerTypes.VNISH:
return await self.get_miner_vnish(ip)
if miner_type == MinerTypes.HIVEON:
return await self.get_miner_hiveon(ip)
if sock_data: async def _get_miner_type(self, ip: str):
if "bitmicro" in str(sock_data): tasks = [
return await self.get_miner_whatsminer(ip, sock_data) asyncio.create_task(self._get_miner_web(ip)),
if "intchains_qomo" in str(sock_data): asyncio.create_task(self._get_miner_socket(ip)),
return await self.get_miner_goldshell(ip) ]
return UnknownMiner(ip)
return await concurrent_get_first_result(tasks, lambda x: x is not None)
async def _get_miner_web(self, ip: str):
urls = [f"http://{ip}/", f"https://{ip}/"]
async with aiohttp.ClientSession() as session:
tasks = [asyncio.create_task(self._web_ping(session, url)) for url in urls]
text, resp = await concurrent_get_first_result(
tasks, lambda x: x[0] is not None
)
if text is not None:
return self._parse_web_type(text, resp)
async def _get_miner_socket(self, ip: str):
commands = ["devdetails", "version"]
tasks = [asyncio.create_task(self._socket_ping(ip, cmd)) for cmd in commands]
data = await concurrent_get_first_result(
tasks, lambda x: x is not None and self._parse_socket_type(x) is not None
)
if data is not None:
return self._parse_socket_type(data)
def _parse_web_type(
self, web_text: str, web_resp: aiohttp.ClientResponse
) -> MinerTypes:
if web_resp.status == 401 and 'realm="antMiner' in web_resp.headers.get(
"www-authenticate", ""
):
return MinerTypes.ANTMINER
if web_resp.status == 307 and "https://" in web_resp.headers.get(
"location", ""
):
return MinerTypes.WHATSMINER
if "Braiins OS" in web_text or 'href="/cgi-bin/luci"' in web_text:
return MinerTypes.BRAIINS_OS
if "cloud-box" in web_text:
return MinerTypes.GOLDSHELL
if "AnthillOS" in web_text:
return MinerTypes.VNISH
def _parse_socket_type(self, data: str) -> MinerTypes:
upper_data = data.upper()
if "BOSMINER" in upper_data or "BOSER" in upper_data:
return MinerTypes.BRAIINS_OS
if "BTMINER" in upper_data or "BITMICRO" in upper_data:
return MinerTypes.WHATSMINER
if "VNISH" in upper_data:
return MinerTypes.VNISH
if "HIVEON" in upper_data:
return MinerTypes.HIVEON
if "ANTMINER" in upper_data:
return MinerTypes.ANTMINER
if "INTCHAINS_QOMO" in upper_data:
return MinerTypes.GOLDSHELL
async def _web_ping(
self, session: aiohttp.ClientSession, url: str
) -> Tuple[Optional[str], Optional[aiohttp.ClientResponse]]:
try:
resp = await session.get(url)
return await resp.text(), resp
except aiohttp.ClientError:
pass
return None, None
async def _socket_ping(self, ip: str, cmd: str) -> Optional[str]:
data = b""
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(str(ip), 4028), timeout=30
)
except (ConnectionError, OSError, asyncio.TimeoutError):
return
cmd = {"command": cmd}
try:
# send the command
writer.write(json.dumps(cmd).encode("utf-8"))
await writer.drain()
# loop to receive all the data
while True:
try:
d = await asyncio.wait_for(reader.read(4096), timeout=1)
if not d:
break
data += d
except asyncio.TimeoutError:
pass
except ConnectionResetError:
return
except asyncio.CancelledError:
raise
except (ConnectionError, OSError):
return
finally:
# Handle cancellation explicitly
if writer.transport.is_closing():
writer.transport.close()
else:
writer.close()
try:
await writer.wait_closed()
except (ConnectionError, OSError):
return
if data:
return data.decode("utf-8")
async def send_web_command( async def send_web_command(
self, self,
ip: Union[ipaddress.ip_address, str], ip: Union[ipaddress.ip_address, str],
location: str, location: str,
auth: Optional[httpx.DigestAuth] = None, auth: Optional[aiohttp.BasicAuth] = None,
) -> Optional[dict]: ) -> Optional[dict]:
async with httpx.AsyncClient(verify=False, timeout=TIMEOUT) as client: async with aiohttp.ClientSession() as session:
try: try:
data = await client.get( data = await session.get(
f"http://{str(ip)}{location}", f"http://{str(ip)}{location}",
auth=auth, auth=auth,
timeout=TIMEOUT, timeout=30,
) )
except httpx.HTTPError: except (aiohttp.ClientError, asyncio.TimeoutError):
logger.info(f"{ip}: Web command timeout.") logger.info(f"{ip}: Web command timeout.")
return return
if data is None: if data is None:
return return
try: try:
json_data = data.json() json_data = await data.json()
except json.JSONDecodeError: except (aiohttp.ContentTypeError, asyncio.TimeoutError):
try:
return json.loads(await data.text())
except (json.JSONDecodeError, aiohttp.ClientError):
return return
else: else:
return json_data return json_data
@@ -715,21 +817,18 @@ class MinerFactory:
async def send_api_command( async def send_api_command(
self, ip: Union[ipaddress.ip_address, str], command: str self, ip: Union[ipaddress.ip_address, str], command: str
) -> Optional[dict]: ) -> Optional[dict]:
data = b""
try: try:
reader, writer = await asyncio.wait_for( reader, writer = await asyncio.open_connection(str(ip), 4028)
asyncio.open_connection(str(ip), 4028), timeout=TIMEOUT
)
except (ConnectionError, OSError): except (ConnectionError, OSError):
return return
cmd = {"command": command} cmd = {"command": command}
try:
# send the command # send the command
writer.write(json.dumps(cmd).encode("utf-8")) writer.write(json.dumps(cmd).encode("utf-8"))
await writer.drain() await writer.drain()
# instantiate data
data = b""
# loop to receive all the data # loop to receive all the data
while True: while True:
d = await reader.read(4096) d = await reader.read(4096)
@@ -739,6 +838,12 @@ class MinerFactory:
writer.close() writer.close()
await writer.wait_closed() await writer.wait_closed()
except asyncio.CancelledError:
writer.close()
await writer.wait_closed()
return
except (ConnectionError, OSError):
return
data = await self.fix_api_data(data) data = await self.fix_api_data(data)
@@ -802,7 +907,7 @@ class MinerFactory:
pass pass
# last resort, this is slow # last resort, this is slow
auth = httpx.DigestAuth("root", "root") auth = aiohttp.BasicAuth("root", "root")
web_json_data = await self.send_web_command( web_json_data = await self.send_web_command(
ip, "/cgi-bin/get_system_info.cgi", auth=auth ip, "/cgi-bin/get_system_info.cgi", auth=auth
) )
@@ -824,26 +929,111 @@ class MinerFactory:
async def get_miner_goldshell(self, ip: str): async def get_miner_goldshell(self, ip: str):
json_data = await self.send_web_command(ip, "/mcb/status") json_data = await self.send_web_command(ip, "/mcb/status")
if json_data:
if json_data.get("model") is not None: if json_data.get("model") is not None:
miner_type = json_data["model"].replace("-", " ").upper() miner_type = json_data["model"].replace("-", " ").upper()
return self._select_miner_from_classes( return self._select_miner_from_classes(
IPv4Address(ip), miner_type, None, None IPv4Address(ip), miner_type, None, None
) )
return BFGMiner(ip)
async def get_miner_whatsminer(self, ip: str, json_data: Optional[dict] = None): async def get_miner_whatsminer(self, ip: str):
if not json_data:
try: try:
json_data = await self.send_api_command(ip, "devdetails") json_data = await self.send_api_command(ip, "devdetails")
except (asyncio.exceptions.TimeoutError, OSError, ConnectionError): except (asyncio.exceptions.TimeoutError, OSError, ConnectionError):
return None return BTMiner(ip)
try: try:
miner_type, submodel = json_data["DEVDETAILS"][0]["Model"].split("V") miner_type, submodel = json_data["DEVDETAILS"][0]["Model"].split("V")
return self._select_miner_from_classes( return self._select_miner_from_classes(
IPv4Address(ip), miner_type, submodel, None IPv4Address(ip), miner_type, submodel, None
) )
except LookupError: except (LookupError, TypeError):
return None return BTMiner(ip)
async def get_miner_avalonminer(self, ip: str):
return CGMiner(ip)
async def get_miner_innosilicon(self, ip: str):
return CGMiner(ip)
async def get_miner_braiins_os(self, ip: str):
sock_json_data = await self.send_api_command(ip, "devdetails")
api_type = "BOSMiner+"
try:
miner_type = sock_json_data["DEVDETAILS"][0]["Model"]
return self._select_miner_from_classes(
ip=IPv4Address(ip),
model=miner_type.upper(),
api=api_type,
ver=None,
api_ver=None,
)
except (TypeError, LookupError):
pass
try:
async with aiohttp.ClientSession as session:
d = await session.post(
url, json={"query": "{bosminer {info{modelName}}}"}
)
if d.status == 200:
json_data = await d.json()
miner_type = (
json_data["data"]["bosminer"]["info"]["modelName"]
).upper()
return self._select_miner_from_classes(
ip=IPv4Address(ip),
model=miner_type.upper(),
api=api_type,
ver=None,
api_ver=None,
)
except aiohttp.ClientError:
pass
return BOSMiner(ip)
async def get_miner_vnish(self, ip: str):
sock_json_data = await self.send_api_command(ip, "stats")
api_type = "VNish"
try:
miner_type = sock_json_data["STATS"][0]["Type"].upper()
miner_api_ver = None
if " (VNISH" in miner_type:
split_miner_type = miner_type.split(" (VNISH ")
miner_type = split_miner_type[0]
miner_api_ver = split_miner_type[1].replace(")", "")
return self._select_miner_from_classes(
ip=IPv4Address(ip),
model=miner_type.upper(),
api=api_type,
ver=None,
api_ver=miner_api_ver,
)
except (TypeError, LookupError):
pass
return VNish(ip)
async def get_miner_hiveon(self, ip: str):
sock_json_data = await self.send_api_command(ip, "version")
try:
miner_type = sock_json_data["VERSION"][0]["Type"]
api_type = "Hiveon"
api_ver = sock_json_data["VERSION"][0]["API"]
return self._select_miner_from_classes(
ip=IPv4Address(ip),
model=miner_type.upper().replace(" HIVEON", ""),
api=api_type,
ver=None,
api_ver=api_ver,
)
except (TypeError, LookupError):
pass
@staticmethod @staticmethod
def _select_miner_from_classes( def _select_miner_from_classes(
@@ -892,4 +1082,4 @@ class MinerFactory:
return miner return miner
FACTORY = MinerFactory() miner_factory = MinerFactory()

View File

@@ -14,6 +14,7 @@ httpx = "^0.24.0"
passlib = "^1.7.4" passlib = "^1.7.4"
pyaml = "^23.5.9" pyaml = "^23.5.9"
toml = "^0.10.2" toml = "^0.10.2"
aiohttp = "^3.8.4"
[tool.poetry.group.dev] [tool.poetry.group.dev]
optional = true optional = true