refactor: improve settings handling to not use a dataclass, and not use singleton.

This commit is contained in:
UpstreamData
2023-10-02 13:13:31 -06:00
parent fd8cc7378c
commit 7bab4747ad
10 changed files with 144 additions and 178 deletions

View File

@@ -27,10 +27,10 @@ from typing import Literal, Union
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from passlib.handlers.md5_crypt import md5_crypt
from pyasic import settings
from pyasic.API import BaseMinerAPI
from pyasic.errors import APIError
from pyasic.misc import api_min_version
from pyasic.settings import PyasicSettings
### IMPORTANT ###
# you need to change the password of the miners using the Whatsminer
@@ -192,7 +192,7 @@ class BTMinerAPI(BaseMinerAPI):
ip: str,
api_ver: str = "0.0.0",
port: int = 4028,
pwd: str = PyasicSettings().global_whatsminer_password,
pwd: str = settings.get("default_whatsminer_password", "admin"),
):
super().__init__(ip, port)
self.pwd = pwd

View File

@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic import settings
from pyasic.API.bmminer import BMMinerAPI
from pyasic.API.bosminer import BOSMinerAPI
from pyasic.API.btminer import BTMinerAPI
@@ -32,7 +33,6 @@ from pyasic.miners.base import AnyMiner
from pyasic.miners.miner_factory import MinerFactory
from pyasic.miners.miner_listener import MinerListener
from pyasic.network import MinerNetwork
from pyasic.settings import PyasicSettings
__all__ = [
"BMMinerAPI",
@@ -53,5 +53,5 @@ __all__ = [
"MinerFactory",
"MinerListener",
"MinerNetwork",
"PyasicSettings",
"settings",
]

View File

@@ -16,18 +16,16 @@
import logging
from pyasic.settings import PyasicSettings
def init_logger():
if PyasicSettings().logfile:
logging.basicConfig(
filename="logfile.txt",
filemode="a",
format="%(pathname)s:%(lineno)d in %(funcName)s\n[%(levelname)s][%(asctime)s](%(name)s) - %(message)s",
datefmt="%x %X",
)
else:
# if PyasicSettings().logfile:
# logging.basicConfig(
# filename="logfile.txt",
# filemode="a",
# format="%(pathname)s:%(lineno)d in %(funcName)s\n[%(levelname)s][%(asctime)s](%(name)s) - %(message)s",
# datefmt="%x %X",
# )
# else:
logging.basicConfig(
format="%(pathname)s:%(lineno)d in %(funcName)s\n[%(levelname)s][%(asctime)s](%(name)s) - %(message)s",
datefmt="%x %X",
@@ -35,10 +33,10 @@ def init_logger():
_logger = logging.getLogger()
if PyasicSettings().debug:
_logger.setLevel(logging.DEBUG)
logging.getLogger("asyncssh").setLevel(logging.DEBUG)
else:
# if PyasicSettings().debug:
# _logger.setLevel(logging.DEBUG)
# logging.getLogger("asyncssh").setLevel(logging.DEBUG)
# else:
_logger.setLevel(logging.WARNING)
logging.getLogger("asyncssh").setLevel(logging.WARNING)

View File

@@ -19,9 +19,9 @@ import ipaddress
import logging
from typing import AsyncIterator, List, Union
from pyasic import settings
from pyasic.miners.miner_factory import AnyMiner, miner_factory
from pyasic.network.net_range import MinerNetworkRange
from pyasic.settings import PyasicSettings
class MinerNetwork:
@@ -108,7 +108,7 @@ class MinerNetwork:
# clear cached miners
miner_factory.clear_cached_miners()
limit = asyncio.Semaphore(PyasicSettings().network_scan_threads)
limit = asyncio.Semaphore(settings.get("network_scan_threads", 300))
miners = await asyncio.gather(
*[self.ping_and_get_miner(host, limit) for host in local_network.hosts()]
)
@@ -136,7 +136,7 @@ class MinerNetwork:
local_network = self.get_network()
# create a list of scan tasks
limit = asyncio.Semaphore(PyasicSettings().network_scan_threads)
limit = asyncio.Semaphore(settings.get("network_scan_threads", 300))
miners = asyncio.as_completed(
[
loop.create_task(self.ping_and_get_miner(host, limit))
@@ -191,12 +191,12 @@ class MinerNetwork:
async def ping_miner(
ip: ipaddress.ip_address, port=4028
) -> Union[None, ipaddress.ip_address]:
for i in range(PyasicSettings().network_ping_retries):
for i in range(settings.get("network_ping_retries", 1)):
try:
connection_fut = asyncio.open_connection(str(ip), port)
# get the read and write streams from the connection
reader, writer = await asyncio.wait_for(
connection_fut, timeout=PyasicSettings().network_ping_timeout
connection_fut, timeout=settings.get("network_ping_timeout", 3)
)
# immediately close connection, we know connection happened
writer.close()
@@ -220,12 +220,12 @@ async def ping_miner(
async def ping_and_get_miner(
ip: ipaddress.ip_address, port=4028
) -> Union[None, AnyMiner]:
for i in range(PyasicSettings().network_ping_retries):
for i in range(settings.get("network_ping_retries", 1)):
try:
connection_fut = asyncio.open_connection(str(ip), port)
# get the read and write streams from the connection
reader, writer = await asyncio.wait_for(
connection_fut, timeout=PyasicSettings().network_ping_timeout
connection_fut, timeout=settings.get("network_ping_timeout", 3)
)
# immediately close connection, we know connection happened
writer.close()

View File

@@ -14,27 +14,26 @@
# limitations under the License. -
# ------------------------------------------------------------------------------
from dataclasses import dataclass
from typing import Any
from pyasic.misc import Singleton
_settings = { # defaults
"network_ping_retries": 1,
"network_ping_timeout": 3,
"network_scan_threads": 300,
"factory_get_retries": 1,
"get_data_retries": 1,
"default_whatsminer_password": "admin",
"default_innosilicon_password": "admin",
"default_antminer_password": "root",
"default_bosminer_password": "root",
"default_vnish_password": "admin",
"default_goldshell_password": "123456789",
}
@dataclass
class PyasicSettings(metaclass=Singleton):
network_ping_retries: int = 1
network_ping_timeout: int = 3
network_scan_threads: int = 300
def get(key: str, other: Any = None) -> Any:
return _settings.get(key, other)
miner_factory_get_version_retries: int = 1
miner_get_data_retries: int = 1
global_whatsminer_password = "admin"
global_innosilicon_password = "admin"
global_antminer_password = "root"
global_bosminer_password = "root"
global_vnish_password = "admin"
global_goldshell_password = "123456789"
debug: bool = False
logfile: bool = False
def update(key: str, val: Any) -> Any:
_settings[key] = val

View File

@@ -19,14 +19,14 @@ from typing import Union
import httpx
from pyasic.settings import PyasicSettings
from pyasic import settings
from pyasic.web import BaseWebAPI
class AntminerModernWebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
super().__init__(ip)
self.pwd = PyasicSettings().global_antminer_password
self.pwd = settings.get("default_antminer_password", "root")
async def send_command(
self,
@@ -137,7 +137,7 @@ class AntminerModernWebAPI(BaseWebAPI):
class AntminerOldWebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
super().__init__(ip)
self.pwd = PyasicSettings().global_antminer_password
self.pwd = settings.get("default_antminer_password", "root")
async def send_command(
self,

View File

@@ -18,27 +18,14 @@ from typing import Union
import httpx
from pyasic import APIError
from pyasic.settings import PyasicSettings
from pyasic import APIError, settings
from pyasic.web import BaseWebAPI
class BOSMinerWebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
self.gql = BOSMinerGQLAPI(ip, PyasicSettings().global_bosminer_password)
self.luci = BOSMinerLuCIAPI(ip, PyasicSettings().global_bosminer_password)
self._pwd = PyasicSettings().global_bosminer_password
super().__init__(ip)
@property
def pwd(self):
return self._pwd
@pwd.setter
def pwd(self, other: str):
self._pwd = other
self.luci.pwd = other
self.gql.pwd = other
self.pwd = settings.get("default_bosminer_password", "root")
async def send_command(
self,
@@ -47,70 +34,25 @@ class BOSMinerWebAPI(BaseWebAPI):
allow_warning: bool = True,
**parameters: Union[str, int, bool],
) -> dict:
if command.startswith("/cgi-bin/luci"):
return await self.luci.send_command(command)
if isinstance(command, str):
return await self.send_luci_command(command)
else:
return await self.gql.send_command(command)
return await self.send_gql_command(command)
async def multicommand(
self, *commands: Union[dict, str], allow_warning: bool = True
) -> dict:
luci_commands = []
gql_commands = []
for cmd in commands:
if cmd.startswith("/cgi-bin/luci"):
luci_commands.append(cmd)
if isinstance(cmd, dict):
gql_commands.append(cmd)
luci_data = await self.luci.multicommand(*luci_commands)
gql_data = await self.gql.multicommand(*gql_commands)
if gql_data is None:
gql_data = {}
if luci_data is None:
luci_data = {}
data = dict(**luci_data, **gql_data)
return data
class BOSMinerGQLAPI:
def __init__(self, ip: str, pwd: str):
self.ip = ip
self.username = "root"
self.pwd = pwd
async def multicommand(self, *commands: dict) -> dict:
def merge(*d: dict):
ret = {}
for i in d:
if i:
for k in i:
if not k in ret:
ret[k] = i[k]
def parse_command(self, graphql_command: Union[dict, set]) -> str:
if isinstance(graphql_command, dict):
data = []
for key in graphql_command:
if graphql_command[key] is not None:
parsed = self.parse_command(graphql_command[key])
data.append(key + parsed)
else:
ret[k] = merge(ret[k], i[k])
return None if ret == {} else ret
data.append(key)
else:
data = graphql_command
return "{" + ",".join(data) + "}"
command = merge(*commands)
data = await self.send_command(command)
if data is not None:
if data.get("data") is None:
try:
commands = list(commands)
# noinspection PyTypeChecker
commands.remove({"bos": {"faultLight": None}})
command = merge(*commands)
data = await self.send_command(command)
except (LookupError, ValueError):
pass
if not data:
data = {}
data["multicommand"] = False
return data
async def send_command(
async def send_gql_command(
self,
command: dict,
) -> dict:
@@ -131,18 +73,62 @@ class BOSMinerGQLAPI:
except json.decoder.JSONDecodeError:
pass
def parse_command(self, graphql_command: Union[dict, set]) -> str:
if isinstance(graphql_command, dict):
data = []
for key in graphql_command:
if graphql_command[key] is not None:
parsed = self.parse_command(graphql_command[key])
data.append(key + parsed)
async def multicommand(
self, *commands: Union[dict, str], allow_warning: bool = True
) -> dict:
luci_commands = []
gql_commands = []
for cmd in commands:
if isinstance(cmd, dict):
gql_commands.append(cmd)
if isinstance(cmd, str):
luci_commands.append(cmd)
luci_data = await self.luci_multicommand(*luci_commands)
gql_data = await self.gql_multicommand(*gql_commands)
if gql_data is None:
gql_data = {}
if luci_data is None:
luci_data = {}
data = dict(**luci_data, **gql_data)
return data
async def luci_multicommand(self, *commands: str) -> dict:
data = {}
for command in commands:
data[command] = await self.send_luci_command(command, ignore_errors=True)
return data
async def gql_multicommand(self, *commands: dict) -> dict:
def merge(*d: dict):
ret = {}
for i in d:
if i:
for k in i:
if not k in ret:
ret[k] = i[k]
else:
data.append(key)
else:
data = graphql_command
return "{" + ",".join(data) + "}"
ret[k] = merge(ret[k], i[k])
return None if ret == {} else ret
command = merge(*commands)
data = await self.send_command(command)
if data is not None:
if data.get("data") is None:
try:
commands = list(commands)
# noinspection PyTypeChecker
commands.remove({"bos": {"faultLight": None}})
command = merge(*commands)
data = await self.send_gql_command(command)
except (LookupError, ValueError):
pass
if not data:
data = {}
data["multicommand"] = False
return data
async def auth(self, client: httpx.AsyncClient) -> None:
url = f"http://{self.ip}/graphql"
@@ -157,23 +143,10 @@ class BOSMinerGQLAPI:
},
)
class BOSMinerLuCIAPI:
def __init__(self, ip: str, pwd: str):
self.ip = ip
self.username = "root"
self.pwd = pwd
async def multicommand(self, *commands: str) -> dict:
data = {}
for command in commands:
data[command] = await self.send_command(command, ignore_errors=True)
return data
async def send_command(self, path: str, ignore_errors: bool = False) -> dict:
async def send_luci_command(self, path: str, ignore_errors: bool = False) -> dict:
try:
async with httpx.AsyncClient() as client:
await self.auth(client)
await self.luci_auth(client)
data = await client.get(
f"http://{self.ip}{path}", headers={"User-Agent": "BTC Tools v0.1"}
)
@@ -189,7 +162,7 @@ class BOSMinerLuCIAPI:
return {}
raise APIError(f"Web command failed: path={path}")
async def auth(self, session: httpx.AsyncClient):
async def luci_auth(self, session: httpx.AsyncClient):
login = {"luci_username": self.username, "luci_password": self.pwd}
url = f"http://{self.ip}/cgi-bin/luci"
headers = {
@@ -199,27 +172,23 @@ class BOSMinerLuCIAPI:
await session.post(url, headers=headers, data=login)
async def get_net_conf(self):
return await self.send_command("/cgi-bin/luci/admin/network/iface_status/lan")
return await self.send_luci_command(
"/cgi-bin/luci/admin/network/iface_status/lan"
)
async def get_cfg_metadata(self):
return await self.send_command("/cgi-bin/luci/admin/miner/cfg_metadata")
return await self.send_luci_command("/cgi-bin/luci/admin/miner/cfg_metadata")
async def get_cfg_data(self):
return await self.send_command("/cgi-bin/luci/admin/miner/cfg_data")
return await self.send_luci_command("/cgi-bin/luci/admin/miner/cfg_data")
async def get_bos_info(self):
return await self.send_command("/cgi-bin/luci/bos/info")
return await self.send_luci_command("/cgi-bin/luci/bos/info")
async def get_overview(self):
return await self.send_command(
return await self.send_luci_command(
"/cgi-bin/luci/admin/status/overview?status=1"
) # needs status=1 or it fails
async def get_api_status(self):
return await self.send_command("/cgi-bin/luci/admin/miner/api_status")
class BOSMinerGRPCAPI:
def __init__(self, ip: str, pwd: str):
self.ip = ip
self.pwd = pwd
return await self.send_luci_command("/cgi-bin/luci/admin/miner/api_status")

View File

@@ -19,7 +19,7 @@ from typing import Union
import httpx
from pyasic.settings import PyasicSettings
from pyasic import settings
from pyasic.web import BaseWebAPI
@@ -27,7 +27,7 @@ class GoldshellWebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
super().__init__(ip)
self.username = "admin"
self.pwd = PyasicSettings().global_goldshell_password
self.pwd = settings.get("default_goldshell_password", "123456789")
self.jwt = None
async def auth(self):
@@ -72,7 +72,7 @@ class GoldshellWebAPI(BaseWebAPI):
if not self.jwt:
await self.auth()
async with httpx.AsyncClient() as client:
for i in range(PyasicSettings().miner_get_data_retries):
for i in range(settings.get("get_data_retries", 1)):
try:
if parameters:
response = await client.put(

View File

@@ -19,8 +19,8 @@ from typing import Union
import httpx
from pyasic import settings
from pyasic.errors import APIError
from pyasic.settings import PyasicSettings
from pyasic.web import BaseWebAPI
@@ -28,7 +28,7 @@ class InnosiliconWebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
super().__init__(ip)
self.username = "admin"
self.pwd = PyasicSettings().global_innosilicon_password
self.pwd = settings.get("default_innosilicon_password", "admin")
self.jwt = None
async def auth(self):
@@ -55,7 +55,7 @@ class InnosiliconWebAPI(BaseWebAPI):
if not self.jwt:
await self.auth()
async with httpx.AsyncClient() as client:
for i in range(PyasicSettings().miner_get_data_retries):
for i in range(settings.get("get_data_retries", 1)):
try:
response = await client.post(
f"http://{self.ip}/api/{command}",

View File

@@ -19,7 +19,7 @@ from typing import Union
import httpx
from pyasic.settings import PyasicSettings
from pyasic import settings
from pyasic.web import BaseWebAPI
@@ -27,7 +27,7 @@ class VNishWebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
super().__init__(ip)
self.username = "admin"
self.pwd = PyasicSettings().global_vnish_password
self.pwd = settings.get("default_vnish_password", "admin")
self.token = None
async def auth(self):
@@ -59,7 +59,7 @@ class VNishWebAPI(BaseWebAPI):
if not self.token:
await self.auth()
async with httpx.AsyncClient() as client:
for i in range(PyasicSettings().miner_get_data_retries):
for i in range(settings.get("get_data_retries", 1)):
try:
auth = self.token
if command.startswith("system"):