Compare commits

..

12 Commits

Author SHA1 Message Date
Upstream Data
4a5d793cd6 version: bump version number 2024-12-02 15:14:36 -07:00
Upstream Data
1894ff1aea feature: add hiveon web API 2024-12-02 15:14:23 -07:00
Upstream Data
3ab9294000 version: bump version number 2024-12-02 15:12:19 -07:00
Upstream Data
5e0641634b bug: fix MRO for hiveon 2024-12-02 15:11:59 -07:00
Upstream Data
a1975bc9b8 version: bump version number 2024-12-02 13:43:09 -07:00
kdmukai
6a265f03f7 cleanup debugging 2024-12-02 13:42:42 -07:00
Upstream Data
c3658f028f version: bump version number 2024-12-02 10:34:08 -07:00
Upstream Data
ba3c653a29 bug: fix chains: [] which doesnt work with vnish 2024-12-02 10:33:53 -07:00
Upstream Data
61fbc132ed version: bump version number 2024-12-02 10:31:15 -07:00
Upstream Data
3f9f232990 bug: fix pool URL for vnish parser 2024-12-02 10:30:57 -07:00
Upstream Data
29c2398846 version: bump version number 2024-12-02 10:16:23 -07:00
Upstream Data
ecc161820d bug: fix vnish overclock setting 2024-12-02 10:16:04 -07:00
6 changed files with 229 additions and 6 deletions

View File

@@ -382,9 +382,15 @@ class MiningModeManual(MinerConfigValue):
return {"miner-mode": 0}
def as_vnish(self) -> dict:
chains = [b.as_vnish() for b in self.boards.values() if b.freq != 0]
return {
"chains": [b.as_vnish() for b in self.boards.values() if b.freq != 0],
"globals": {"freq": int(self.global_freq), "volt": int(self.global_volt)},
"overclock": {
"chains": chains if chains != [] else None,
"globals": {
"freq": int(self.global_freq),
"volt": int(self.global_volt),
},
}
}
@classmethod

View File

@@ -201,7 +201,7 @@ class Pool(MinerConfigValue):
@classmethod
def from_vnish(cls, web_pool: dict) -> "Pool":
return cls(
url=web_pool["url"],
url="stratum+tcp://" + web_pool["url"],
user=web_pool["user"],
password=web_pool["pass"],
)

View File

@@ -19,6 +19,7 @@ from pyasic import APIError
from pyasic.miners.backends import BMMiner
from pyasic.miners.data import DataFunction, DataLocations, DataOptions, RPCAPICommand
from pyasic.miners.device.firmware import HiveonFirmware
from pyasic.web.hiveon import HiveonWebAPI
HIVEON_DATA_LOC = DataLocations(
**{
@@ -62,9 +63,12 @@ HIVEON_DATA_LOC = DataLocations(
)
class Hiveon(BMMiner, HiveonFirmware):
class Hiveon(HiveonFirmware, BMMiner):
data_locations = HIVEON_DATA_LOC
web: HiveonWebAPI
_web_cls = HiveonWebAPI
async def _get_wattage(self, rpc_stats: dict = None) -> Optional[int]:
if not rpc_stats:
try:

View File

@@ -631,7 +631,6 @@ class MinerFactory:
@staticmethod
def _parse_web_type(web_text: str, web_resp: httpx.Response) -> MinerTypes | None:
print(web_resp.headers)
if web_resp.status_code == 401 and 'realm="antMiner' in web_resp.headers.get(
"www-authenticate", ""
):

214
pyasic/web/hiveon.py Normal file
View File

@@ -0,0 +1,214 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import json
from pathlib import Path
from typing import Any
import aiofiles
import httpx
from pyasic import settings
from pyasic.web.base import BaseWebAPI
class HiveonWebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
"""Initialize the old Antminer API client with a specific IP address.
Args:
ip (str): IP address of the Antminer device.
"""
super().__init__(ip)
self.username = "root"
self.pwd = settings.get("default_hive_web_password", "root")
async def send_command(
self,
command: str | bytes,
ignore_errors: bool = False,
allow_warning: bool = True,
privileged: bool = False,
**parameters: Any,
) -> dict:
"""Send a command to the Antminer device using HTTP digest authentication.
Args:
command (str | bytes): The CGI command to send.
ignore_errors (bool): If True, ignore any HTTP errors.
allow_warning (bool): If True, proceed with warnings.
privileged (bool): If set to True, requires elevated privileges.
**parameters: Arbitrary keyword arguments to be sent as parameters in the request.
Returns:
dict: The JSON response from the device or an empty dictionary if an error occurs.
"""
url = f"http://{self.ip}:{self.port}/cgi-bin/{command}.cgi"
auth = httpx.DigestAuth(self.username, self.pwd)
try:
async with httpx.AsyncClient(transport=settings.transport()) as client:
if parameters:
data = await client.post(
url,
data=parameters,
auth=auth,
timeout=settings.get("api_function_timeout", 3),
)
else:
data = await client.get(url, auth=auth)
except httpx.HTTPError:
pass
else:
if data.status_code == 200:
try:
return data.json()
except json.decoder.JSONDecodeError:
pass
async def multicommand(
self, *commands: str, ignore_errors: bool = False, allow_warning: bool = True
) -> dict:
"""Execute multiple commands simultaneously.
Args:
*commands (str): Multiple command strings to be executed.
ignore_errors (bool): If True, ignore any HTTP errors.
allow_warning (bool): If True, proceed with warnings.
Returns:
dict: A dictionary containing the results of all commands executed.
"""
data = {k: None for k in commands}
auth = httpx.DigestAuth(self.username, self.pwd)
async with httpx.AsyncClient(transport=settings.transport()) as client:
for command in commands:
try:
url = f"http://{self.ip}/cgi-bin/{command}.cgi"
ret = await client.get(url, auth=auth)
except httpx.HTTPError:
pass
else:
if ret.status_code == 200:
try:
json_data = ret.json()
data[command] = json_data
except json.decoder.JSONDecodeError:
pass
return data
async def get_system_info(self) -> dict:
"""Retrieve system information from the miner.
Returns:
dict: A dictionary containing system information of the miner.
"""
return await self.send_command("get_system_info")
async def get_network_info(self) -> dict:
"""Retrieve system information from the miner.
Returns:
dict: A dictionary containing system information of the miner.
"""
return await self.send_command("get_network_info")
async def blink(self, blink: bool) -> dict:
"""Control the blinking of the LED on the miner device.
Args:
blink (bool): True to start blinking, False to stop.
Returns:
dict: A dictionary response from the device after the command execution.
"""
if blink:
return await self.send_command("blink", action="startBlink")
return await self.send_command("blink", action="stopBlink")
async def reboot(self) -> dict:
"""Reboot the miner device.
Returns:
dict: A dictionary response from the device confirming the reboot command.
"""
return await self.send_command("reboot")
async def get_blink_status(self) -> dict:
"""Check the status of the LED blinking on the miner.
Returns:
dict: A dictionary indicating whether the LED is currently blinking.
"""
return await self.send_command("blink", action="onPageLoaded")
async def get_miner_conf(self) -> dict:
"""Retrieve the miner configuration from the Antminer device.
Returns:
dict: A dictionary containing the current configuration of the miner.
"""
return await self.send_command("get_miner_conf")
async def set_miner_conf(self, conf: dict) -> dict:
"""Set the configuration for the miner.
Args:
conf (dict): A dictionary of configuration settings to apply to the miner.
Returns:
dict: A dictionary response from the device after setting the configuration.
"""
return await self.send_command("set_miner_conf", **conf)
async def stats(self) -> dict:
"""Retrieve detailed statistical data of the mining operation.
Returns:
dict: Detailed statistics of the miner's operation.
"""
return await self.send_command("miner_stats")
async def summary(self) -> dict:
"""Get a summary of the miner's status and performance.
Returns:
dict: A summary of the miner's current operational status.
"""
return await self.send_command("miner_summary")
async def pools(self) -> dict:
"""Retrieve current pool information associated with the miner.
Returns:
dict: Information about the mining pools configured in the miner.
"""
return await self.send_command("miner_pools")
async def update_firmware(self, file: Path, keep_settings: bool = True) -> dict:
"""Perform a system update by uploading a firmware file and sending a command to initiate the update."""
async with aiofiles.open(file, "rb") as firmware:
file_content = await firmware.read()
parameters = {
"file": (file.name, file_content, "application/octet-stream"),
"filename": file.name,
"keep_settings": keep_settings,
}
return await self.send_command(command="upgrade", **parameters)

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "pyasic"
version = "0.64.7"
version = "0.64.13"
description = "A simplified and standardized interface for Bitcoin ASICs."
authors = ["UpstreamData <brett@upstreamdata.ca>"]
repository = "https://github.com/UpstreamData/pyasic"