switch braiins miners over to using new config dataclass

This commit is contained in:
UpstreamData
2022-06-07 10:49:41 -06:00
parent 733437ef03
commit 943ebc77a1
3 changed files with 152 additions and 14 deletions

View File

@@ -3,6 +3,11 @@ from typing import List, Literal
import random
import string
import toml
import yaml
import json
import time
@dataclass
class _Pool:
@@ -20,19 +25,39 @@ class _Pool:
self.password = data[key]
return self
def as_x19(self, user_suffix: str = None):
username = self.username
if user_suffix:
username = f"{username}{user_suffix}"
pool = {"url": self.url, "user": username, "pass": self.password}
return pool
def as_bos(self, user_suffix: str = None):
username = self.username
if user_suffix:
username = f"{username}{user_suffix}"
pool = {"url": self.url, "user": username, "password": self.password}
return pool
@dataclass
class _PoolGroup:
pools: List[_Pool] = None
quota: int = 1
group_name: str = "".join(
random.choice(string.ascii_uppercase + string.digits) for _ in range(6)
) # generate random pool group name in case it isn't set
group_name: str = None
pools: List[_Pool] = None
def __post_init__(self):
if not self.group_name:
self.group_name = "".join(
random.choice(string.ascii_uppercase + string.digits) for _ in range(6)
) # generate random pool group name in case it isn't set
def from_dict(self, data: dict):
pools = []
for key in data.keys():
if key == "name":
if key in ["name", "group_name"]:
self.group_name = data[key]
if key == "quota":
self.quota = data[key]
@@ -42,6 +67,20 @@ class _PoolGroup:
self.pools = pools
return self
def as_x19(self, user_suffix: str = None):
pools = []
for pool in self.pools[:3]:
pools.append(pool.as_x19(user_suffix=user_suffix))
return pools
def as_bos(self, user_suffix: str = None):
group = {
"name": self.group_name,
"quota": self.quota,
"pool": [pool.as_bos(user_suffix=user_suffix) for pool in self.pools],
}
return group
@dataclass
class MinerConfig:
@@ -69,11 +108,17 @@ class MinerConfig:
def as_dict(self):
data_dict = asdict(self)
for key in asdict(self).keys():
if data_dict[key] == None:
if data_dict[key] is None:
del data_dict[key]
return data_dict
def from_dict(self, data: dict):
def as_toml(self):
return toml.dumps(self.as_dict())
def as_yaml(self):
return yaml.dump(self.as_dict(), sort_keys=False)
def from_raw(self, data: dict):
pool_groups = []
for key in data.keys():
if key == "pools":
@@ -131,10 +176,96 @@ class MinerConfig:
self.pool_groups = pool_groups
return self
def from_dict(self, data: dict):
pool_groups = []
for group in data["pool_groups"]:
pool_groups.append(_PoolGroup().from_dict(group))
for key in data.keys():
if getattr(self, key) and not key == "pool_groups":
setattr(self, key, data[key])
self.pool_groups = pool_groups
return self
def from_toml(self, data: str):
return self.from_dict(toml.loads(data))
def from_yaml(self, data: str):
return self.from_dict(yaml.load(data, Loader=yaml.SafeLoader))
def as_x19(self, user_suffix: str = None):
cfg = {
"pools": self.pool_groups[0].as_x19(user_suffix=user_suffix),
"bitmain-fan-ctrl": False,
"bitmain-fan-pwn": 100,
}
if not self.temp_mode == "auto":
cfg["bitmain-fan-ctrl"] = True
if self.fan_speed:
cfg["bitmain-fan-ctrl"] = self.fan_speed
return json.dumps(cfg)
def as_bos(self, model: str = "S9", user_suffix: str = None):
cfg = {
"format": {
"version": "1.2+",
"model": f"Antminer {model}",
"generator": "Upstream Config Utility",
"timestamp": int(time.time()),
},
"group": [
group.as_bos(user_suffix=user_suffix) for group in self.pool_groups
],
"temp_control": {
"mode": self.temp_mode,
"target_temp": self.temp_target,
"hot_temp": self.temp_hot,
"dangerous_temp": self.temp_dangerous,
},
}
if self.autotuning_enabled or self.autotuning_wattage:
cfg["autotuning"] = {}
if self.autotuning_enabled:
cfg["autotuning"]["enabled"] = self.autotuning_enabled
if self.autotuning_wattage:
cfg["autotuning"]["psu_power_limit"] = self.autotuning_wattage
if self.asicboost:
cfg["hash_chain_global"] = {}
cfg["hash_chain_global"]["asic_boost"] = self.asicboost
if any(
[
getattr(self, item)
for item in [
"dps_enabled",
"dps_power_step",
"dps_min_power",
"dps_shutdown_enabled",
"dps_shutdown_duration",
]
]
):
cfg["power_scaling"] = {}
if self.dps_enabled:
cfg["power_scaling"]["enabled"] = self.dps_enabled
if self.dps_power_step:
cfg["power_scaling"]["power_step"] = self.dps_power_step
if self.dps_min_power:
cfg["power_scaling"]["min_psu_power_limit"] = self.dps_min_power
if self.dps_shutdown_enabled:
cfg["power_scaling"]["shutdown_enabled"] = self.dps_shutdown_enabled
if self.dps_shutdown_duration:
cfg["power_scaling"]["shutdown_duration"] = self.dps_shutdown_duration
return toml.dumps(cfg)
if __name__ == "__main__":
import pprint
import json
bos_conf = {
"group": [

View File

@@ -11,7 +11,7 @@ from API import APIError
from data import MinerData
from config.bos import bos_config_convert, general_config_convert_bos
from config.miner_config import MinerConfig
from settings import MINER_FACTORY_GET_VERSION_RETRIES as DATA_RETRIES
@@ -102,8 +102,9 @@ class BOSMiner(BaseMiner):
async with sftp.open("/etc/bosminer.toml") as file:
toml_data = toml.loads(await file.read())
logging.debug(f"{self}: Converting config file.")
cfg = bos_config_convert(toml_data)
cfg = MinerConfig().from_raw(toml_data)
self.config = cfg
return self.config
async def get_hostname(self) -> str:
"""Get miner hostname.
@@ -192,11 +193,17 @@ class BOSMiner(BaseMiner):
logging.debug(f"{self}: Sending config.")
if ip_user:
suffix = str(self.ip).split(".")[-1]
toml_conf = toml.dumps(
general_config_convert_bos(yaml_config, user_suffix=suffix)
toml_conf = (
MinerConfig()
.from_yaml(yaml_config)
.as_bos(model=self.model.replace(" (BOS)", ""), user_suffix=suffix)
)
else:
toml_conf = toml.dumps(general_config_convert_bos(yaml_config))
toml_conf = (
MinerConfig()
.from_yaml(yaml_config)
.as_bos(model=self.model.replace(" (BOS)", ""))
)
async with (await self._get_ssh_connection()) as conn:
logging.debug(f"{self}: Opening SFTP connection.")
async with conn.start_sftp_client() as sftp:

View File

@@ -20,7 +20,7 @@ async def btn_import(table, selected):
miner = await MinerFactory().get_miner(ip)
await miner.get_config()
config = miner.config
window["cfg_config_txt"].update(config)
window["cfg_config_txt"].update(config.as_yaml())
@disable_buttons("Configuring")