feature: add basic support for LuxOS

This commit is contained in:
UpstreamData
2023-06-26 15:35:52 -06:00
parent 37fd60b462
commit bf3bd7c2b9
9 changed files with 1276 additions and 3 deletions

759
pyasic/API/luxminer.py Normal file
View File

@@ -0,0 +1,759 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from typing import Literal
from pyasic.API import BaseMinerAPI
class LUXMinerAPI(BaseMinerAPI):
"""An abstraction of the LUXMiner API.
Each method corresponds to an API command in LUXMiner.
[LUXMiner API documentation](https://docs.firmware.luxor.tech/API/intro)
This class abstracts use of the LUXMiner API, as well as the
methods for sending commands to it. The `self.send_command()`
function handles sending a command to the miner asynchronously, and
as such is the base for many of the functions in this class, which
rely on it to send the command for them.
Parameters:
ip: The IP of the miner to reference the API on.
port: The port to reference the API on. Default is 4028.
"""
def __init__(self, ip: str, api_ver: str = "0.0.0", port: int = 4028) -> None:
super().__init__(ip, port=port)
self.api_ver = api_ver
async def addgroup(self, name: str, quota: int) -> dict:
"""Add a pool group.
<details>
<summary>Expand</summary>
Parameters:
name: The group name.
quota: The group quota.
Returns:
Confirmation of adding a pool group.
</details>
"""
return await self.send_command("addgroup", parameters=f"{name},{quota}")
async def addpool(
self, url: str, user: str, pwd: str = "", group_id: str = None
) -> dict:
"""Add a pool.
<details>
<summary>Expand</summary>
Parameters:
url: The pool url.
user: The pool username.
pwd: The pool password.
group_id: The group ID to use.
Returns:
Confirmation of adding a pool.
</details>
"""
pool_data = [url, user, pwd]
if group_id is not None:
pool_data.append(group_id)
return await self.send_command("addpool", parameters=",".join(pool_data))
async def asc(self, n: int) -> dict:
"""Get data for ASC device n.
<details>
<summary>Expand</summary>
Parameters:
n: The device to get data for.
Returns:
The data for ASC device n.
</details>
"""
return await self.send_command("asc", parameters=n)
async def asccount(self) -> dict:
"""Get data on the number of ASC devices and their info.
<details>
<summary>Expand</summary>
Returns:
Data on all ASC devices.
</details>
"""
return await self.send_command("asccount")
async def check(self, command: str) -> dict:
"""Check if the command `command` exists in LUXMiner.
<details>
<summary>Expand</summary>
Parameters:
command: The command to check.
Returns:
## Information about a command:
* Exists (Y/N) <- the command exists in this version
* Access (Y/N) <- you have access to use the command
</details>
"""
return await self.send_command("check", parameters=command)
async def coin(self) -> dict:
"""Get information on the current coin.
<details>
<summary>Expand</summary>
Returns:
## Information about the current coin being mined:
* Hash Method <- the hashing algorithm
* Current Block Time <- blocktime as a float, 0 means none
* Current Block Hash <- the hash of the current block, blank means none
* LP <- whether LP is in use on at least 1 pool
* Network Difficulty: the current network difficulty
</details>
"""
return await self.send_command("coin")
async def config(self) -> dict:
"""Get some basic configuration info.
<details>
<summary>Expand</summary>
Returns:
Miner configuration information.
</details>
"""
return await self.send_command("config")
async def curtail(self, session_id: str) -> dict:
"""Put the miner into sleep mode. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
Returns:
A confirmation of putting the miner to sleep.
</details>
"""
return await self.send_command("curtail", parameters=session_id)
async def devdetails(self) -> dict:
"""Get data on all devices with their static details.
<details>
<summary>Expand</summary>
Returns:
Data on all devices with their static details.
</details>
"""
return await self.send_command("devdetails")
async def devs(self) -> dict:
"""Get data on each PGA/ASC with their details.
<details>
<summary>Expand</summary>
Returns:
Data on each PGA/ASC with their details.
</details>
"""
return await self.send_command("devs")
async def disablepool(self, n: int) -> dict:
"""Disable a pool.
<details>
<summary>Expand</summary>
Parameters:
n: Pool to disable.
Returns:
A confirmation of diabling the pool.
</details>
"""
return await self.send_command("disablepool", parameters=n)
async def edevs(self) -> dict:
"""Alias for devs"""
return await self.devs()
async def enablepool(self, n: int) -> dict:
"""Enable pool n.
<details>
<summary>Expand</summary>
Parameters:
n: The pool to enable.
Returns:
A confirmation of enabling pool n.
</details>
"""
return await self.send_command("enablepool", parameters=n)
async def estats(self) -> dict:
"""Alias for stats"""
return await self.stats()
async def fans(self) -> dict:
"""Get fan data.
<details>
<summary>Expand</summary>
Returns:
Data on the fans of the miner.
</details>
"""
return await self.send_command("fans")
async def fanset(self, session_id: str, speed: int, min_fans: int = None) -> dict:
"""Set fan control. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
speed: The fan speed to set. Use -1 to set automatically.
min_fans: The minimum number of fans to use. Optional.
Returns:
A confirmation of setting fan control values.
</details>
"""
fanset_data = [str(session_id), str(speed)]
if min_fans is not None:
fanset_data.append(str(min_fans))
return await self.send_command("fanset", parameters=",".join(fanset_data))
async def frequencyget(self, board_n: int, chip_n: int = None) -> dict:
"""Get frequency data for a board and chips.
<details>
<summary>Expand</summary>
Parameters:
board_n: The board number to get frequency info from.
chip_n: The chip number to get frequency info from. Optional.
Returns:
Board and/or chip frequency values.
</details>
"""
frequencyget_data = [str(board_n)]
if chip_n is not None:
frequencyget_data.append(str(chip_n))
return await self.send_command(
"frequencyget", parameters=",".join(frequencyget_data)
)
async def frequencyset(self, session_id: str, board_n: int, freq: int) -> dict:
"""Set frequency. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
board_n: The board number to set frequency on.
freq: The frequency to set.
Returns:
A confirmation of setting frequency values.
</details>
"""
return await self.send_command(
"frequencyset", parameters=f"{session_id},{board_n},{freq}"
)
async def frequencystop(self, session_id: str, board_n: int) -> dict:
"""Stop set frequency. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
board_n: The board number to set frequency on.
Returns:
A confirmation of stopping frequencyset value.
</details>
"""
return await self.send_command(
"frequencystop", parameters=f"{session_id},{board_n}"
)
async def groupquota(self, group_n: int, quota: int) -> dict:
"""Set a group's quota.
<details>
<summary>Expand</summary>
Parameters:
group_n: The group number to set quota on.
quota: The quota to use.
Returns:
A confirmation of setting quota value.
</details>
"""
return await self.send_command("groupquota", parameters=f"{group_n},{quota}")
async def groups(self) -> dict:
"""Get pool group data.
<details>
<summary>Expand</summary>
Returns:
Data on the pool groups on the miner.
</details>
"""
return await self.send_command("groups")
async def healthchipget(self, board_n: int, chip_n: int = None) -> dict:
"""Get chip health.
<details>
<summary>Expand</summary>
Parameters:
board_n: The board number to get chip health of.
chip_n: The chip number to get chip health of. Optional.
Returns:
Chip health data.
</details>
"""
healthchipget_data = [str(board_n)]
if chip_n is not None:
healthchipget_data.append(str(chip_n))
return await self.send_command(
"healthchipget", parameters=",".join(healthchipget_data)
)
async def healthchipset(
self, session_id: str, board_n: int, chip_n: int = None
) -> dict:
"""Select the next chip to have its health checked. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
board_n: The board number to next get chip health of.
chip_n: The chip number to next get chip health of. Optional.
Returns:
Confirmation of selecting the next health check chip.
</details>
"""
healthchipset_data = [session_id, str(board_n)]
if chip_n is not None:
healthchipset_data.append(str(chip_n))
return await self.send_command(
"healthchipset", parameters=",".join(healthchipset_data)
)
async def healthctrl(self) -> dict:
"""Get health check config.
<details>
<summary>Expand</summary>
Returns:
Health check config.
</details>
"""
return await self.send_command("healthctrl")
async def healthctrlset(
self, session_id: str, num_readings: int, amplified_factor: float
) -> dict:
"""Set health control config. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
num_readings: The minimum number of readings for evaluation.
amplified_factor: Performance factor of the evaluation.
Returns:
A confirmation of setting health control config.
</details>
"""
return await self.send_command(
"healthctrlset",
parameters=f"{session_id},{num_readings},{amplified_factor}",
)
async def kill(self) -> dict:
"""Forced session kill. Use logoff instead.
<details>
<summary>Expand</summary>
Returns:
A confirmation of killing the active session.
</details>
"""
return await self.send_command("kill")
async def lcd(self) -> dict:
"""Get a general all-in-one status summary of the miner. Always zeros on LUXMiner.
<details>
<summary>Expand</summary>
Returns:
An all-in-one status summary of the miner.
</details>
"""
return await self.send_command("lcd")
async def ledset(
self,
session_id: str,
color: Literal["red"],
state: Literal["on", "off", "blink"],
) -> dict:
"""Set led. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
color: The color LED to set. Can be "red".
state: The state to set the LED to. Can be "on", "off", or "blink".
Returns:
A confirmation of setting LED.
</details>
"""
return await self.send_command(
"ledset", parameters=f"{session_id},{color},{state}"
)
async def limits(self) -> dict:
"""Get max and min values of config parameters.
<details>
<summary>Expand</summary>
Returns:
Data on max and min values of config parameters.
</details>
"""
return await self.send_command("limits")
async def logoff(self, session_id: str) -> dict:
"""Log off of a session. Requires a session id from an active session.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
Returns:
Confirmation of logging off a session.
</details>
"""
return await self.send_command("logoff", parameters=session_id)
async def logon(self) -> dict:
"""Get or create a session.
<details>
<summary>Expand</summary>
Returns:
The Session ID to be used.
</details>
"""
return await self.send_command("logon")
async def pools(self) -> dict:
"""Get pool information.
<details>
<summary>Expand</summary>
Returns:
Miner pool information.
</details>
"""
return await self.send_command("pools")
async def power(self) -> dict:
"""Get the estimated power usage in watts.
<details>
<summary>Expand</summary>
Returns:
Estimated power usage in watts.
</details>
"""
return await self.send_command("power")
async def profiles(self) -> dict:
"""Get the available profiles.
<details>
<summary>Expand</summary>
Returns:
Data on available profiles.
</details>
"""
return await self.send_command("profiles")
async def profileset(self, session_id: str, board_n: int, profile: str) -> dict:
"""Set active profile for a board. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
board_n: The board to set the profile on.
profile: The profile name to use.
Returns:
A confirmation of setting the profile on board_n.
</details>
"""
return await self.send_command(
"profileset", parameters=f"{session_id},{board_n},{profile}"
)
async def reboot(self, session_id: str, board_n: int, delay_s: int = None) -> dict:
"""Reboot a board. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
board_n: The board to reboot.
delay_s: The number of seconds to delay until startup. If it is 0, the board will just stop. Optional.
Returns:
A confirmation of rebooting board_n.
</details>
"""
reboot_data = [session_id, str(board_n)]
if delay_s is not None:
reboot_data.append(str(delay_s))
return await self.send_command("reboot", parameters=",".join(reboot_data))
async def rebootdevice(self, session_id: str) -> dict:
"""Reboot the miner. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
Returns:
A confirmation of rebooting the miner.
</details>
"""
return await self.send_command("rebootdevice", parameters=session_id)
async def removegroup(self, group_id: str) -> dict:
"""Remove a pool group.
<details>
<summary>Expand</summary>
Parameters:
group_id: Group id to remove.
Returns:
A confirmation of removing the pool group.
</details>
"""
return await self.send_command("removegroup", parameters=group_id)
async def resetminer(self, session_id: str) -> dict:
"""Restart the mining process. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
Returns:
A confirmation of restarting the mining process.
</details>
"""
return await self.send_command("resetminer", parameters=session_id)
async def removepool(self, pool_id: int) -> dict:
"""Remove a pool.
<details>
<summary>Expand</summary>
Parameters:
pool_id: Pool to remove.
Returns:
A confirmation of removing the pool.
</details>
"""
return await self.send_command("removepool", parameters=str(pool_id))
async def session(self) -> dict:
"""Get the current session.
<details>
<summary>Expand</summary>
Returns:
Data on the current session.
</details>
"""
return await self.send_command("session")
async def tempctrlset(self, target: int, hot: int, dangerous: int) -> dict:
"""Set temp control values.
<details>
<summary>Expand</summary>
Parameters:
target: Target temp.
hot: Hot temp.
dangerous: Dangerous temp.
Returns:
A confirmation of setting the temp control config.
</details>
"""
return await self.send_command(
"tempctrlset", parameters=f"{target},{hot},{dangerous}"
)
async def stats(self) -> dict:
"""Get stats of each device/pool with more than 1 getwork.
<details>
<summary>Expand</summary>
Returns:
Stats of each device/pool with more than 1 getwork.
</details>
"""
return await self.send_command("stats")
async def summary(self) -> dict:
"""Get the status summary of the miner.
<details>
<summary>Expand</summary>
Returns:
The status summary of the miner.
</details>
"""
return await self.send_command("summary")
async def switchpool(self, pool_id: int) -> dict:
"""Switch to a pool.
<details>
<summary>Expand</summary>
Parameters:
pool_id: Pool to switch to.
Returns:
A confirmation of switching to the pool.
</details>
"""
return await self.send_command("switchpool", parameters=str(pool_id))
async def tempctrl(self) -> dict:
"""Get temperature control data.
<details>
<summary>Expand</summary>
Returns:
Data about the temp control settings of the miner.
</details>
"""
return await self.send_command("tempctrl")
async def temps(self) -> dict:
"""Get temperature data.
<details>
<summary>Expand</summary>
Returns:
Data on the temps of the miner.
</details>
"""
return await self.send_command("temps")
async def version(self) -> dict:
"""Get miner version info.
<details>
<summary>Expand</summary>
Returns:
Miner version information.
</details>
"""
return await self.send_command("version")
async def voltageget(self, board_n: int) -> dict:
"""Get voltage data for a board.
<details>
<summary>Expand</summary>
Parameters:
board_n: The board number to get voltage info from.
Returns:
Board voltage values.
</details>
"""
return await self.send_command("frequencyget", parameters=str(board_n))
async def voltageset(self, session_id: str, board_n: int, voltage: float) -> dict:
"""Set voltage values.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
board_n: The board to set the voltage on.
voltage: The voltage to use.
Returns:
A confirmation of setting the voltage.
</details>
"""
return await self.send_command(
"voltageset", parameters=f"{session_id},{board_n},{voltage}"
)
async def wakeup(self, session_id: str) -> dict:
"""Take the miner out of sleep mode. Requires a session_id from logon.
<details>
<summary>Expand</summary>
Parameters:
session_id: Session id from the logon command.
Returns:
A confirmation of resuming mining.
</details>
"""
return await self.send_command("wakeup", parameters=session_id)

