Compare commits

..

10 Commits

Author SHA1 Message Date
Upstream Data
4a5d793cd6 version: bump version number 2024-12-02 15:14:36 -07:00
Upstream Data
1894ff1aea feature: add hiveon web API 2024-12-02 15:14:23 -07:00
Upstream Data
3ab9294000 version: bump version number 2024-12-02 15:12:19 -07:00
Upstream Data
5e0641634b bug: fix MRO for hiveon 2024-12-02 15:11:59 -07:00
Upstream Data
a1975bc9b8 version: bump version number 2024-12-02 13:43:09 -07:00
kdmukai
6a265f03f7 cleanup debugging 2024-12-02 13:42:42 -07:00
Upstream Data
c3658f028f version: bump version number 2024-12-02 10:34:08 -07:00
Upstream Data
ba3c653a29 bug: fix chains: [] which doesnt work with vnish 2024-12-02 10:33:53 -07:00
Upstream Data
61fbc132ed version: bump version number 2024-12-02 10:31:15 -07:00
Upstream Data
3f9f232990 bug: fix pool URL for vnish parser 2024-12-02 10:30:57 -07:00
6 changed files with 223 additions and 5 deletions

View File

@@ -382,9 +382,10 @@ class MiningModeManual(MinerConfigValue):
return {"miner-mode": 0} return {"miner-mode": 0}
def as_vnish(self) -> dict: def as_vnish(self) -> dict:
chains = [b.as_vnish() for b in self.boards.values() if b.freq != 0]
return { return {
"overclock": { "overclock": {
"chains": [b.as_vnish() for b in self.boards.values() if b.freq != 0], "chains": chains if chains != [] else None,
"globals": { "globals": {
"freq": int(self.global_freq), "freq": int(self.global_freq),
"volt": int(self.global_volt), "volt": int(self.global_volt),

View File

@@ -201,7 +201,7 @@ class Pool(MinerConfigValue):
@classmethod @classmethod
def from_vnish(cls, web_pool: dict) -> "Pool": def from_vnish(cls, web_pool: dict) -> "Pool":
return cls( return cls(
url=web_pool["url"], url="stratum+tcp://" + web_pool["url"],
user=web_pool["user"], user=web_pool["user"],
password=web_pool["pass"], password=web_pool["pass"],
) )

View File

@@ -19,6 +19,7 @@ from pyasic import APIError
from pyasic.miners.backends import BMMiner from pyasic.miners.backends import BMMiner
from pyasic.miners.data import DataFunction, DataLocations, DataOptions, RPCAPICommand from pyasic.miners.data import DataFunction, DataLocations, DataOptions, RPCAPICommand
from pyasic.miners.device.firmware import HiveonFirmware from pyasic.miners.device.firmware import HiveonFirmware
from pyasic.web.hiveon import HiveonWebAPI
HIVEON_DATA_LOC = DataLocations( HIVEON_DATA_LOC = DataLocations(
**{ **{
@@ -62,9 +63,12 @@ HIVEON_DATA_LOC = DataLocations(
) )
class Hiveon(BMMiner, HiveonFirmware): class Hiveon(HiveonFirmware, BMMiner):
data_locations = HIVEON_DATA_LOC data_locations = HIVEON_DATA_LOC
web: HiveonWebAPI
_web_cls = HiveonWebAPI
async def _get_wattage(self, rpc_stats: dict = None) -> Optional[int]: async def _get_wattage(self, rpc_stats: dict = None) -> Optional[int]:
if not rpc_stats: if not rpc_stats:
try: try:

View File

@@ -631,7 +631,6 @@ class MinerFactory:
@staticmethod @staticmethod
def _parse_web_type(web_text: str, web_resp: httpx.Response) -> MinerTypes | None: def _parse_web_type(web_text: str, web_resp: httpx.Response) -> MinerTypes | None:
print(web_resp.headers)
if web_resp.status_code == 401 and 'realm="antMiner' in web_resp.headers.get( if web_resp.status_code == 401 and 'realm="antMiner' in web_resp.headers.get(
"www-authenticate", "" "www-authenticate", ""
): ):

214
pyasic/web/hiveon.py Normal file
View File

@@ -0,0 +1,214 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import json
from pathlib import Path
from typing import Any
import aiofiles
import httpx
from pyasic import settings
from pyasic.web.base import BaseWebAPI
class HiveonWebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
"""Initialize the old Antminer API client with a specific IP address.
Args:
ip (str): IP address of the Antminer device.
"""
super().__init__(ip)
self.username = "root"
self.pwd = settings.get("default_hive_web_password", "root")
async def send_command(
self,
command: str | bytes,
ignore_errors: bool = False,
allow_warning: bool = True,
privileged: bool = False,
**parameters: Any,
) -> dict:
"""Send a command to the Antminer device using HTTP digest authentication.
Args:
command (str | bytes): The CGI command to send.
ignore_errors (bool): If True, ignore any HTTP errors.
allow_warning (bool): If True, proceed with warnings.
privileged (bool): If set to True, requires elevated privileges.
**parameters: Arbitrary keyword arguments to be sent as parameters in the request.
Returns:
dict: The JSON response from the device or an empty dictionary if an error occurs.
"""
url = f"http://{self.ip}:{self.port}/cgi-bin/{command}.cgi"
auth = httpx.DigestAuth(self.username, self.pwd)
try:
async with httpx.AsyncClient(transport=settings.transport()) as client:
if parameters:
data = await client.post(
url,
data=parameters,
auth=auth,
timeout=settings.get("api_function_timeout", 3),
)
else:
data = await client.get(url, auth=auth)
except httpx.HTTPError:
pass
else:
if data.status_code == 200:
try:
return data.json()
except json.decoder.JSONDecodeError:
pass
async def multicommand(
self, *commands: str, ignore_errors: bool = False, allow_warning: bool = True
) -> dict:
"""Execute multiple commands simultaneously.
Args:
*commands (str): Multiple command strings to be executed.
ignore_errors (bool): If True, ignore any HTTP errors.
allow_warning (bool): If True, proceed with warnings.
Returns:
dict: A dictionary containing the results of all commands executed.
"""
data = {k: None for k in commands}
auth = httpx.DigestAuth(self.username, self.pwd)
async with httpx.AsyncClient(transport=settings.transport()) as client:
for command in commands:
try:
url = f"http://{self.ip}/cgi-bin/{command}.cgi"
ret = await client.get(url, auth=auth)
except httpx.HTTPError:
pass
else:
if ret.status_code == 200:
try:
json_data = ret.json()
data[command] = json_data
except json.decoder.JSONDecodeError:
pass
return data
async def get_system_info(self) -> dict:
"""Retrieve system information from the miner.
Returns:
dict: A dictionary containing system information of the miner.
"""
return await self.send_command("get_system_info")
async def get_network_info(self) -> dict:
"""Retrieve system information from the miner.
Returns:
dict: A dictionary containing system information of the miner.
"""
return await self.send_command("get_network_info")
async def blink(self, blink: bool) -> dict:
"""Control the blinking of the LED on the miner device.
Args:
blink (bool): True to start blinking, False to stop.
Returns:
dict: A dictionary response from the device after the command execution.
"""
if blink:
return await self.send_command("blink", action="startBlink")
return await self.send_command("blink", action="stopBlink")
async def reboot(self) -> dict:
"""Reboot the miner device.
Returns:
dict: A dictionary response from the device confirming the reboot command.
"""
return await self.send_command("reboot")
async def get_blink_status(self) -> dict:
"""Check the status of the LED blinking on the miner.
Returns:
dict: A dictionary indicating whether the LED is currently blinking.
"""
return await self.send_command("blink", action="onPageLoaded")
async def get_miner_conf(self) -> dict:
"""Retrieve the miner configuration from the Antminer device.
Returns:
dict: A dictionary containing the current configuration of the miner.
"""
return await self.send_command("get_miner_conf")
async def set_miner_conf(self, conf: dict) -> dict:
"""Set the configuration for the miner.
Args:
conf (dict): A dictionary of configuration settings to apply to the miner.
Returns:
dict: A dictionary response from the device after setting the configuration.
"""
return await self.send_command("set_miner_conf", **conf)
async def stats(self) -> dict:
"""Retrieve detailed statistical data of the mining operation.
Returns:
dict: Detailed statistics of the miner's operation.
"""
return await self.send_command("miner_stats")
async def summary(self) -> dict:
"""Get a summary of the miner's status and performance.
Returns:
dict: A summary of the miner's current operational status.
"""
return await self.send_command("miner_summary")
async def pools(self) -> dict:
"""Retrieve current pool information associated with the miner.
Returns:
dict: Information about the mining pools configured in the miner.
"""
return await self.send_command("miner_pools")
async def update_firmware(self, file: Path, keep_settings: bool = True) -> dict:
"""Perform a system update by uploading a firmware file and sending a command to initiate the update."""
async with aiofiles.open(file, "rb") as firmware:
file_content = await firmware.read()
parameters = {
"file": (file.name, file_content, "application/octet-stream"),
"filename": file.name,
"keep_settings": keep_settings,
}
return await self.send_command(command="upgrade", **parameters)

View File

@@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "pyasic" name = "pyasic"
version = "0.64.8" version = "0.64.13"
description = "A simplified and standardized interface for Bitcoin ASICs." description = "A simplified and standardized interface for Bitcoin ASICs."
authors = ["UpstreamData <brett@upstreamdata.ca>"] authors = ["UpstreamData <brett@upstreamdata.ca>"]
repository = "https://github.com/UpstreamData/pyasic" repository = "https://github.com/UpstreamData/pyasic"