feature: LuxOS fixes and updates (#192)

* feature: add luxos tuner support to config.

* feature: add luxos temp control support to config.

* bug: fix failure to identify luxOS miners sometimes.

* feature: add get_config with temperature parsing.

* bug: fix some handling in hashboards.

* feature: add API version and fw version.

* refactor: improve RPC api handling.

* refactor: remove useless code, and tag bug.

* feature: add fault light check support.

* refactor: improve fanset compatibility.

* feature: add fan config parsing.

* feature: add pools parsing from luxos.

---------

Co-authored-by: Upstream Data <brett@upstreamdata.ca>
This commit is contained in:
Brett Rowan
2024-11-05 09:43:34 -07:00
committed by GitHub
parent 5749e173d1
commit 8e6240cdba
9 changed files with 332 additions and 194 deletions

View File

@@ -148,6 +148,14 @@ class MinerConfig:
**self.pools.as_bitaxe(user_suffix=user_suffix), **self.pools.as_bitaxe(user_suffix=user_suffix),
} }
def as_luxos(self, user_suffix: str = None) -> dict:
return {
**self.fan_mode.as_luxos(),
**self.temperature.as_luxos(),
**self.mining_mode.as_luxos(),
**self.pools.as_luxos(user_suffix=user_suffix),
}
@classmethod @classmethod
def from_dict(cls, dict_conf: dict) -> "MinerConfig": def from_dict(cls, dict_conf: dict) -> "MinerConfig":
"""Constructs a MinerConfig object from a dictionary.""" """Constructs a MinerConfig object from a dictionary."""
@@ -256,3 +264,15 @@ class MinerConfig:
return cls( return cls(
pools=PoolConfig.from_iceriver(web_userpanel), pools=PoolConfig.from_iceriver(web_userpanel),
) )
@classmethod
def from_luxos(
cls, rpc_tempctrl: dict, rpc_fans: dict, rpc_pools: dict, rpc_groups: dict
) -> "MinerConfig":
return cls(
temperature=TemperatureConfig.from_luxos(rpc_tempctrl=rpc_tempctrl),
fan_mode=FanModeConfig.from_luxos(
rpc_tempctrl=rpc_tempctrl, rpc_fans=rpc_fans
),
pools=PoolConfig.from_luxos(rpc_pools=rpc_pools, rpc_groups=rpc_groups),
)

View File

@@ -63,6 +63,9 @@ class MinerConfigOption(Enum):
def as_bitaxe(self) -> dict: def as_bitaxe(self) -> dict:
return self.value.as_bitaxe() return self.value.as_bitaxe()
def as_luxos(self) -> dict:
return self.value.as_luxos()
def __call__(self, *args, **kwargs): def __call__(self, *args, **kwargs):
return self.value(*args, **kwargs) return self.value(*args, **kwargs)
@@ -125,6 +128,9 @@ class MinerConfigValue:
def as_bitaxe(self) -> dict: def as_bitaxe(self) -> dict:
return {} return {}
def as_luxos(self) -> dict:
return {}
def __getitem__(self, item): def __getitem__(self, item):
try: try:
return getattr(self, item) return getattr(self, item)

View File

@@ -83,6 +83,9 @@ class FanModeNormal(MinerConfigValue):
def as_bitaxe(self) -> dict: def as_bitaxe(self) -> dict:
return {"autoFanspeed": 1} return {"autoFanspeed": 1}
def as_luxos(self) -> dict:
return {"fanset": {"speed": -1, "min_fans": self.minimum_fans}}
@dataclass @dataclass
class FanModeManual(MinerConfigValue): class FanModeManual(MinerConfigValue):
@@ -144,6 +147,9 @@ class FanModeManual(MinerConfigValue):
def as_bitaxe(self) -> dict: def as_bitaxe(self) -> dict:
return {"autoFanspeed": 0, "fanspeed": self.speed} return {"autoFanspeed": 0, "fanspeed": self.speed}
def as_luxos(self) -> dict:
return {"fanset": {"speed": self.speed, "min_fans": self.minimum_fans}}
@dataclass @dataclass
class FanModeImmersion(MinerConfigValue): class FanModeImmersion(MinerConfigValue):
@@ -167,6 +173,9 @@ class FanModeImmersion(MinerConfigValue):
def as_mara(self) -> dict: def as_mara(self) -> dict:
return {"general-config": {"environment-profile": "OilImmersionCooling"}} return {"general-config": {"environment-profile": "OilImmersionCooling"}}
def as_luxos(self) -> dict:
return {"fanset": {"speed": 0, "min_fans": 0}}
class FanModeConfig(MinerConfigOption): class FanModeConfig(MinerConfigOption):
normal = FanModeNormal normal = FanModeNormal
@@ -304,3 +313,23 @@ class FanModeConfig(MinerConfigOption):
return cls.normal() return cls.normal()
else: else:
return cls.manual(speed=web_system_info["fanspeed"]) return cls.manual(speed=web_system_info["fanspeed"])
@classmethod
def from_luxos(cls, rpc_fans: dict, rpc_tempctrl: dict):
try:
mode = rpc_tempctrl["TEMPCTRL"][0]["Mode"]
if mode == "Manual":
speed = rpc_fans["FANS"][0]["Speed"]
min_fans = rpc_fans["FANCTRL"][0]["MinFans"]
if min_fans == 0 and speed == 0:
return cls.immersion()
return cls.manual(
speed=speed,
minimum_fans=min_fans,
)
return cls.normal(
minimum_fans=rpc_fans["FANCTRL"][0]["MinFans"],
)
except LookupError:
pass
return cls.default()

View File

@@ -70,6 +70,9 @@ class MiningModeNormal(MinerConfigValue):
} }
} }
def as_luxos(self) -> dict:
return {"autotunerset": {"enabled": False}}
@dataclass @dataclass
class MiningModeSleep(MinerConfigValue): class MiningModeSleep(MinerConfigValue):
@@ -240,6 +243,9 @@ class MiningModePowerTune(MinerConfigValue):
} }
} }
def as_luxos(self) -> dict:
return {"autotunerset": {"enabled": True}}
@dataclass @dataclass
class MiningModeHashrateTune(MinerConfigValue): class MiningModeHashrateTune(MinerConfigValue):
@@ -333,6 +339,9 @@ class MiningModeHashrateTune(MinerConfigValue):
} }
} }
def as_luxos(self) -> dict:
return {"autotunerset": {"enabled": True}}
@dataclass @dataclass
class ManualBoardSettings(MinerConfigValue): class ManualBoardSettings(MinerConfigValue):

View File

@@ -222,6 +222,10 @@ class Pool(MinerConfigValue):
password=web_system_info.get("stratumPassword", ""), password=web_system_info.get("stratumPassword", ""),
) )
@classmethod
def from_luxos(cls, rpc_pools: dict) -> "Pool":
return cls.from_api(rpc_pools)
@classmethod @classmethod
def from_iceriver(cls, web_pool: dict) -> "Pool": def from_iceriver(cls, web_pool: dict) -> "Pool":
return cls( return cls(
@@ -523,6 +527,9 @@ class PoolConfig(MinerConfigValue):
def as_bitaxe(self, user_suffix: str = None) -> dict: def as_bitaxe(self, user_suffix: str = None) -> dict:
return self.groups[0].as_bitaxe(user_suffix=user_suffix) return self.groups[0].as_bitaxe(user_suffix=user_suffix)
def as_luxos(self, user_suffix: str = None) -> dict:
return {}
@classmethod @classmethod
def from_api(cls, api_pools: dict) -> "PoolConfig": def from_api(cls, api_pools: dict) -> "PoolConfig":
try: try:
@@ -589,3 +596,20 @@ class PoolConfig(MinerConfigValue):
@classmethod @classmethod
def from_iceriver(cls, web_userpanel: dict) -> "PoolConfig": def from_iceriver(cls, web_userpanel: dict) -> "PoolConfig":
return cls(groups=[PoolGroup.from_iceriver(web_userpanel)]) return cls(groups=[PoolGroup.from_iceriver(web_userpanel)])
@classmethod
def from_luxos(cls, rpc_groups: dict, rpc_pools: dict) -> "PoolConfig":
return cls(
groups=[
PoolGroup(
pools=[
Pool.from_luxos(pool)
for pool in rpc_pools["POOLS"]
if pool["GROUP"] == group["GROUP"]
],
name=group["Name"],
quota=group["Quota"],
)
for group in rpc_groups["GROUPS"]
]
)

View File

@@ -54,6 +54,9 @@ class TemperatureConfig(MinerConfigValue):
temps_config["temps"]["shutdown"] = self.hot temps_config["temps"]["shutdown"] = self.hot
return temps_config return temps_config
def as_luxos(self) -> dict:
return {"tempctrlset": [self.target or "", self.hot or "", self.danger or ""]}
@classmethod @classmethod
def from_dict(cls, dict_conf: dict | None) -> "TemperatureConfig": def from_dict(cls, dict_conf: dict | None) -> "TemperatureConfig":
return cls( return cls(
@@ -130,3 +133,16 @@ class TemperatureConfig(MinerConfigValue):
return cls(**conf) return cls(**conf)
return cls.default() return cls.default()
@classmethod
def from_luxos(cls, rpc_tempctrl: dict) -> "TemperatureConfig":
try:
tempctrl_config = rpc_tempctrl["TEMPCTRL"][0]
return cls(
target=tempctrl_config.get("Target"),
hot=tempctrl_config.get("Hot"),
danger=tempctrl_config.get("Dangerous"),
)
except LookupError:
pass
return cls.default()

View File

@@ -56,6 +56,15 @@ LUXMINER_DATA_LOC = DataLocations(
str(DataOptions.POOLS): DataFunction( str(DataOptions.POOLS): DataFunction(
"_get_pools", [RPCAPICommand("rpc_pools", "pools")] "_get_pools", [RPCAPICommand("rpc_pools", "pools")]
), ),
str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver", [RPCAPICommand("rpc_version", "version")]
),
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver", [RPCAPICommand("rpc_version", "version")]
),
str(DataOptions.FAULT_LIGHT): DataFunction(
"_get_fault_light", [RPCAPICommand("rpc_config", "config")]
),
} }
) )
@@ -68,25 +77,9 @@ class LUXMiner(LuxOSFirmware):
data_locations = LUXMINER_DATA_LOC data_locations = LUXMINER_DATA_LOC
async def _get_session(self) -> Optional[str]:
try:
data = await self.rpc.session()
if not data["SESSION"][0]["SessionID"] == "":
return data["SESSION"][0]["SessionID"]
except APIError:
pass
try:
data = await self.rpc.logon()
return data["SESSION"][0]["SessionID"]
except (LookupError, APIError):
return
async def fault_light_on(self) -> bool: async def fault_light_on(self) -> bool:
try: try:
session_id = await self._get_session() await self.rpc.ledset("red", "blink")
if session_id:
await self.rpc.ledset(session_id, "red", "blink")
return True return True
except (APIError, LookupError): except (APIError, LookupError):
pass pass
@@ -94,9 +87,7 @@ class LUXMiner(LuxOSFirmware):
async def fault_light_off(self) -> bool: async def fault_light_off(self) -> bool:
try: try:
session_id = await self._get_session() await self.rpc.ledset("red", "off")
if session_id:
await self.rpc.ledset(session_id, "red", "off")
return True return True
except (APIError, LookupError): except (APIError, LookupError):
pass pass
@@ -107,9 +98,7 @@ class LUXMiner(LuxOSFirmware):
async def restart_luxminer(self) -> bool: async def restart_luxminer(self) -> bool:
try: try:
session_id = await self._get_session() await self.rpc.resetminer()
if session_id:
await self.rpc.resetminer(session_id)
return True return True
except (APIError, LookupError): except (APIError, LookupError):
pass pass
@@ -117,9 +106,7 @@ class LUXMiner(LuxOSFirmware):
async def stop_mining(self) -> bool: async def stop_mining(self) -> bool:
try: try:
session_id = await self._get_session() await self.rpc.sleep()
if session_id:
await self.rpc.curtail(session_id)
return True return True
except (APIError, LookupError): except (APIError, LookupError):
pass pass
@@ -127,25 +114,27 @@ class LUXMiner(LuxOSFirmware):
async def resume_mining(self) -> bool: async def resume_mining(self) -> bool:
try: try:
session_id = await self._get_session() await self.rpc.wakeup()
if session_id:
await self.rpc.wakeup(session_id)
return True return True
except (APIError, LookupError): except (APIError, LookupError):
pass pass
async def reboot(self) -> bool: async def reboot(self) -> bool:
try: try:
session_id = await self._get_session() await self.rpc.rebootdevice()
if session_id:
await self.rpc.rebootdevice(session_id)
return True return True
except (APIError, LookupError): except (APIError, LookupError):
pass pass
return False return False
async def get_config(self) -> MinerConfig: async def get_config(self) -> MinerConfig:
return self.config data = await self.rpc.multicommand("tempctrl", "fans", "pools", "groups")
return MinerConfig.from_luxos(
rpc_tempctrl=data.get("tempctrl", [{}])[0],
rpc_fans=data.get("fans", [{}])[0],
rpc_pools=data.get("pools", [{}])[0],
rpc_groups=data.get("groups", [{}])[0],
)
async def upgrade_firmware(self) -> bool: async def upgrade_firmware(self) -> bool:
""" """
@@ -168,20 +157,17 @@ class LUXMiner(LuxOSFirmware):
################################################## ##################################################
async def _get_mac(self, rpc_config: dict = None) -> Optional[str]: async def _get_mac(self, rpc_config: dict = None) -> Optional[str]:
mac = None
if rpc_config is None: if rpc_config is None:
try: try:
rpc_config = await self.rpc.config() rpc_config = await self.rpc.config()
except APIError: except APIError:
return None pass
if rpc_config is not None: if rpc_config is not None:
try: try:
mac = rpc_config["CONFIG"][0]["MACAddr"] return rpc_config["CONFIG"][0]["MACAddr"].upper()
except KeyError: except KeyError:
return None pass
return mac
async def _get_hashrate(self, rpc_summary: dict = None) -> Optional[AlgoHashRate]: async def _get_hashrate(self, rpc_summary: dict = None) -> Optional[AlgoHashRate]:
if rpc_summary is None: if rpc_summary is None:
@@ -199,59 +185,47 @@ class LUXMiner(LuxOSFirmware):
pass pass
async def _get_hashboards(self, rpc_stats: dict = None) -> List[HashBoard]: async def _get_hashboards(self, rpc_stats: dict = None) -> List[HashBoard]:
hashboards = [] hashboards = [
HashBoard(idx, expected_chips=self.expected_chips)
for idx in range(self.expected_hashboards)
]
if rpc_stats is None: if rpc_stats is None:
try: try:
rpc_stats = await self.rpc.stats() rpc_stats = await self.rpc.stats()
except APIError: except APIError:
pass pass
if rpc_stats is not None: if rpc_stats is not None:
try: try:
board_offset = -1 # TODO: bugged on S9 because of index issues, fix later.
boards = rpc_stats["STATS"] board_stats = rpc_stats["STATS"][1]
if len(boards) > 1: for idx in range(3):
for board_num in range(1, 16, 5): board_n = idx + 1
for _b_num in range(5): hashboards[idx].hashrate = AlgoHashRate.SHA256(
b = boards[1].get(f"chain_acn{board_num + _b_num}") float(board_stats[f"chain_rate{board_n}"]), HashUnit.SHA256.GH
if b and not b == 0 and board_offset == -1:
board_offset = board_num
if board_offset == -1:
board_offset = 1
for i in range(
board_offset, board_offset + self.expected_hashboards
):
hashboard = HashBoard(
slot=i - board_offset, expected_chips=self.expected_chips
)
chip_temp = boards[1].get(f"temp{i}")
if chip_temp:
hashboard.chip_temp = round(chip_temp)
temp = boards[1].get(f"temp2_{i}")
if temp:
hashboard.temp = round(temp)
hashrate = boards[1].get(f"chain_rate{i}")
if hashrate:
hashboard.hashrate = AlgoHashRate.SHA256(
hashrate, HashUnit.SHA256.GH
).into(self.algo.unit.default) ).into(self.algo.unit.default)
hashboards[idx].chips = int(board_stats[f"chain_acn{board_n}"])
chips = boards[1].get(f"chain_acn{i}") chip_temp_data = list(
if chips: filter(
hashboard.chips = chips lambda x: not x == 0,
hashboard.missing = False map(int, board_stats[f"temp_chip{board_n}"].split("-")),
if (not chips) or (not chips > 0): )
hashboard.missing = True )
hashboards.append(hashboard) hashboards[idx].chip_temp = (
except (LookupError, ValueError, TypeError): sum([chip_temp_data[0], chip_temp_data[3]]) / 2
)
board_temp_data = list(
filter(
lambda x: not x == 0,
map(int, board_stats[f"temp_pcb{board_n}"].split("-")),
)
)
hashboards[idx].temp = (
sum([board_temp_data[1], board_temp_data[2]]) / 2
)
hashboards[idx].missing = False
except LookupError:
pass pass
return hashboards return hashboards
async def _get_wattage(self, rpc_power: dict = None) -> Optional[int]: async def _get_wattage(self, rpc_power: dict = None) -> Optional[int]:
@@ -319,6 +293,45 @@ class LUXMiner(LuxOSFirmware):
except LookupError: except LookupError:
pass pass
async def _get_fw_ver(self, rpc_version: dict = None) -> Optional[str]:
if rpc_version is None:
try:
rpc_version = await self.rpc.version()
except APIError:
pass
if rpc_version is not None:
try:
return rpc_version["VERSION"][0]["Miner"]
except LookupError:
pass
async def _get_api_ver(self, rpc_version: dict = None) -> Optional[str]:
if rpc_version is None:
try:
rpc_version = await self.rpc.version()
except APIError:
pass
if rpc_version is not None:
try:
return rpc_version["VERSION"][0]["API"]
except LookupError:
pass
async def _get_fault_light(self, rpc_config: dict = None) -> Optional[bool]:
if rpc_config is None:
try:
rpc_config = await self.rpc.config()
except APIError:
pass
if rpc_config is not None:
try:
return not rpc_config["CONFIG"][0]["RedLed"] == "off"
except LookupError:
pass
async def _get_pools(self, rpc_pools: dict = None) -> List[PoolMetrics]: async def _get_pools(self, rpc_pools: dict = None) -> List[PoolMetrics]:
if rpc_pools is None: if rpc_pools is None:
try: try:

View File

@@ -717,10 +717,10 @@ class MinerFactory:
return MinerTypes.BRAIINS_OS return MinerTypes.BRAIINS_OS
if "BTMINER" in upper_data or "BITMICRO" in upper_data: if "BTMINER" in upper_data or "BITMICRO" in upper_data:
return MinerTypes.WHATSMINER return MinerTypes.WHATSMINER
if "HIVEON" in upper_data:
return MinerTypes.HIVEON
if "LUXMINER" in upper_data: if "LUXMINER" in upper_data:
return MinerTypes.LUX_OS return MinerTypes.LUX_OS
if "HIVEON" in upper_data:
return MinerTypes.HIVEON
if "KAONSU" in upper_data: if "KAONSU" in upper_data:
return MinerTypes.MARATHON return MinerTypes.MARATHON
if "ANTMINER" in upper_data and "DEVDETAILS" not in upper_data: if "ANTMINER" in upper_data and "DEVDETAILS" not in upper_data:

View File

@@ -13,8 +13,9 @@
# See the License for the specific language governing permissions and - # See the License for the specific language governing permissions and -
# limitations under the License. - # limitations under the License. -
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
from typing import Literal from typing import Literal, Optional, Union
from pyasic import APIError
from pyasic.rpc.base import BaseMinerRPCAPI from pyasic.rpc.base import BaseMinerRPCAPI
@@ -32,6 +33,48 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
rely on it to send the command for them. rely on it to send the command for them.
""" """
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.session_token = None
async def send_privileged_command(
self, command: Union[str, bytes], *args, **kwargs
) -> dict:
if self.session_token is None:
await self.auth()
return await self.send_command(
command,
self.session_token,
*args,
**kwargs,
)
async def send_command(
self,
command: Union[str, bytes],
*args,
**kwargs,
) -> dict:
if kwargs.get("parameters") is not None and len(args) == 0:
return await super().send_command(command, **kwargs)
return await super().send_command(command, parameters=",".join(args), **kwargs)
async def auth(self) -> Optional[str]:
try:
data = await self.session()
if not data["SESSION"][0]["SessionID"] == "":
self.session_token = data["SESSION"][0]["SessionID"]
return self.session_token
except APIError:
pass
try:
data = await self.logon()
self.session_token = data["SESSION"][0]["SessionID"]
return self.session_token
except (LookupError, APIError):
pass
async def addgroup(self, name: str, quota: int) -> dict: async def addgroup(self, name: str, quota: int) -> dict:
"""Add a pool group. """Add a pool group.
<details> <details>
@@ -45,7 +88,7 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
Confirmation of adding a pool group. Confirmation of adding a pool group.
</details> </details>
""" """
return await self.send_command("addgroup", parameters=f"{name},{quota}") return await self.send_command("addgroup", name, quota)
async def addpool( async def addpool(
self, url: str, user: str, pwd: str = "", group_id: str = None self, url: str, user: str, pwd: str = "", group_id: str = None
@@ -67,7 +110,7 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
pool_data = [url, user, pwd] pool_data = [url, user, pwd]
if group_id is not None: if group_id is not None:
pool_data.append(group_id) pool_data.append(group_id)
return await self.send_command("addpool", parameters=",".join(pool_data)) return await self.send_command("addpool", *pool_data)
async def asc(self, n: int) -> dict: async def asc(self, n: int) -> dict:
"""Get data for ASC device n. """Get data for ASC device n.
@@ -81,7 +124,7 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
The data for ASC device n. The data for ASC device n.
</details> </details>
""" """
return await self.send_command("asc", parameters=n) return await self.send_command("asc", n)
async def asccount(self) -> dict: async def asccount(self) -> dict:
"""Get data on the number of ASC devices and their info. """Get data on the number of ASC devices and their info.
@@ -108,7 +151,7 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
* Access (Y/N) <- you have access to use the command * Access (Y/N) <- you have access to use the command
</details> </details>
""" """
return await self.send_command("check", parameters=command) return await self.send_command("check", command)
async def coin(self) -> dict: async def coin(self) -> dict:
"""Get information on the current coin. """Get information on the current coin.
@@ -137,19 +180,38 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
""" """
return await self.send_command("config") return await self.send_command("config")
async def curtail(self, session_id: str) -> dict: async def curtail(self) -> dict:
"""Put the miner into sleep mode. Requires a session_id from logon. """Put the miner into sleep mode.
<details> <details>
<summary>Expand</summary> <summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
Returns: Returns:
A confirmation of putting the miner to sleep. A confirmation of putting the miner to sleep.
</details> </details>
""" """
return await self.send_command("curtail", parameters=session_id) return await self.send_privileged_command("curtail", "sleep")
async def sleep(self) -> dict:
"""Put the miner into sleep mode.
<details>
<summary>Expand</summary>
Returns:
A confirmation of putting the miner to sleep.
</details>
"""
return await self.send_privileged_command("curtail", "sleep")
async def wakeup(self) -> dict:
"""Wake the miner up from sleep mode.
<details>
<summary>Expand</summary>
Returns:
A confirmation of waking the miner up.
</details>
"""
return await self.send_privileged_command("curtail", "wakeup")
async def devdetails(self) -> dict: async def devdetails(self) -> dict:
"""Get data on all devices with their static details. """Get data on all devices with their static details.
@@ -185,7 +247,7 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
A confirmation of diabling the pool. A confirmation of diabling the pool.
</details> </details>
""" """
return await self.send_command("disablepool", parameters=n) return await self.send_command("disablepool", n)
async def edevs(self) -> dict: async def edevs(self) -> dict:
"""Alias for devs""" """Alias for devs"""
@@ -203,7 +265,7 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
A confirmation of enabling pool n. A confirmation of enabling pool n.
</details> </details>
""" """
return await self.send_command("enablepool", parameters=n) return await self.send_command("enablepool", n)
async def estats(self) -> dict: async def estats(self) -> dict:
"""Alias for stats""" """Alias for stats"""
@@ -220,13 +282,14 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
""" """
return await self.send_command("fans") return await self.send_command("fans")
async def fanset(self, session_id: str, speed: int, min_fans: int = None) -> dict: async def fanset(
"""Set fan control. Requires a session_id from logon. self, speed: int = None, min_fans: int = None, power_off_speed: int = None
) -> dict:
"""Set fan control.
<details> <details>
<summary>Expand</summary> <summary>Expand</summary>
Parameters: Parameters:
session_id: Session id from the logon command.
speed: The fan speed to set. Use -1 to set automatically. speed: The fan speed to set. Use -1 to set automatically.
min_fans: The minimum number of fans to use. Optional. min_fans: The minimum number of fans to use. Optional.
@@ -234,10 +297,14 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
A confirmation of setting fan control values. A confirmation of setting fan control values.
</details> </details>
""" """
fanset_data = [str(session_id), str(speed)] fanset_data = []
if speed is not None:
fanset_data.append(f"speed={speed}")
if min_fans is not None: if min_fans is not None:
fanset_data.append(str(min_fans)) fanset_data.append(f"min_fans={min_fans}")
return await self.send_command("fanset", parameters=",".join(fanset_data)) if power_off_speed is not None:
fanset_data.append(f"power_off_speed={power_off_speed}")
return await self.send_privileged_command("fanset", *fanset_data)
async def frequencyget(self, board_n: int, chip_n: int = None) -> dict: async def frequencyget(self, board_n: int, chip_n: int = None) -> dict:
"""Get frequency data for a board and chips. """Get frequency data for a board and chips.
@@ -255,17 +322,14 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
frequencyget_data = [str(board_n)] frequencyget_data = [str(board_n)]
if chip_n is not None: if chip_n is not None:
frequencyget_data.append(str(chip_n)) frequencyget_data.append(str(chip_n))
return await self.send_command( return await self.send_command("frequencyget", *frequencyget_data)
"frequencyget", parameters=",".join(frequencyget_data)
)
async def frequencyset(self, session_id: str, board_n: int, freq: int) -> dict: async def frequencyset(self, board_n: int, freq: int) -> dict:
"""Set frequency. Requires a session_id from logon. """Set frequency.
<details> <details>
<summary>Expand</summary> <summary>Expand</summary>
Parameters: Parameters:
session_id: Session id from the logon command.
board_n: The board number to set frequency on. board_n: The board number to set frequency on.
freq: The frequency to set. freq: The frequency to set.
@@ -273,26 +337,21 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
A confirmation of setting frequency values. A confirmation of setting frequency values.
</details> </details>
""" """
return await self.send_command( return await self.send_privileged_command("frequencyset", board_n, freq)
"frequencyset", parameters=f"{session_id},{board_n},{freq}"
)
async def frequencystop(self, session_id: str, board_n: int) -> dict: async def frequencystop(self, board_n: int) -> dict:
"""Stop set frequency. Requires a session_id from logon. """Stop set frequency.
<details> <details>
<summary>Expand</summary> <summary>Expand</summary>
Parameters: Parameters:
session_id: Session id from the logon command.
board_n: The board number to set frequency on. board_n: The board number to set frequency on.
Returns: Returns:
A confirmation of stopping frequencyset value. A confirmation of stopping frequencyset value.
</details> </details>
""" """
return await self.send_command( return await self.send_privileged_command("frequencystop", board_n)
"frequencystop", parameters=f"{session_id},{board_n}"
)
async def groupquota(self, group_n: int, quota: int) -> dict: async def groupquota(self, group_n: int, quota: int) -> dict:
"""Set a group's quota. """Set a group's quota.
@@ -307,7 +366,7 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
A confirmation of setting quota value. A confirmation of setting quota value.
</details> </details>
""" """
return await self.send_command("groupquota", parameters=f"{group_n},{quota}") return await self.send_command("groupquota", group_n, quota)
async def groups(self) -> dict: async def groups(self) -> dict:
"""Get pool group data. """Get pool group data.
@@ -336,19 +395,14 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
healthchipget_data = [str(board_n)] healthchipget_data = [str(board_n)]
if chip_n is not None: if chip_n is not None:
healthchipget_data.append(str(chip_n)) healthchipget_data.append(str(chip_n))
return await self.send_command( return await self.send_command("healthchipget", *healthchipget_data)
"healthchipget", parameters=",".join(healthchipget_data)
)
async def healthchipset( async def healthchipset(self, board_n: int, chip_n: int = None) -> dict:
self, session_id: str, board_n: int, chip_n: int = None """Select the next chip to have its health checked.
) -> dict:
"""Select the next chip to have its health checked. Requires a session_id from logon.
<details> <details>
<summary>Expand</summary> <summary>Expand</summary>
Parameters: Parameters:
session_id: Session id from the logon command.
board_n: The board number to next get chip health of. board_n: The board number to next get chip health of.
chip_n: The chip number to next get chip health of. Optional. chip_n: The chip number to next get chip health of. Optional.
@@ -356,12 +410,10 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
Confirmation of selecting the next health check chip. Confirmation of selecting the next health check chip.
</details> </details>
""" """
healthchipset_data = [session_id, str(board_n)] healthchipset_data = [str(board_n)]
if chip_n is not None: if chip_n is not None:
healthchipset_data.append(str(chip_n)) healthchipset_data.append(str(chip_n))
return await self.send_command( return await self.send_privileged_command("healthchipset", *healthchipset_data)
"healthchipset", parameters=",".join(healthchipset_data)
)
async def healthctrl(self) -> dict: async def healthctrl(self) -> dict:
"""Get health check config. """Get health check config.
@@ -374,15 +426,12 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
""" """
return await self.send_command("healthctrl") return await self.send_command("healthctrl")
async def healthctrlset( async def healthctrlset(self, num_readings: int, amplified_factor: float) -> dict:
self, session_id: str, num_readings: int, amplified_factor: float """Set health control config.
) -> dict:
"""Set health control config. Requires a session_id from logon.
<details> <details>
<summary>Expand</summary> <summary>Expand</summary>
Parameters: Parameters:
session_id: Session id from the logon command.
num_readings: The minimum number of readings for evaluation. num_readings: The minimum number of readings for evaluation.
amplified_factor: Performance factor of the evaluation. amplified_factor: Performance factor of the evaluation.
@@ -390,9 +439,8 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
A confirmation of setting health control config. A confirmation of setting health control config.
</details> </details>
""" """
return await self.send_command( return await self.send_privileged_command(
"healthctrlset", "healthctrlset", num_readings, amplified_factor
parameters=f"{session_id},{num_readings},{amplified_factor}",
) )
async def kill(self) -> dict: async def kill(self) -> dict:
@@ -419,16 +467,14 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
async def ledset( async def ledset(
self, self,
session_id: str,
color: Literal["red"], color: Literal["red"],
state: Literal["on", "off", "blink"], state: Literal["on", "off", "blink"],
) -> dict: ) -> dict:
"""Set led. Requires a session_id from logon. """Set led.
<details> <details>
<summary>Expand</summary> <summary>Expand</summary>
Parameters: Parameters:
session_id: Session id from the logon command.
color: The color LED to set. Can be "red". color: The color LED to set. Can be "red".
state: The state to set the LED to. Can be "on", "off", or "blink". state: The state to set the LED to. Can be "on", "off", or "blink".
@@ -436,9 +482,7 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
A confirmation of setting LED. A confirmation of setting LED.
</details> </details>
""" """
return await self.send_command( return await self.send_privileged_command("ledset", color, state)
"ledset", parameters=f"{session_id},{color},{state}"
)
async def limits(self) -> dict: async def limits(self) -> dict:
"""Get max and min values of config parameters. """Get max and min values of config parameters.
@@ -451,8 +495,8 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
""" """
return await self.send_command("limits") return await self.send_command("limits")
async def logoff(self, session_id: str) -> dict: async def logoff(self) -> dict:
"""Log off of a session. Requires a session id from an active session. """Log off of a session.
<details> <details>
<summary>Expand</summary> <summary>Expand</summary>
@@ -463,7 +507,9 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
Confirmation of logging off a session. Confirmation of logging off a session.
</details> </details>
""" """
return await self.send_command("logoff", parameters=session_id) res = await self.send_privileged_command("logoff")
self.session_token = None
return res
async def logon(self) -> dict: async def logon(self) -> dict:
"""Get or create a session. """Get or create a session.
@@ -510,13 +556,12 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
""" """
return await self.send_command("profiles") return await self.send_command("profiles")
async def profileset(self, session_id: str, board_n: int, profile: str) -> dict: async def profileset(self, board_n: int, profile: str) -> dict:
"""Set active profile for a board. Requires a session_id from logon. """Set active profile for a board.
<details> <details>
<summary>Expand</summary> <summary>Expand</summary>
Parameters: Parameters:
session_id: Session id from the logon command.
board_n: The board to set the profile on. board_n: The board to set the profile on.
profile: The profile name to use. profile: The profile name to use.
@@ -524,17 +569,14 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
A confirmation of setting the profile on board_n. A confirmation of setting the profile on board_n.
</details> </details>
""" """
return await self.send_command( return await self.send_privileged_command("profileset", board_n, profile)
"profileset", parameters=f"{session_id},{board_n},{profile}"
)
async def reboot(self, session_id: str, board_n: int, delay_s: int = None) -> dict: async def reboot(self, board_n: int, delay_s: int = None) -> dict:
"""Reboot a board. Requires a session_id from logon. """Reboot a board.
<details> <details>
<summary>Expand</summary> <summary>Expand</summary>
Parameters: Parameters:
session_id: Session id from the logon command.
board_n: The board to reboot. board_n: The board to reboot.
delay_s: The number of seconds to delay until startup. If it is 0, the board will just stop. Optional. delay_s: The number of seconds to delay until startup. If it is 0, the board will just stop. Optional.
@@ -542,24 +584,21 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
A confirmation of rebooting board_n. A confirmation of rebooting board_n.
</details> </details>
""" """
reboot_data = [session_id, str(board_n)] reboot_data = [str(board_n)]
if delay_s is not None: if delay_s is not None:
reboot_data.append(str(delay_s)) reboot_data.append(str(delay_s))
return await self.send_command("reboot", parameters=",".join(reboot_data)) return await self.send_privileged_command("reboot", *reboot_data)
async def rebootdevice(self, session_id: str) -> dict: async def rebootdevice(self) -> dict:
"""Reboot the miner. Requires a session_id from logon. """Reboot the miner.
<details> <details>
<summary>Expand</summary> <summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
Returns: Returns:
A confirmation of rebooting the miner. A confirmation of rebooting the miner.
</details> </details>
""" """
return await self.send_command("rebootdevice", parameters=session_id) return await self.send_privileged_command("rebootdevice")
async def removegroup(self, group_id: str) -> dict: async def removegroup(self, group_id: str) -> dict:
"""Remove a pool group. """Remove a pool group.
@@ -575,19 +614,16 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
""" """
return await self.send_command("removegroup", parameters=group_id) return await self.send_command("removegroup", parameters=group_id)
async def resetminer(self, session_id: str) -> dict: async def resetminer(self) -> dict:
"""Restart the mining process. Requires a session_id from logon. """Restart the mining process.
<details> <details>
<summary>Expand</summary> <summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
Returns: Returns:
A confirmation of restarting the mining process. A confirmation of restarting the mining process.
</details> </details>
""" """
return await self.send_command("resetminer", parameters=session_id) return await self.send_privileged_command("resetminer")
async def removepool(self, pool_id: int) -> dict: async def removepool(self, pool_id: int) -> dict:
"""Remove a pool. """Remove a pool.
@@ -614,7 +650,9 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
""" """
return await self.send_command("session") return await self.send_command("session")
async def tempctrlset(self, target: int, hot: int, dangerous: int) -> dict: async def tempctrlset(
self, target: int = None, hot: int = None, dangerous: int = None
) -> dict:
"""Set temp control values. """Set temp control values.
<details> <details>
<summary>Expand</summary> <summary>Expand</summary>
@@ -629,7 +667,7 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
</details> </details>
""" """
return await self.send_command( return await self.send_command(
"tempctrlset", parameters=f"{target},{hot},{dangerous}" "tempctrlset", target or "", hot or "", dangerous or ""
) )
async def stats(self) -> dict: async def stats(self) -> dict:
@@ -668,7 +706,7 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
A confirmation of switching to the pool. A confirmation of switching to the pool.
</details> </details>
""" """
return await self.send_command("switchpool", parameters=str(pool_id)) return await self.send_command("switchpool", pool_id)
async def tempctrl(self) -> dict: async def tempctrl(self) -> dict:
"""Get temperature control data. """Get temperature control data.
@@ -716,15 +754,14 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
Board voltage values. Board voltage values.
</details> </details>
""" """
return await self.send_command("frequencyget", parameters=str(board_n)) return await self.send_command("frequencyget", board_n)
async def voltageset(self, session_id: str, board_n: int, voltage: float) -> dict: async def voltageset(self, board_n: int, voltage: float) -> dict:
"""Set voltage values. """Set voltage values.
<details> <details>
<summary>Expand</summary> <summary>Expand</summary>
Parameters: Parameters:
session_id: Session id from the logon command.
board_n: The board to set the voltage on. board_n: The board to set the voltage on.
voltage: The voltage to use. voltage: The voltage to use.
@@ -732,23 +769,7 @@ class LUXMinerRPCAPI(BaseMinerRPCAPI):
A confirmation of setting the voltage. A confirmation of setting the voltage.
</details> </details>
""" """
return await self.send_command( return await self.send_privileged_command("voltageset", board_n, voltage)
"voltageset", parameters=f"{session_id},{board_n},{voltage}"
)
async def wakeup(self, session_id: str) -> dict:
"""Take the miner out of sleep mode. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
Returns:
A confirmation of resuming mining.
</details>
"""
return await self.send_command("wakeup", parameters=session_id)
async def upgraderun(self): async def upgraderun(self):
""" """