View File

@@ -18,4 +18,5 @@ from .bmminer import *
from .bosminer import *
from .cgminer import *
from .hiveon import *
from .luxos import *
from .vnish import *

View File

@@ -0,0 +1,22 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from pyasic.miners.backends import LUXMiner
from pyasic.miners.types import S9
class LUXMinerS9(LUXMiner, S9):
pass

View File

@@ -0,0 +1,17 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from .S9 import LUXMinerS9

View File

@@ -0,0 +1,17 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from .X9 import *

View File

@@ -22,5 +22,6 @@ from .btminer import BTMiner
from .cgminer import CGMiner
from .cgminer_avalon import CGMinerAvalon
from .hiveon import Hiveon
from .luxminer import LUXMiner
from .vnish import VNish
from .whatsminer import M2X, M3X, M5X

View File

@@ -0,0 +1,431 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import asyncio
import logging
from collections import namedtuple
from typing import List, Optional, Tuple, Union
import toml
from pyasic.API.bosminer import BOSMinerAPI
from pyasic.API.luxminer import LUXMinerAPI
from pyasic.config import MinerConfig
from pyasic.data import Fan, HashBoard
from pyasic.data.error_codes import BraiinsOSError, MinerErrorData
from pyasic.errors import APIError
from pyasic.miners.base import BaseMiner
from pyasic.web.bosminer import BOSMinerWebAPI
LUXMINER_DATA_LOC = {
"mac": {
"cmd": "get_mac",
"kwargs": {"api_config": {"api": "config"}},
},
"model": {"cmd": "get_model", "kwargs": {}},
"api_ver": {
"cmd": "get_api_ver",
"kwargs": {},
},
"fw_ver": {
"cmd": "get_fw_ver",
"kwargs": {},
},
"hostname": {
"cmd": "get_hostname",
"kwargs": {},
},
"hashrate": {
"cmd": "get_hashrate",
"kwargs": {},
},
"nominal_hashrate": {
"cmd": "get_nominal_hashrate",
"kwargs": {},
},
"hashboards": {
"cmd": "get_hashboards",
"kwargs": {},
},
"wattage": {
"cmd": "get_wattage",
"kwargs": {},
},
"wattage_limit": {
"cmd": "get_wattage_limit",
"kwargs": {},
},
"fans": {
"cmd": "get_fans",
"kwargs": {},
},
"fan_psu": {"cmd": "get_fan_psu", "kwargs": {}},
"env_temp": {"cmd": "get_env_temp", "kwargs": {}},
"errors": {
"cmd": "get_errors",
"kwargs": {},
},
"fault_light": {
"cmd": "get_fault_light",
"kwargs": {},
},
"pools": {
"cmd": "get_pools",
"kwargs": {},
},
"is_mining": {
"cmd": "is_mining",
"kwargs": {},
},
}
class LUXMiner(BaseMiner):
def __init__(self, ip: str, api_ver: str = "0.0.0") -> None:
super().__init__(ip)
# interfaces
self.api = LUXMinerAPI(ip, api_ver)
# self.web = BOSMinerWebAPI(ip)
# static data
self.api_type = "LUXMiner"
# data gathering locations
self.data_locations = LUXMINER_DATA_LOC
# autotuning/shutdown support
# self.supports_autotuning = True
# self.supports_shutdown = True
# data storage
self.api_ver = api_ver
async def _get_session(self) -> Optional[str]:
try:
data = await self.api.session()
if not data["SESSION"][0]["SessionID"] == "":
return data["SESSION"][0]["SessionID"]
except APIError:
pass
try:
data = await self.api.logon()
return data["SESSION"][0]["SessionID"]
except (LookupError, APIError):
return
async def fault_light_on(self) -> bool:
"""Sends command to turn on fault light on the miner."""
try:
session_id = await self._get_session()
if session_id:
await self.api.ledset(session_id, "red", "blink")
return True
except (APIError, LookupError):
pass
return False
async def fault_light_off(self) -> bool:
"""Sends command to turn off fault light on the miner."""
try:
session_id = await self._get_session()
if session_id:
await self.api.ledset(session_id, "red", "off")
return True
except (APIError, LookupError):
pass
return False
async def restart_backend(self) -> bool:
"""Restart luxminer hashing process. Wraps [`restart_luxminer`][pyasic.miners.backends.luxminer.LUXMiner.restart_luxminer] to standardize."""
return await self.restart_luxminer()
async def restart_luxminer(self) -> bool:
"""Restart luxminer hashing process."""
try:
session_id = await self._get_session()
if session_id:
await self.api.resetminer(session_id)
return True
except (APIError, LookupError):
pass
return False
async def stop_mining(self) -> bool:
try:
session_id = await self._get_session()
if session_id:
await self.api.curtail(session_id)
return True
except (APIError, LookupError):
pass
return False
async def resume_mining(self) -> bool:
try:
session_id = await self._get_session()
if session_id:
await self.api.wakeup(session_id)
return True
except (APIError, LookupError):
pass
async def reboot(self) -> bool:
"""Reboots power to the physical miner."""
try:
session_id = await self._get_session()
if session_id:
await self.api.rebootdevice(session_id)
return True
except (APIError, LookupError):
pass
return False
async def get_config(self) -> MinerConfig:
"""Gets the config for the miner and sets it as `self.config`.
Returns:
The config from `self.config`.
"""
return self.config
async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
"""Configures miner with yaml config."""
pass
async def set_power_limit(self, wattage: int) -> bool:
return False
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def get_mac(self, api_config: dict = None) -> Optional[str]:
mac = None
if not api_config:
try:
api_config = await self.api.config()
except APIError:
return None
if api_config:
try:
mac = api_config["CONFIG"][0]["MACAddr"]
except KeyError:
return None
return mac
async def get_model(self) -> Optional[str]:
if self.model is not None:
return self.model + " (LuxOS)"
return "? (LuxOS)"
async def get_version(self) -> Tuple[Optional[str], Optional[str]]:
pass
async def get_api_ver(self) -> Optional[str]:
pass
async def get_fw_ver(self) -> Optional[str]:
pass
async def get_hostname(self) -> Union[str, None]:
pass
async def get_hashrate(self, api_summary: dict = None) -> Optional[float]:
# get hr from API
if not api_summary:
try:
api_summary = await self.api.summary()
except APIError:
pass
if api_summary:
try:
return round(float(api_summary["SUMMARY"][0]["GHS 5s"] / 1000), 2)
except (IndexError, KeyError, ValueError, TypeError):
pass
async def get_hashboards(self, api_stats: dict = None) -> List[HashBoard]:
hashboards = []
if not api_stats:
try:
api_stats = await self.api.stats()
except APIError:
pass
if api_stats:
try:
board_offset = -1
boards = api_stats["STATS"]
if len(boards) > 1:
for board_num in range(1, 16, 5):
for _b_num in range(5):
b = boards[1].get(f"chain_acn{board_num + _b_num}")
if b and not b == 0 and board_offset == -1:
board_offset = board_num
if board_offset == -1:
board_offset = 1
for i in range(board_offset, board_offset + self.ideal_hashboards):
hashboard = HashBoard(
slot=i - board_offset, expected_chips=self.nominal_chips
)
chip_temp = boards[1].get(f"temp{i}")
if chip_temp:
hashboard.chip_temp = round(chip_temp)
temp = boards[1].get(f"temp2_{i}")
if temp:
hashboard.temp = round(temp)
hashrate = boards[1].get(f"chain_rate{i}")
if hashrate:
hashboard.hashrate = round(float(hashrate) / 1000, 2)
chips = boards[1].get(f"chain_acn{i}")
if chips:
hashboard.chips = chips
hashboard.missing = False
if (not chips) or (not chips > 0):
hashboard.missing = True
hashboards.append(hashboard)
except (IndexError, KeyError, ValueError, TypeError):
pass
return hashboards
async def get_env_temp(self) -> Optional[float]:
return None
async def get_wattage(self, api_power: dict) -> Optional[int]:
if not api_power:
try:
api_power = await self.api.power()
except APIError:
pass
if api_power:
try:
return api_power["POWER"][0]["Watts"]
except (IndexError, KeyError, ValueError, TypeError):
pass
async def get_wattage_limit(self) -> Optional[int]:
return None
async def get_fans(self, api_fans: dict = None) -> List[Fan]:
if not api_fans:
try:
api_fans = await self.api.fans()
except APIError:
pass
fans = []
if api_fans:
for fan in range(self.fan_count):
try:
fans.append(Fan(api_fans["FANS"][0]["RPM"]))
except (IndexError, KeyError, ValueError, TypeError):
fans.append(Fan())
return fans
async def get_fan_psu(self) -> Optional[int]:
return None
async def get_pools(self, api_pools: dict = None) -> List[dict]:
if not api_pools:
try:
api_pools = await self.api.pools()
except APIError:
pass
if api_pools:
seen = []
groups = [{"quota": "0"}]
if api_pools.get("POOLS"):
for i, pool in enumerate(api_pools["POOLS"]):
if len(seen) == 0:
seen.append(pool["User"])
if not pool["User"] in seen:
# need to use get_config, as this will never read perfectly as there are some bad edge cases
groups = []
cfg = await self.get_config()
if cfg:
for group in cfg.pool_groups:
pools = {"quota": group.quota}
for _i, _pool in enumerate(group.pools):
pools[f"pool_{_i + 1}_url"] = _pool.url.replace(
"stratum+tcp://", ""
).replace("stratum2+tcp://", "")
pools[f"pool_{_i + 1}_user"] = _pool.username
groups.append(pools)
return groups
else:
groups[0][f"pool_{i + 1}_url"] = (
pool["URL"]
.replace("stratum+tcp://", "")
.replace("stratum2+tcp://", "")
)
groups[0][f"pool_{i + 1}_user"] = pool["User"]
else:
groups = []
cfg = await self.get_config()
if cfg:
for group in cfg.pool_groups:
pools = {"quota": group.quota}
for _i, _pool in enumerate(group.pools):
pools[f"pool_{_i + 1}_url"] = _pool.url.replace(
"stratum+tcp://", ""
).replace("stratum2+tcp://", "")
pools[f"pool_{_i + 1}_user"] = _pool.username
groups.append(pools)
return groups
return groups
async def get_errors(self) -> List[MinerErrorData]:
pass
async def get_fault_light(self) -> bool:
pass
async def get_nominal_hashrate(self, api_stats: dict = None) -> Optional[float]:
if not api_stats:
try:
api_stats = await self.api.stats()
except APIError:
pass
if api_stats:
try:
ideal_rate = api_stats["STATS"][1]["total_rateideal"]
try:
rate_unit = api_stats["STATS"][1]["rate_unit"]
except KeyError:
rate_unit = "GH"
if rate_unit == "GH":
return round(ideal_rate / 1000, 2)
if rate_unit == "MH":
return round(ideal_rate / 1000000, 2)
else:
return round(ideal_rate, 2)
except (KeyError, IndexError):
pass
async def is_mining(self) -> Optional[bool]:
pass

