diff --git a/pyasic/miners/__init__.py b/pyasic/miners/__init__.py index 773165de..8f72f313 100644 --- a/pyasic/miners/__init__.py +++ b/pyasic/miners/__init__.py @@ -18,9 +18,9 @@ import ipaddress from typing import Union from pyasic.miners.base import AnyMiner, BaseMiner -from pyasic.miners.miner_factory import MinerFactory +from pyasic.miners.miner_factory import miner_factory # abstracted version of get miner that is easier to access async def get_miner(ip: Union[ipaddress.ip_address, str]) -> AnyMiner: - return await MinerFactory().get_miner(ip) + return await miner_factory.get_miner(ip) diff --git a/pyasic/miners/miner_factory.py b/pyasic/miners/miner_factory.py index 307880af..89cc59ab 100644 --- a/pyasic/miners/miner_factory.py +++ b/pyasic/miners/miner_factory.py @@ -16,22 +16,18 @@ import asyncio +import enum import ipaddress import json import re from ipaddress import IPv4Address -from typing import Optional, Tuple, Union +from typing import Callable, List, Optional, Tuple, Union -import httpx +import aiohttp from pyasic.logger import logger -from pyasic.miners.backends import BOSMiner # noqa - Ignore _module import -from pyasic.miners.backends import CGMiner # noqa - Ignore _module import -from pyasic.miners.backends.bmminer import BMMiner # noqa - Ignore _module import -from pyasic.miners.backends.bosminer_old import ( # noqa - Ignore _module import - BOSMinerOld, -) -from pyasic.miners.backends.btminer import BTMiner # noqa - Ignore _module import +from pyasic.miners.backends import BFGMiner, BMMiner, BOSMiner, BTMiner, CGMiner, VNish +from pyasic.miners.backends.bosminer_old import BOSMinerOld from pyasic.miners.base import AnyMiner from pyasic.miners.btc import * from pyasic.miners.ckb import * @@ -44,9 +40,8 @@ from pyasic.miners.ltc import * from pyasic.miners.unknown import UnknownMiner from pyasic.miners.zec import * -TIMEOUT = 30 -LOOPS = 1 - +TIMEOUT = 20 +RETRIES = 3 MINER_CLASSES = { "ANTMINER DR5": { @@ -591,154 +586,264 @@ MINER_CLASSES = { } -# TODO: Implement caching and cache clearing. -# TODO: Add Canaan support back -# TODO: Improve consistency +class MinerTypes(enum.Enum): + ANTMINER = 0 + WHATSMINER = 1 + AVALONMINER = 2 + INNOSILICON = 3 + GOLDSHELL = 4 + BRAIINS_OS = 5 + VNISH = 6 + HIVEON = 7 + + +async def concurrent_get_first_result(tasks: list, verification_func: Callable): + while True: + await asyncio.sleep(0) + if len(tasks) == 0: + return + for task in tasks: + if task.done(): + try: + result = await task + except asyncio.CancelledError: + for t in tasks: + t.cancel() + raise + else: + if not verification_func(result): + continue + for t in tasks: + t.cancel() + return result + + class MinerFactory: - async def web_ping(self, ip: str): - tasks = [self._http_ping(ip), self._http_ping(ip, https=True)] - d = asyncio.as_completed( - tasks, - timeout=TIMEOUT, - ) - for i in d: - try: - data = await i - if data[0] is not None: - if not "400 - Bad Request" in data[0]: - return data - except asyncio.TimeoutError: - pass - return None, False + async def get_multiple_miners(self, ips: List[str], limit: int = 200): + tasks = [] + results = [] - async def _http_ping( - self, ip: str, https: bool = False - ) -> Tuple[Optional[str], bool]: - request = "GET / HTTP/1.1\r\nHost: pyasic\r\n\r\n" - if https: - request = "GET / HTTPS/1.1\r\nHost: pyasic\r\n\r\n" - try: - reader, writer = await asyncio.open_connection(str(ip), 80) - response = None - try: - writer.write(request.encode()) - response = await reader.read() - except asyncio.CancelledError: - writer.close() - await writer.wait_closed() - if response is not None: - data = response.decode() - if data is not None and not data == "": - return data, True - else: - writer.close() - await writer.wait_closed() - data = response.decode() - if data is not None and not data == "": - return data, True - except OSError: - pass - return None, False + semaphore = asyncio.Semaphore(limit) - async def sock_ping(self, ip: str) -> [Optional[dict], bool]: - try: - data = await self.send_api_command(ip, "devdetails") - if data: - return data, True - except (asyncio.exceptions.TimeoutError, OSError, ConnectionError): - pass - return None, False + for ip in ips: + tasks.append(asyncio.create_task(self.get_miner(ip))) + + for task in tasks: + await semaphore.acquire() + try: + result = await task + if result is not None: + results.append(result) + finally: + semaphore.release() + + return results async def get_miner(self, ip: str): - try: - return await asyncio.wait_for(self._get_miner(ip), TIMEOUT) - except asyncio.TimeoutError: - return None + miner_type = None + for _ in range(RETRIES): + task = asyncio.create_task(self._get_miner_type(ip)) + try: + miner_type = await asyncio.wait_for(task, timeout=TIMEOUT) + except asyncio.TimeoutError: + task.cancel() + else: + if miner_type is not None: + break - async def _get_miner(self, ip: str): - sock_data = None - web_data = None - for i in range(LOOPS): - web_result, sock_result = await asyncio.gather( - self.web_ping(ip), self.sock_ping(ip) - ) - online = sock_result[1] or web_result[1] - if online: - web_data = web_result[0] - sock_data = sock_result[0] - break - - if web_data: - if "401 Unauthorized" and 'realm="antMiner' in web_data: - # antminer branch + if miner_type is not None: + if miner_type == MinerTypes.ANTMINER: return await self.get_miner_antminer(ip) - if "307 Temporary Redirect" and 'location="https://' in web_data: + if miner_type == MinerTypes.WHATSMINER: return await self.get_miner_whatsminer(ip) - if "Braiins OS" in web_data: - return "BOS+" - if "cloud-box" in web_data: - # goldshell branch + if miner_type == MinerTypes.AVALONMINER: + return await self.get_miner_avalonminer(ip) + if miner_type == MinerTypes.INNOSILICON: + return await self.get_miner_innosilicon(ip) + if miner_type == MinerTypes.GOLDSHELL: return await self.get_miner_goldshell(ip) + if miner_type == MinerTypes.BRAIINS_OS: + return await self.get_miner_braiins_os(ip) + if miner_type == MinerTypes.VNISH: + return await self.get_miner_vnish(ip) + if miner_type == MinerTypes.HIVEON: + return await self.get_miner_hiveon(ip) - if sock_data: - if "bitmicro" in str(sock_data): - return await self.get_miner_whatsminer(ip, sock_data) - if "intchains_qomo" in str(sock_data): - return await self.get_miner_goldshell(ip) - return UnknownMiner(ip) + async def _get_miner_type(self, ip: str): + tasks = [ + asyncio.create_task(self._get_miner_web(ip)), + asyncio.create_task(self._get_miner_socket(ip)), + ] + + return await concurrent_get_first_result(tasks, lambda x: x is not None) + + async def _get_miner_web(self, ip: str): + urls = [f"http://{ip}/", f"https://{ip}/"] + async with aiohttp.ClientSession() as session: + tasks = [asyncio.create_task(self._web_ping(session, url)) for url in urls] + + text, resp = await concurrent_get_first_result( + tasks, lambda x: x[0] is not None + ) + + if text is not None: + return self._parse_web_type(text, resp) + + async def _get_miner_socket(self, ip: str): + commands = ["devdetails", "version"] + tasks = [asyncio.create_task(self._socket_ping(ip, cmd)) for cmd in commands] + + data = await concurrent_get_first_result( + tasks, lambda x: x is not None and self._parse_socket_type(x) is not None + ) + if data is not None: + return self._parse_socket_type(data) + + def _parse_web_type( + self, web_text: str, web_resp: aiohttp.ClientResponse + ) -> MinerTypes: + if web_resp.status == 401 and 'realm="antMiner' in web_resp.headers.get( + "www-authenticate", "" + ): + return MinerTypes.ANTMINER + if web_resp.status == 307 and "https://" in web_resp.headers.get( + "location", "" + ): + return MinerTypes.WHATSMINER + if "Braiins OS" in web_text or 'href="/cgi-bin/luci"' in web_text: + return MinerTypes.BRAIINS_OS + if "cloud-box" in web_text: + return MinerTypes.GOLDSHELL + if "AnthillOS" in web_text: + return MinerTypes.VNISH + + def _parse_socket_type(self, data: str) -> MinerTypes: + upper_data = data.upper() + if "BOSMINER" in upper_data or "BOSER" in upper_data: + return MinerTypes.BRAIINS_OS + if "BTMINER" in upper_data or "BITMICRO" in upper_data: + return MinerTypes.WHATSMINER + if "VNISH" in upper_data: + return MinerTypes.VNISH + if "HIVEON" in upper_data: + return MinerTypes.HIVEON + if "ANTMINER" in upper_data: + return MinerTypes.ANTMINER + if "INTCHAINS_QOMO" in upper_data: + return MinerTypes.GOLDSHELL + + async def _web_ping( + self, session: aiohttp.ClientSession, url: str + ) -> Tuple[Optional[str], Optional[aiohttp.ClientResponse]]: + try: + resp = await session.get(url) + return await resp.text(), resp + except aiohttp.ClientError: + pass + return None, None + + async def _socket_ping(self, ip: str, cmd: str) -> Optional[str]: + data = b"" + try: + reader, writer = await asyncio.wait_for( + asyncio.open_connection(str(ip), 4028), timeout=30 + ) + except (ConnectionError, OSError, asyncio.TimeoutError): + return + + cmd = {"command": cmd} + + try: + # send the command + writer.write(json.dumps(cmd).encode("utf-8")) + await writer.drain() + + # loop to receive all the data + while True: + try: + d = await asyncio.wait_for(reader.read(4096), timeout=1) + if not d: + break + data += d + except asyncio.TimeoutError: + pass + except ConnectionResetError: + return + except asyncio.CancelledError: + raise + except (ConnectionError, OSError): + return + finally: + # Handle cancellation explicitly + if writer.transport.is_closing(): + writer.transport.close() + else: + writer.close() + try: + await writer.wait_closed() + except (ConnectionError, OSError): + return + if data: + return data.decode("utf-8") async def send_web_command( self, ip: Union[ipaddress.ip_address, str], location: str, - auth: Optional[httpx.DigestAuth] = None, + auth: Optional[aiohttp.BasicAuth] = None, ) -> Optional[dict]: - async with httpx.AsyncClient(verify=False, timeout=TIMEOUT) as client: + async with aiohttp.ClientSession() as session: try: - data = await client.get( + data = await session.get( f"http://{str(ip)}{location}", auth=auth, - timeout=TIMEOUT, + timeout=30, ) - except httpx.HTTPError: + except (aiohttp.ClientError, asyncio.TimeoutError): logger.info(f"{ip}: Web command timeout.") return if data is None: return try: - json_data = data.json() - except json.JSONDecodeError: - return + json_data = await data.json() + except (aiohttp.ContentTypeError, asyncio.TimeoutError): + try: + return json.loads(await data.text()) + except (json.JSONDecodeError, aiohttp.ClientError): + return else: return json_data async def send_api_command( self, ip: Union[ipaddress.ip_address, str], command: str ) -> Optional[dict]: + data = b"" try: - reader, writer = await asyncio.wait_for( - asyncio.open_connection(str(ip), 4028), timeout=TIMEOUT - ) + reader, writer = await asyncio.open_connection(str(ip), 4028) except (ConnectionError, OSError): return cmd = {"command": command} - # send the command - writer.write(json.dumps(cmd).encode("utf-8")) - await writer.drain() + try: + # send the command + writer.write(json.dumps(cmd).encode("utf-8")) + await writer.drain() - # instantiate data - data = b"" + # loop to receive all the data + while True: + d = await reader.read(4096) + if not d: + break + data += d - # loop to receive all the data - while True: - d = await reader.read(4096) - if not d: - break - data += d - - writer.close() - await writer.wait_closed() + writer.close() + await writer.wait_closed() + except asyncio.CancelledError: + writer.close() + await writer.wait_closed() + return + except (ConnectionError, OSError): + return data = await self.fix_api_data(data) @@ -802,7 +907,7 @@ class MinerFactory: pass # last resort, this is slow - auth = httpx.DigestAuth("root", "root") + auth = aiohttp.BasicAuth("root", "root") web_json_data = await self.send_web_command( ip, "/cgi-bin/get_system_info.cgi", auth=auth ) @@ -824,26 +929,111 @@ class MinerFactory: async def get_miner_goldshell(self, ip: str): json_data = await self.send_web_command(ip, "/mcb/status") - if json_data.get("model") is not None: - miner_type = json_data["model"].replace("-", " ").upper() - return self._select_miner_from_classes( - IPv4Address(ip), miner_type, None, None - ) + if json_data: + if json_data.get("model") is not None: + miner_type = json_data["model"].replace("-", " ").upper() + return self._select_miner_from_classes( + IPv4Address(ip), miner_type, None, None + ) + return BFGMiner(ip) - async def get_miner_whatsminer(self, ip: str, json_data: Optional[dict] = None): - if not json_data: - try: - json_data = await self.send_api_command(ip, "devdetails") - except (asyncio.exceptions.TimeoutError, OSError, ConnectionError): - return None + async def get_miner_whatsminer(self, ip: str): + try: + json_data = await self.send_api_command(ip, "devdetails") + except (asyncio.exceptions.TimeoutError, OSError, ConnectionError): + return BTMiner(ip) try: miner_type, submodel = json_data["DEVDETAILS"][0]["Model"].split("V") return self._select_miner_from_classes( IPv4Address(ip), miner_type, submodel, None ) - except LookupError: - return None + except (LookupError, TypeError): + return BTMiner(ip) + + async def get_miner_avalonminer(self, ip: str): + return CGMiner(ip) + + async def get_miner_innosilicon(self, ip: str): + return CGMiner(ip) + + async def get_miner_braiins_os(self, ip: str): + sock_json_data = await self.send_api_command(ip, "devdetails") + api_type = "BOSMiner+" + try: + miner_type = sock_json_data["DEVDETAILS"][0]["Model"] + + return self._select_miner_from_classes( + ip=IPv4Address(ip), + model=miner_type.upper(), + api=api_type, + ver=None, + api_ver=None, + ) + except (TypeError, LookupError): + pass + + try: + async with aiohttp.ClientSession as session: + d = await session.post( + url, json={"query": "{bosminer {info{modelName}}}"} + ) + if d.status == 200: + json_data = await d.json() + miner_type = ( + json_data["data"]["bosminer"]["info"]["modelName"] + ).upper() + return self._select_miner_from_classes( + ip=IPv4Address(ip), + model=miner_type.upper(), + api=api_type, + ver=None, + api_ver=None, + ) + except aiohttp.ClientError: + pass + + return BOSMiner(ip) + + async def get_miner_vnish(self, ip: str): + sock_json_data = await self.send_api_command(ip, "stats") + api_type = "VNish" + try: + miner_type = sock_json_data["STATS"][0]["Type"].upper() + miner_api_ver = None + if " (VNISH" in miner_type: + split_miner_type = miner_type.split(" (VNISH ") + miner_type = split_miner_type[0] + miner_api_ver = split_miner_type[1].replace(")", "") + + return self._select_miner_from_classes( + ip=IPv4Address(ip), + model=miner_type.upper(), + api=api_type, + ver=None, + api_ver=miner_api_ver, + ) + except (TypeError, LookupError): + pass + + return VNish(ip) + + async def get_miner_hiveon(self, ip: str): + sock_json_data = await self.send_api_command(ip, "version") + try: + miner_type = sock_json_data["VERSION"][0]["Type"] + api_type = "Hiveon" + api_ver = sock_json_data["VERSION"][0]["API"] + + return self._select_miner_from_classes( + ip=IPv4Address(ip), + model=miner_type.upper().replace(" HIVEON", ""), + api=api_type, + ver=None, + api_ver=api_ver, + ) + except (TypeError, LookupError): + pass @staticmethod def _select_miner_from_classes( @@ -892,4 +1082,4 @@ class MinerFactory: return miner -FACTORY = MinerFactory() +miner_factory = MinerFactory() diff --git a/pyproject.toml b/pyproject.toml index b2acf39d..c0bfc2de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ httpx = "^0.24.0" passlib = "^1.7.4" pyaml = "^23.5.9" toml = "^0.10.2" +aiohttp = "^3.8.4" [tool.poetry.group.dev] optional = true