View File

@@ -35,6 +35,7 @@ from pyasic.miners.backends import (
CGMiner,
CGMinerAvalon,
Hiveon,
LUXMiner,
VNish,
)
from pyasic.miners.base import AnyMiner
@@ -56,6 +57,7 @@ class MinerTypes(enum.Enum):
BRAIINS_OS = 5
VNISH = 6
HIVEON = 7
LUX_OS = 8
MINER_CLASSES = {
@@ -337,6 +339,10 @@ MINER_CLASSES = {
None: Hiveon,
"ANTMINER T9": HiveonT9,
},
MinerTypes.LUX_OS: {
None: LUXMiner,
"ANTMINER S9": LUXMinerS9,
},
}
@@ -420,6 +426,7 @@ class MinerFactory:
MinerTypes.BRAIINS_OS: self.get_miner_model_braiins_os,
MinerTypes.VNISH: self.get_miner_model_vnish,
MinerTypes.HIVEON: self.get_miner_model_hiveon,
MinerTypes.LUX_OS: self.get_miner_model_luxos,
}
fn = miner_model_fns.get(miner_type)
@@ -490,14 +497,15 @@ class MinerFactory:
return MinerTypes.INNOSILICON
async def _get_miner_socket(self, ip: str):
commands = ["devdetails", "version"]
commands = ["version", "devdetails"]
tasks = [asyncio.create_task(self._socket_ping(ip, cmd)) for cmd in commands]
data = await concurrent_get_first_result(
tasks, lambda x: x is not None and self._parse_socket_type(x) is not None
)
if data is not None:
return self._parse_socket_type(data)
d = self._parse_socket_type(data)
return d
@staticmethod
async def _socket_ping(ip: str, cmd: str) -> Optional[str]:
@@ -555,7 +563,9 @@ class MinerFactory:
return MinerTypes.VNISH
if "HIVEON" in upper_data:
return MinerTypes.HIVEON
if "ANTMINER" in upper_data:
if "LUXMINER" in upper_data:
return MinerTypes.LUX_OS
if "ANTMINER" in upper_data and not "DEVDETAILS" in upper_data:
return MinerTypes.ANTMINER
if "INTCHAINS_QOMO" in upper_data:
return MinerTypes.GOLDSHELL
@@ -816,5 +826,17 @@ class MinerFactory:
except (TypeError, LookupError):
pass
async def get_miner_model_luxos(self, ip: str):
sock_json_data = await self.send_api_command(ip, "version")
try:
miner_model = sock_json_data["VERSION"][0]["Type"]
if " (" in miner_model:
split_miner_model = miner_model.split(" (")
miner_model = split_miner_model[0]
return miner_model
except (TypeError, LookupError):
pass
miner_factory = MinerFactory()

View File

@@ -148,6 +148,9 @@ class UnknownMiner(BaseMiner):
async def get_nominal_hashrate(self) -> Optional[float]:
return None
async def is_mining(self, *args, **kwargs) -> Optional[bool]:
return None
async def get_data(
self, allow_warning: bool = False, data_to_get: list = None
) -> MinerData: