refactor: make web handlers much more consistent across types, remove graphql, and make luci and grpc the dedicated web apis for BOSer and BOSMiner respectively.

This commit is contained in:
UpstreamData
2024-01-25 13:50:04 -07:00
parent b328a27f04
commit aa1d7c1b6f
13 changed files with 356 additions and 581 deletions

View File

@@ -29,7 +29,6 @@ from pyasic.miners.base import (
DataFunction,
DataLocations,
DataOptions,
GRPCCommand,
RPCAPICommand,
WebAPICommand,
)
@@ -275,7 +274,7 @@ class BOSMiner(BaseMiner):
async def _get_mac(self, web_net_conf: Union[dict, list] = None) -> Optional[str]:
if web_net_conf is None:
try:
web_net_conf = await self.web.luci.get_net_conf()
web_net_conf = await self.web.get_net_conf()
except APIError:
pass
@@ -314,7 +313,7 @@ class BOSMiner(BaseMiner):
async def _get_fw_ver(self, web_bos_info: dict = None) -> Optional[str]:
if web_bos_info is None:
try:
web_bos_info = await self.web.luci.get_bos_info()
web_bos_info = await self.web.get_bos_info()
except APIError:
return None
@@ -571,19 +570,19 @@ BOSER_DATA_LOC = DataLocations(
**{
str(DataOptions.MAC): DataFunction(
"_get_mac",
[GRPCCommand("grpc_miner_details", "get_miner_details")],
[WebAPICommand("grpc_miner_details", "get_miner_details")],
),
str(DataOptions.API_VERSION): DataFunction(
"_get_api_ver",
[GRPCCommand("api_version", "get_api_version")],
[RPCAPICommand("api_version", "version")],
),
str(DataOptions.FW_VERSION): DataFunction(
"_get_fw_ver",
[GRPCCommand("grpc_miner_details", "get_miner_details")],
[WebAPICommand("grpc_miner_details", "get_miner_details")],
),
str(DataOptions.HOSTNAME): DataFunction(
"_get_hostname",
[GRPCCommand("grpc_miner_details", "get_miner_details")],
[WebAPICommand("grpc_miner_details", "get_miner_details")],
),
str(DataOptions.HASHRATE): DataFunction(
"_get_hashrate",
@@ -591,27 +590,27 @@ BOSER_DATA_LOC = DataLocations(
),
str(DataOptions.EXPECTED_HASHRATE): DataFunction(
"_get_expected_hashrate",
[GRPCCommand("grpc_miner_details", "get_miner_details")],
[WebAPICommand("grpc_miner_details", "get_miner_details")],
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[GRPCCommand("grpc_hashboards", "get_hashboards")],
[WebAPICommand("grpc_hashboards", "get_hashboards")],
),
str(DataOptions.WATTAGE): DataFunction(
"_get_wattage",
[GRPCCommand("grpc_miner_stats", "get_miner_stats")],
[WebAPICommand("grpc_miner_stats", "get_miner_stats")],
),
str(DataOptions.WATTAGE_LIMIT): DataFunction(
"_get_wattage_limit",
[
GRPCCommand(
WebAPICommand(
"grpc_active_performance_mode", "get_active_performance_mode"
)
],
),
str(DataOptions.FANS): DataFunction(
"_get_fans",
[GRPCCommand("grpc_cooling_state", "get_cooling_state")],
[WebAPICommand("grpc_cooling_state", "get_cooling_state")],
),
str(DataOptions.ERRORS): DataFunction(
"_get_errors",
@@ -619,7 +618,7 @@ BOSER_DATA_LOC = DataLocations(
),
str(DataOptions.FAULT_LIGHT): DataFunction(
"_get_fault_light",
[GRPCCommand("grpc_locate_device_status", "get_locate_device_status")],
[WebAPICommand("grpc_locate_device_status", "get_locate_device_status")],
),
str(DataOptions.IS_MINING): DataFunction(
"_is_mining",
@@ -647,13 +646,13 @@ class BOSer(BaseMiner):
supports_shutdown = True
async def fault_light_on(self) -> bool:
resp = await self.web.grpc.set_locate_device_status(True)
resp = await self.web.set_locate_device_status(True)
if resp.get("enabled", False):
return True
return False
async def fault_light_off(self) -> bool:
resp = await self.web.grpc.set_locate_device_status(False)
resp = await self.web.set_locate_device_status(False)
if resp == {}:
return True
return False
@@ -662,37 +661,37 @@ class BOSer(BaseMiner):
return await self.restart_boser()
async def restart_boser(self) -> bool:
await self.web.grpc.restart()
await self.web.restart()
return True
async def stop_mining(self) -> bool:
try:
await self.web.grpc.pause_mining()
await self.web.pause_mining()
except APIError:
return False
return True
async def resume_mining(self) -> bool:
try:
await self.web.grpc.resume_mining()
await self.web.resume_mining()
except APIError:
return False
return True
async def reboot(self) -> bool:
ret = await self.web.grpc.reboot()
ret = await self.web.reboot()
if ret == {}:
return True
return False
async def get_config(self) -> MinerConfig:
grpc_conf = await self.web.grpc.get_miner_configuration()
grpc_conf = await self.web.get_miner_configuration()
return MinerConfig.from_boser(grpc_conf)
async def set_power_limit(self, wattage: int) -> bool:
try:
result = await self.web.grpc.set_power_target(wattage)
result = await self.web.set_power_target(wattage)
except APIError:
return False
@@ -710,7 +709,7 @@ class BOSer(BaseMiner):
async def _get_mac(self, grpc_miner_details: dict = None) -> Optional[str]:
if grpc_miner_details is None:
try:
grpc_miner_details = await self.web.grpc.get_miner_details()
grpc_miner_details = await self.web.get_miner_details()
except APIError:
pass
@@ -740,7 +739,7 @@ class BOSer(BaseMiner):
async def _get_fw_ver(self, grpc_miner_details: dict = None) -> Optional[str]:
if grpc_miner_details is None:
try:
grpc_miner_details = await self.web.grpc.get_miner_details()
grpc_miner_details = await self.web.get_miner_details()
except APIError:
pass
@@ -763,7 +762,7 @@ class BOSer(BaseMiner):
async def _get_hostname(self, grpc_miner_details: dict = None) -> Optional[str]:
if grpc_miner_details is None:
try:
grpc_miner_details = await self.web.grpc.get_miner_details()
grpc_miner_details = await self.web.get_miner_details()
except APIError:
pass
@@ -791,7 +790,7 @@ class BOSer(BaseMiner):
) -> Optional[float]:
if grpc_miner_details is None:
try:
grpc_miner_details = await self.web.grpc.get_miner_details()
grpc_miner_details = await self.web.get_miner_details()
except APIError:
pass
@@ -809,7 +808,7 @@ class BOSer(BaseMiner):
if grpc_hashboards is None:
try:
grpc_hashboards = await self.web.grpc.get_hashboards()
grpc_hashboards = await self.web.get_hashboards()
except APIError:
pass
@@ -840,7 +839,7 @@ class BOSer(BaseMiner):
async def _get_wattage(self, grpc_miner_stats: dict = None) -> Optional[int]:
if grpc_miner_stats is None:
try:
grpc_miner_stats = self.web.grpc.get_miner_stats()
grpc_miner_stats = self.web.get_miner_stats()
except APIError:
pass
@@ -855,9 +854,7 @@ class BOSer(BaseMiner):
) -> Optional[int]:
if grpc_active_performance_mode is None:
try:
grpc_active_performance_mode = (
self.web.grpc.get_active_performance_mode()
)
grpc_active_performance_mode = self.web.get_active_performance_mode()
except APIError:
pass
@@ -872,7 +869,7 @@ class BOSer(BaseMiner):
async def _get_fans(self, grpc_cooling_state: dict = None) -> List[Fan]:
if grpc_cooling_state is None:
try:
grpc_cooling_state = self.web.grpc.get_cooling_state()
grpc_cooling_state = self.web.get_cooling_state()
except APIError:
pass
@@ -922,9 +919,7 @@ class BOSer(BaseMiner):
if grpc_locate_device_status is None:
try:
grpc_locate_device_status = (
await self.web.grpc.get_locate_device_status()
)
grpc_locate_device_status = await self.web.get_locate_device_status()
except APIError:
pass

View File

@@ -76,18 +76,12 @@ class GRPCCommand(WebAPICommand):
cmd: str
@dataclass
class GraphQLCommand(WebAPICommand):
name: str
cmd: dict
@dataclass
class DataFunction:
cmd: str
kwargs: List[
Union[RPCAPICommand, WebAPICommand, GRPCCommand, GraphQLCommand]
] = field(default_factory=list)
kwargs: List[Union[RPCAPICommand, WebAPICommand, GRPCCommand]] = field(
default_factory=list
)
def __call__(self, *args, **kwargs):
return self

View File

@@ -13,9 +13,11 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from __future__ import annotations
import warnings
from abc import ABC, abstractmethod
from typing import Union
from typing import Any
from pyasic.errors import APIWarning
@@ -23,11 +25,13 @@ from pyasic.errors import APIWarning
class BaseWebAPI(ABC):
def __init__(self, ip: str) -> None:
# ip address of the miner
self.ip = ip # ipaddress.ip_address(ip)
self.username = "root"
self.ip = ip
self.username = None
self.pwd = None
self.port = 80
self.token = None
def __new__(cls, *args, **kwargs):
if cls is BaseWebAPI:
raise TypeError(f"Only children of '{cls.__name__}' may be instantiated")
@@ -39,10 +43,11 @@ class BaseWebAPI(ABC):
@abstractmethod
async def send_command(
self,
command: Union[str, bytes],
command: str | bytes,
ignore_errors: bool = False,
allow_warning: bool = True,
**parameters: Union[str, int, bool],
privileged: bool = False,
**parameters: Any,
) -> dict:
pass

View File

@@ -13,9 +13,11 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import json
from typing import Union
from typing import Any
import httpx
@@ -26,14 +28,16 @@ from pyasic.web import BaseWebAPI
class AntminerModernWebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
super().__init__(ip)
self.username = "root"
self.pwd = settings.get("default_antminer_web_password", "root")
async def send_command(
self,
command: Union[str, bytes],
command: str | bytes,
ignore_errors: bool = False,
allow_warning: bool = True,
**parameters: Union[str, int, bool],
privileged: bool = False,
**parameters: Any,
) -> dict:
url = f"http://{self.ip}:{self.port}/cgi-bin/{command}.cgi"
auth = httpx.DigestAuth(self.username, self.pwd)
@@ -44,9 +48,9 @@ class AntminerModernWebAPI(BaseWebAPI):
if parameters:
data = await client.post(
url,
data=json.dumps(parameters),
auth=auth,
timeout=settings.get("api_function_timeout", 3), # noqa
timeout=settings.get("api_function_timeout", 3),
json=parameters,
)
else:
data = await client.get(url, auth=auth)
@@ -76,7 +80,9 @@ class AntminerModernWebAPI(BaseWebAPI):
data["multicommand"] = True
return data
async def _handle_multicommand(self, client: httpx.AsyncClient, command: str):
async def _handle_multicommand(
self, client: httpx.AsyncClient, command: str
) -> dict:
auth = httpx.DigestAuth(self.username, self.pwd)
try:
@@ -142,14 +148,16 @@ class AntminerModernWebAPI(BaseWebAPI):
class AntminerOldWebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
super().__init__(ip)
self.username = "root"
self.pwd = settings.get("default_antminer_web_password", "root")
async def send_command(
self,
command: Union[str, bytes],
command: str | bytes,
ignore_errors: bool = False,
allow_warning: bool = True,
**parameters: Union[str, int, bool],
privileged: bool = False,
**parameters: Any,
) -> dict:
url = f"http://{self.ip}:{self.port}/cgi-bin/{command}.cgi"
auth = httpx.DigestAuth(self.username, self.pwd)

View File

@@ -13,10 +13,12 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import json
import warnings
from typing import Any, List, Union
from typing import Any
import httpx
@@ -31,9 +33,9 @@ class FluxWebAPI(BaseWebAPI):
self.username = "admin"
self.pwd = settings.get("default_auradine_web_password", "admin")
self.port = 8080
self.jwt = None
self.token = None
async def auth(self):
async def auth(self) -> str | None:
async with httpx.AsyncClient(transport=settings.transport()) as client:
try:
auth = await client.post(
@@ -49,20 +51,24 @@ class FluxWebAPI(BaseWebAPI):
else:
json_auth = auth.json()
try:
self.jwt = json_auth["Token"][0]["Token"]
self.token = json_auth["Token"][0]["Token"]
except LookupError:
return None
return self.jwt
return self.token
async def send_command(
self,
command: Union[str, bytes],
post: bool = False,
command: str | bytes,
ignore_errors: bool = False,
allow_warning: bool = True,
privileged: bool = False,
**parameters: Any,
) -> dict:
if self.jwt is None:
post = privileged or not parameters == {}
if not parameters == {}:
parameters["command"] = command
if self.token is None:
await self.auth()
async with httpx.AsyncClient(transport=settings.transport()) as client:
for i in range(settings.get("get_data_retries", 1)):
@@ -70,20 +76,14 @@ class FluxWebAPI(BaseWebAPI):
if post:
response = await client.post(
f"http://{self.ip}:{self.port}/{command}",
headers={"Token": self.jwt},
headers={"Token": self.token},
timeout=settings.get("api_function_timeout", 5),
)
elif parameters:
response = await client.post(
f"http://{self.ip}:{self.port}/{command}",
headers={"Token": self.jwt},
timeout=settings.get("api_function_timeout", 5),
json={"command": command, **parameters},
json=parameters,
)
else:
response = await client.get(
f"http://{self.ip}:{self.port}/{command}",
headers={"Token": self.jwt},
headers={"Token": self.token},
timeout=settings.get("api_function_timeout", 5),
)
json_data = response.json()
@@ -95,9 +95,7 @@ class FluxWebAPI(BaseWebAPI):
await self.auth()
continue
return json_data
except httpx.HTTPError:
pass
except json.JSONDecodeError:
except (httpx.HTTPError, json.JSONDecodeError):
pass
async def multicommand(
@@ -166,97 +164,97 @@ class FluxWebAPI(BaseWebAPI):
return False, data["STATUS"][0]["Msg"]
return True, None
async def factory_reset(self):
return await self.send_command("factory-reset", post=True)
async def factory_reset(self) -> dict:
return await self.send_command("factory-reset", privileged=True)
async def get_fan(self):
async def get_fan(self) -> dict:
return await self.send_command("fan")
async def set_fan(self, fan: int, speed_pct: int):
async def set_fan(self, fan: int, speed_pct: int) -> dict:
return await self.send_command("fan", index=fan, percentage=speed_pct)
async def firmware_upgrade(self, url: str = None, version: str = "latest"):
async def firmware_upgrade(self, url: str = None, version: str = "latest") -> dict:
if url is not None:
return await self.send_command("firmware-upgrade", url=url)
return await self.send_command("firmware-upgrade", version=version)
async def get_frequency(self):
async def get_frequency(self) -> dict:
return await self.send_command("frequency")
async def set_frequency(self, board: int, frequency: float):
async def set_frequency(self, board: int, frequency: float) -> dict:
return await self.send_command("frequency", board=board, frequency=frequency)
async def ipreport(self):
async def ipreport(self) -> dict:
return await self.send_command("ipreport")
async def get_led(self):
async def get_led(self) -> dict:
return await self.send_command("led")
async def set_led(self, code: int):
async def set_led(self, code: int) -> dict:
return await self.send_command("led", code=code)
async def set_led_custom(self, code: int, led_1: int, led_2: int, msg: str):
async def set_led_custom(self, code: int, led_1: int, led_2: int, msg: str) -> dict:
return await self.send_command(
"led", code=code, led1=led_1, led2=led_2, msg=msg
)
async def get_mode(self):
async def get_mode(self) -> dict:
return await self.send_command("mode")
async def set_mode(self, **kwargs):
async def set_mode(self, **kwargs) -> dict:
return await self.send_command("mode", **kwargs)
async def get_network(self):
async def get_network(self) -> dict:
return await self.send_command("network")
async def set_network(self, **kwargs):
async def set_network(self, **kwargs) -> dict:
return await self.send_command("network", **kwargs)
async def password(self, password: str):
async def password(self, password: str) -> dict:
res = await self.send_command(
"password", user=self.username, old=self.pwd, new=password
)
self.pwd = password
return res
async def get_psu(self):
async def get_psu(self) -> dict:
return await self.send_command("psu")
async def set_psu(self, voltage: float):
async def set_psu(self, voltage: float) -> dict:
return await self.send_command("psu", voltage=voltage)
async def get_register(self):
async def get_register(self) -> dict:
return await self.send_command("register")
async def set_register(self, company: str):
async def set_register(self, company: str) -> dict:
return await self.send_command("register", parameter=company)
async def reboot(self):
return await self.send_command("restart", post=True)
async def reboot(self) -> dict:
return await self.send_command("restart", privileged=True)
async def restart_gcminer(self):
async def restart_gcminer(self) -> dict:
return await self.send_command("restart", parameter="gcminer")
async def restart_api_server(self):
async def restart_api_server(self) -> dict:
return await self.send_command("restart", parameter="api-server")
async def temperature(self):
async def temperature(self) -> dict:
return await self.send_command("temperature")
async def timedate(self, ntp: str, timezone: str):
async def timedate(self, ntp: str, timezone: str) -> dict:
return await self.send_command("timedate", ntp=ntp, timezone=timezone)
async def token(self):
async def token(self) -> dict:
return await self.send_command("token", user=self.username, password=self.pwd)
async def update_pools(self, pools: List[dict]):
async def update_pools(self, pools: list[dict]) -> dict:
return await self.send_command("updatepools", pools=pools)
async def voltage(self):
async def voltage(self) -> dict:
return await self.send_command("voltage")
async def get_ztp(self):
async def get_ztp(self) -> dict:
return await self.send_command("ztp")
async def set_ztp(self, enable: bool):
async def set_ztp(self, enable: bool) -> dict:
return await self.send_command("ztp", parameter="on" if enable else "off")

View File

@@ -1,149 +1,2 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import asyncio
from typing import Union
from pyasic import settings
from pyasic.errors import APIError
from pyasic.web import BaseWebAPI
from .graphql import BOSerGraphQLAPI
from .grpc import BOSerGRPCAPI
from .luci import BOSMinerLuCIAPI
class BOSMinerWebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
self.luci = BOSMinerLuCIAPI(
ip, settings.get("default_bosminer_password", "root")
)
self._pwd = settings.get("default_bosminer_password", "root")
self._port = 80
super().__init__(ip)
@property
def pwd(self):
return self._pwd
@pwd.setter
def pwd(self, other: str):
self._pwd = other
self.luci.pwd = other
@property
def port(self):
return self._port
@port.setter
def port(self, other: str):
self._port = other
self.luci.port = other
async def send_command(
self,
command: Union[str, dict],
ignore_errors: bool = False,
allow_warning: bool = True,
**parameters: Union[str, int, bool],
) -> dict:
return await self.luci.send_command(command)
async def multicommand(
self, *commands: Union[dict, str], allow_warning: bool = True
) -> dict:
return await self.luci.multicommand(*commands)
class BOSerWebAPI(BOSMinerWebAPI):
def __init__(self, ip: str) -> None:
self.gql = BOSerGraphQLAPI(
ip, settings.get("default_bosminer_password", "root")
)
self.grpc = BOSerGRPCAPI(ip, settings.get("default_bosminer_password", "root"))
self._port = 80
super().__init__(ip)
@property
def pwd(self):
return self._pwd
@pwd.setter
def pwd(self, other: str):
self._pwd = other
self.luci.pwd = other
self.gql.pwd = other
self.grpc.pwd = other
@property
def port(self):
return self._port
@port.setter
def port(self, other: str):
self._port = other
self.luci.port = other
self.gql.port = other
async def send_command(
self,
command: Union[str, dict],
ignore_errors: bool = False,
allow_warning: bool = True,
**parameters: Union[str, int, bool],
) -> dict:
command_type = self.select_command_type(command)
if command_type == "gql":
return await self.gql.send_command(command)
elif command_type == "grpc":
try:
return await getattr(self.grpc, command.replace("grpc_", ""))()
except AttributeError:
raise APIError(f"No gRPC command found for command: {command}")
elif command_type == "luci":
return await self.luci.send_command(command)
@staticmethod
def select_command_type(command: Union[str, dict]) -> str:
if isinstance(command, dict):
return "gql"
else:
return "grpc"
async def multicommand(
self, *commands: Union[dict, str], allow_warning: bool = True
) -> dict:
cmd_types = {"grpc": [], "gql": []}
for cmd in commands:
cmd_types[self.select_command_type(cmd)].append(cmd)
async def no_op():
return {}
if len(cmd_types["grpc"]) > 0:
grpc_data_t = asyncio.create_task(
self.grpc.multicommand(*cmd_types["grpc"])
)
else:
grpc_data_t = asyncio.create_task(no_op())
if len(cmd_types["gql"]) > 0:
gql_data_t = asyncio.create_task(self.gql.multicommand(*cmd_types["gql"]))
else:
gql_data_t = asyncio.create_task(no_op())
await asyncio.gather(grpc_data_t, gql_data_t)
data = dict(**grpc_data_t.result(), **gql_data_t.result())
return data
from .boser import BOSerWebAPI
from .bosminer import BOSMinerWebAPI

View File

@@ -13,15 +13,19 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import logging
from datetime import timedelta
from typing import Any
from betterproto import Message
from grpclib import GRPCError, Status
from grpclib.client import Channel
from pyasic import settings
from pyasic.errors import APIError
from pyasic.web import BaseWebAPI
from .proto.braiins.bos import *
from .proto.braiins.bos.v1 import *
@@ -41,14 +45,13 @@ class BOSMinerGRPCStub(
pass
class BOSerGRPCAPI:
def __init__(self, ip: str, pwd: str):
self.ip = ip
class BOSerWebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
super().__init__(ip)
self.username = "root"
self.pwd = pwd
self.pwd = settings.get("default_bosminer_password", "root")
self.port = 50051
self._auth = None
self._auth_time = datetime.now()
self._auth_time = None
@property
def commands(self) -> list:
@@ -85,13 +88,15 @@ class BOSerGRPCAPI:
async def send_command(
self,
command: str,
message: Message = None,
command: str | bytes,
ignore_errors: bool = False,
auth: bool = True,
allow_warning: bool = True,
privileged: bool = False,
**parameters: Any,
) -> dict:
message: betterproto.Message = parameters["message"]
metadata = []
if auth:
if privileged:
metadata.append(("authorization", await self.auth()))
try:
async with Channel(self.ip, self.port) as c:
@@ -111,15 +116,15 @@ class BOSerGRPCAPI:
except GRPCError as e:
raise APIError(f"gRPC command failed - {endpoint}") from e
async def auth(self):
if self._auth is not None and self._auth_time - datetime.now() < timedelta(
async def auth(self) -> str | None:
if self.token is not None and self._auth_time - datetime.now() < timedelta(
seconds=3540
):
return self._auth
return self.token
await self._get_auth()
return self._auth
return self.token
async def _get_auth(self):
async def _get_auth(self) -> str:
async with Channel(self.ip, self.port) as c:
req = LoginRequest(username=self.username, password=self.pwd)
async with c.request(
@@ -132,74 +137,79 @@ class BOSerGRPCAPI:
await stream.recv_initial_metadata()
auth = stream.initial_metadata.get("authorization")
if auth is not None:
self._auth = auth
self.token = auth
self._auth_time = datetime.now()
return self._auth
return self.token
async def get_api_version(self):
async def get_api_version(self) -> dict:
return await self.send_command(
"get_api_version", ApiVersionRequest(), auth=False
"get_api_version", message=ApiVersionRequest(), privileged=False
)
async def start(self):
return await self.send_command("start", StartRequest())
async def start(self) -> dict:
return await self.send_command("start", message=StartRequest())
async def stop(self):
return await self.send_command("stop", StopRequest())
async def stop(self) -> dict:
return await self.send_command("stop", message=StopRequest())
async def pause_mining(self):
return await self.send_command("pause_mining", PauseMiningRequest())
async def pause_mining(self) -> dict:
return await self.send_command("pause_mining", message=PauseMiningRequest())
async def resume_mining(self):
return await self.send_command("resume_mining", ResumeMiningRequest())
async def resume_mining(self) -> dict:
return await self.send_command("resume_mining", message=ResumeMiningRequest())
async def restart(self):
return await self.send_command("restart", RestartRequest())
async def restart(self) -> dict:
return await self.send_command("restart", message=RestartRequest())
async def reboot(self):
return await self.send_command("reboot", RebootRequest())
async def reboot(self) -> dict:
return await self.send_command("reboot", message=RebootRequest())
async def set_locate_device_status(self, enable: bool):
async def set_locate_device_status(self, enable: bool) -> dict:
return await self.send_command(
"set_locate_device_status", SetLocateDeviceStatusRequest(enable=enable)
"set_locate_device_status",
message=SetLocateDeviceStatusRequest(enable=enable),
)
async def get_locate_device_status(self):
async def get_locate_device_status(self) -> dict:
return await self.send_command(
"get_locate_device_status", GetLocateDeviceStatusRequest()
"get_locate_device_status", message=GetLocateDeviceStatusRequest()
)
async def set_password(self, password: str = None):
async def set_password(self, password: str = None) -> dict:
return await self.send_command(
"set_password", SetPasswordRequest(password=password)
"set_password", message=SetPasswordRequest(password=password)
)
async def get_cooling_state(self):
return await self.send_command("get_cooling_state", GetCoolingStateRequest())
async def get_cooling_state(self) -> dict:
return await self.send_command(
"get_cooling_state", message=GetCoolingStateRequest()
)
async def set_immersion_mode(
self,
enable: bool,
save_action: SaveAction = SaveAction.SAVE_ACTION_SAVE_AND_APPLY,
):
) -> dict:
return await self.send_command(
"set_immersion_mode",
SetImmersionModeRequest(
message=SetImmersionModeRequest(
enable_immersion_mode=enable, save_action=save_action
),
)
async def get_tuner_state(self):
return await self.send_command("get_tuner_state", GetTunerStateRequest())
async def list_target_profiles(self):
async def get_tuner_state(self) -> dict:
return await self.send_command(
"list_target_profiles", ListTargetProfilesRequest()
"get_tuner_state", message=GetTunerStateRequest()
)
async def list_target_profiles(self) -> dict:
return await self.send_command(
"list_target_profiles", message=ListTargetProfilesRequest()
)
async def set_default_power_target(
self, save_action: SaveAction = SaveAction.SAVE_ACTION_SAVE_AND_APPLY
):
) -> dict:
return await self.send_command(
"set_default_power_target",
message=SetDefaultPowerTargetRequest(save_action=save_action),
@@ -209,10 +219,10 @@ class BOSerGRPCAPI:
self,
power_target: int,
save_action: SaveAction = SaveAction.SAVE_ACTION_SAVE_AND_APPLY,
):
) -> dict:
return await self.send_command(
"set_power_target",
SetPowerTargetRequest(
message=SetPowerTargetRequest(
power_target=Power(watt=power_target), save_action=save_action
),
)
@@ -221,7 +231,7 @@ class BOSerGRPCAPI:
self,
power_target_increment: int,
save_action: SaveAction = SaveAction.SAVE_ACTION_SAVE_AND_APPLY,
):
) -> dict:
return await self.send_command(
"increment_power_target",
message=IncrementPowerTargetRequest(
@@ -234,7 +244,7 @@ class BOSerGRPCAPI:
self,
power_target_decrement: int,
save_action: SaveAction = SaveAction.SAVE_ACTION_SAVE_AND_APPLY,
):
) -> dict:
return await self.send_command(
"decrement_power_target",
message=DecrementPowerTargetRequest(
@@ -245,7 +255,7 @@ class BOSerGRPCAPI:
async def set_default_hashrate_target(
self, save_action: SaveAction = SaveAction.SAVE_ACTION_SAVE_AND_APPLY
):
) -> dict:
return await self.send_command(
"set_default_hashrate_target",
message=SetDefaultHashrateTargetRequest(save_action=save_action),
@@ -255,10 +265,10 @@ class BOSerGRPCAPI:
self,
hashrate_target: float,
save_action: SaveAction = SaveAction.SAVE_ACTION_SAVE_AND_APPLY,
):
) -> dict:
return await self.send_command(
"set_hashrate_target",
SetHashrateTargetRequest(
message=SetHashrateTargetRequest(
hashrate_target=TeraHashrate(terahash_per_second=hashrate_target),
save_action=save_action,
),
@@ -268,10 +278,10 @@ class BOSerGRPCAPI:
self,
hashrate_target_increment: int,
save_action: SaveAction = SaveAction.SAVE_ACTION_SAVE_AND_APPLY,
):
) -> dict:
return await self.send_command(
"increment_hashrate_target",
IncrementHashrateTargetRequest(
message=IncrementHashrateTargetRequest(
hashrate_target_increment=TeraHashrate(
terahash_per_second=hashrate_target_increment
),
@@ -283,10 +293,10 @@ class BOSerGRPCAPI:
self,
hashrate_target_decrement: int,
save_action: SaveAction = SaveAction.SAVE_ACTION_SAVE_AND_APPLY,
):
) -> dict:
return await self.send_command(
"decrement_hashrate_target",
DecrementHashrateTargetRequest(
message=DecrementHashrateTargetRequest(
hashrate_target_decrement=TeraHashrate(
terahash_per_second=hashrate_target_decrement
),
@@ -301,7 +311,7 @@ class BOSerGRPCAPI:
min_power_target: int,
enable_shutdown: bool = None,
shutdown_duration: int = None,
):
) -> dict:
return await self.send_command(
"set_dps",
message=SetDpsRequest(
@@ -322,7 +332,7 @@ class BOSerGRPCAPI:
wattage_target: int = None,
hashrate_target: int = None,
save_action: SaveAction = SaveAction.SAVE_ACTION_SAVE_AND_APPLY,
):
) -> dict:
if wattage_target is not None and hashrate_target is not None:
logging.error(
"Cannot use both wattage_target and hashrate_target, using wattage_target."
@@ -362,62 +372,71 @@ class BOSerGRPCAPI:
),
)
async def get_active_performance_mode(self):
async def get_active_performance_mode(self) -> dict:
return await self.send_command(
"get_active_performance_mode", GetPerformanceModeRequest()
"get_active_performance_mode", message=GetPerformanceModeRequest()
)
async def get_pool_groups(self):
return await self.send_command("get_pool_groups", GetPoolGroupsRequest())
async def create_pool_group(self):
raise NotImplementedError
return await self.send_command("braiins.bos.v1.PoolService/CreatePoolGroup")
async def update_pool_group(self):
raise NotImplementedError
return await self.send_command("braiins.bos.v1.PoolService/UpdatePoolGroup")
async def remove_pool_group(self):
raise NotImplementedError
return await self.send_command("braiins.bos.v1.PoolService/RemovePoolGroup")
async def get_miner_configuration(self):
async def get_pool_groups(self) -> dict:
return await self.send_command(
"get_miner_configuration", GetMinerConfigurationRequest()
"get_pool_groups", message=GetPoolGroupsRequest()
)
async def get_constraints(self):
return await self.send_command("get_constraints", GetConstraintsRequest())
async def create_pool_group(self) -> dict:
raise NotImplementedError
async def get_license_state(self):
return await self.send_command("get_license_state", GetLicenseStateRequest())
async def update_pool_group(self) -> dict:
raise NotImplementedError
async def get_miner_status(self):
return await self.send_command("get_miner_status", GetMinerStatusRequest())
async def remove_pool_group(self) -> dict:
raise NotImplementedError
async def get_miner_details(self):
return await self.send_command("get_miner_details", GetMinerDetailsRequest())
async def get_miner_stats(self):
return await self.send_command("get_miner_stats", GetMinerStatsRequest())
async def get_hashboards(self):
return await self.send_command("get_hashboards", GetHashboardsRequest())
async def get_support_archive(self):
async def get_miner_configuration(self) -> dict:
return await self.send_command(
"get_support_archive", GetSupportArchiveRequest()
"get_miner_configuration", message=GetMinerConfigurationRequest()
)
async def get_constraints(self) -> dict:
return await self.send_command(
"get_constraints", message=GetConstraintsRequest()
)
async def get_license_state(self) -> dict:
return await self.send_command(
"get_license_state", message=GetLicenseStateRequest()
)
async def get_miner_status(self) -> dict:
return await self.send_command(
"get_miner_status", message=GetMinerStatusRequest()
)
async def get_miner_details(self) -> dict:
return await self.send_command(
"get_miner_details", message=GetMinerDetailsRequest()
)
async def get_miner_stats(self) -> dict:
return await self.send_command(
"get_miner_stats", message=GetMinerStatsRequest()
)
async def get_hashboards(self) -> dict:
return await self.send_command("get_hashboards", message=GetHashboardsRequest())
async def get_support_archive(self) -> dict:
return await self.send_command(
"get_support_archive", message=GetSupportArchiveRequest()
)
async def enable_hashboards(
self,
hashboard_ids: List[str],
save_action: SaveAction = SaveAction.SAVE_ACTION_SAVE_AND_APPLY,
):
) -> dict:
return await self.send_command(
"enable_hashboards",
EnableHashboardsRequest(
message=EnableHashboardsRequest(
hashboard_ids=hashboard_ids, save_action=save_action
),
)
@@ -426,10 +445,10 @@ class BOSerGRPCAPI:
self,
hashboard_ids: List[str],
save_action: SaveAction = SaveAction.SAVE_ACTION_SAVE_AND_APPLY,
):
) -> dict:
return await self.send_command(
"disable_hashboards",
DisableHashboardsRequest(
message=DisableHashboardsRequest(
hashboard_ids=hashboard_ids, save_action=save_action
),
)

View File

@@ -13,33 +13,38 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from __future__ import annotations
import json
from typing import Any
import httpx
from pyasic import settings
from pyasic.errors import APIError
from pyasic.web import BaseWebAPI
class BOSMinerLuCIAPI:
def __init__(self, ip: str, pwd: str):
self.ip = ip
class BOSMinerWebAPI(BaseWebAPI):
def __init__(self, ip: str) -> None:
super().__init__(ip)
self.username = "root"
self.pwd = pwd
self.pwd = settings.get("default_bosminer_password", "root")
self.port = 80
async def multicommand(self, *commands: str) -> dict:
data = {}
for command in commands:
data[command] = await self.send_command(command, ignore_errors=True)
return data
async def send_command(self, path: str, ignore_errors: bool = False) -> dict:
async def send_command(
self,
command: str | bytes,
ignore_errors: bool = False,
allow_warning: bool = True,
privileged: bool = False,
**parameters: Any,
) -> dict:
try:
async with httpx.AsyncClient(transport=settings.transport()) as client:
await self.auth(client)
data = await client.get(
f"http://{self.ip}:{self.port}/cgi-bin/luci/{path}",
f"http://{self.ip}:{self.port}/cgi-bin/luci/{command}",
headers={"User-Agent": "BTC Tools v0.1"},
)
if data.status_code == 200:
@@ -47,14 +52,20 @@ class BOSMinerLuCIAPI:
if ignore_errors:
return {}
raise APIError(
f"LUCI web command failed: path={path}, code={data.status_code}"
f"LUCI web command failed: command={command}, code={data.status_code}"
)
except (httpx.HTTPError, json.JSONDecodeError):
if ignore_errors:
return {}
raise APIError(f"LUCI web command failed: path={path}")
raise APIError(f"LUCI web command failed: command={command}")
async def auth(self, session: httpx.AsyncClient):
async def multicommand(self, *commands: str) -> dict:
data = {}
for command in commands:
data[command] = await self.send_command(command, ignore_errors=True)
return data
async def auth(self, session: httpx.AsyncClient) -> None:
login = {"luci_username": self.username, "luci_password": self.pwd}
url = f"http://{self.ip}:{self.port}/cgi-bin/luci"
headers = {
@@ -63,22 +74,22 @@ class BOSMinerLuCIAPI:
}
await session.post(url, headers=headers, data=login)
async def get_net_conf(self):
async def get_net_conf(self) -> dict:
return await self.send_command("admin/network/iface_status/lan")
async def get_cfg_metadata(self):
async def get_cfg_metadata(self) -> dict:
return await self.send_command("admin/miner/cfg_metadata")
async def get_cfg_data(self):
async def get_cfg_data(self) -> dict:
return await self.send_command("admin/miner/cfg_data")
async def get_bos_info(self):
async def get_bos_info(self) -> dict:
return await self.send_command("bos/info")
async def get_overview(self):
async def get_overview(self) -> dict:
return await self.send_command(
"admin/status/overview?status=1"
) # needs status=1 or it fails
async def get_api_status(self):
async def get_api_status(self) -> dict:
return await self.send_command("admin/miner/api_status")

View File

@@ -1,105 +0,0 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import json
from typing import Union
import httpx
from pyasic import settings
class BOSerGraphQLAPI:
def __init__(self, ip: str, pwd: str):
self.ip = ip
self.username = "root"
self.pwd = pwd
self.port = 80
async def multicommand(self, *commands: dict) -> dict:
def merge(*d: dict):
ret = {}
for i in d:
if i:
for k in i:
if k not in ret:
ret[k] = i[k]
else:
ret[k] = merge(ret[k], i[k])
return None if ret == {} else ret
command = merge(*commands)
data = await self.send_command(command)
if data is not None:
if data.get("data") is None:
try:
commands = list(commands)
# noinspection PyTypeChecker
commands.remove({"bos": {"faultLight": None}})
command = merge(*commands)
data = await self.send_command(command)
except (LookupError, ValueError):
pass
if not data:
data = {}
data["multicommand"] = False
return data
async def send_command(
self,
command: dict,
) -> dict:
url = f"http://{self.ip}:{self.port}/graphql"
query = command
if command is None:
return {}
if command.get("query") is None:
query = {"query": self.parse_command(command)}
try:
async with httpx.AsyncClient(transport=settings.transport()) as client:
await self.auth(client)
data = await client.post(url, json=query)
except httpx.HTTPError:
pass
else:
if data.status_code == 200:
try:
return data.json()
except json.decoder.JSONDecodeError:
pass
def parse_command(self, graphql_command: Union[dict, set]) -> str:
if isinstance(graphql_command, dict):
data = []
for key in graphql_command:
if graphql_command[key] is not None:
parsed = self.parse_command(graphql_command[key])
data.append(key + parsed)
else:
data.append(key)
else:
data = graphql_command
return "{" + ",".join(data) + "}"
async def auth(self, client: httpx.AsyncClient) -> None:
url = f"http://{self.ip}:{self.port}/graphql"
await client.post(
url,
json={
"query": (
f'mutation{{auth{{login(username:"{self.username}", password:"{self.pwd}"){{__typename}}}}}}'
)
},
)

View File

@@ -13,8 +13,10 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from __future__ import annotations
import json
from typing import Union
from typing import Any
import httpx
@@ -28,32 +30,30 @@ class ePICWebAPI(BaseWebAPI):
super().__init__(ip)
self.username = "root"
self.pwd = settings.get("default_epic_web_password", "letmein")
self.token = None
self.port = 4028
self.token = None
async def send_command(
self,
command: Union[str, bytes],
command: str | bytes,
ignore_errors: bool = False,
allow_warning: bool = True,
post: bool = False,
**parameters: Union[str, int, bool],
privileged: bool = False,
**parameters: Any,
) -> dict:
if post or parameters != {}:
post = True
post = privileged or not parameters == {}
async with httpx.AsyncClient(transport=settings.transport()) as client:
for i in range(settings.get("get_data_retries", 1) + 1):
try:
if post:
epic_param = {
"param": parameters.get("parameters"),
"password": self.pwd,
}
response = await client.post(
f"http://{self.ip}:{self.port}/{command}",
timeout=5,
json=epic_param,
json={
**parameters,
"password": self.pwd,
},
)
else:
response = await client.get(
@@ -89,31 +89,31 @@ class ePICWebAPI(BaseWebAPI):
return data
async def restart_epic(self) -> dict:
return await self.send_command("softreboot", post=True)
return await self.send_command("softreboot", privileged=True)
async def reboot(self) -> dict:
return await self.send_command("reboot", post=True)
return await self.send_command("reboot", privileged=True)
async def pause_mining(self) -> dict:
return await self.send_command("miner", post=True, parameters="Stop")
return await self.send_command("miner", param="Stop")
async def resume_mining(self) -> dict:
return await self.send_command("miner", post=True, parameters="Autostart")
return await self.send_command("miner", param="Autostart")
async def stop_mining(self) -> dict:
return await self.send_command("miner", post=True, parameters="Stop")
return await self.send_command("miner", param="Stop")
async def start_mining(self) -> dict:
return await self.send_command("miner", post=True, parameters="Autostart")
return await self.send_command("miner", param="Autostart")
async def summary(self):
async def summary(self) -> dict:
return await self.send_command("summary")
async def hashrate(self):
async def hashrate(self) -> dict:
return await self.send_command("hashrate")
async def network(self):
async def network(self) -> dict:
return await self.send_command("network")
async def capabilities(self):
async def capabilities(self) -> dict:
return await self.send_command("capabilities")

View File

@@ -13,9 +13,11 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from __future__ import annotations
import json
import warnings
from typing import Union
from typing import Any
import httpx
@@ -28,9 +30,9 @@ class GoldshellWebAPI(BaseWebAPI):
super().__init__(ip)
self.username = "admin"
self.pwd = settings.get("default_goldshell_web_password", "123456789")
self.jwt = None
self.token = None
async def auth(self):
async def auth(self) -> str | None:
async with httpx.AsyncClient(transport=settings.transport()) as client:
try:
await client.get(f"http://{self.ip}:{self.port}/user/logout")
@@ -54,47 +56,43 @@ class GoldshellWebAPI(BaseWebAPI):
f"Could not authenticate web token with miner: {self}"
)
else:
self.jwt = auth.get("JWT Token")
self.token = auth.get("JWT Token")
else:
self.jwt = auth.get("JWT Token")
return self.jwt
self.token = auth.get("JWT Token")
return self.token
async def send_command(
self,
command: Union[str, bytes],
command: str | bytes,
ignore_errors: bool = False,
allow_warning: bool = True,
**parameters: Union[str, int, bool],
privileged: bool = False,
**parameters: Any,
) -> dict:
if parameters.get("pool_pwd"):
parameters["pass"] = parameters["pool_pwd"]
parameters.pop("pool_pwd")
if self.jwt is None:
if self.token is None:
await self.auth()
async with httpx.AsyncClient(transport=settings.transport()) as client:
for _ in range(settings.get("get_data_retries", 1)):
try:
if parameters:
if not parameters == {}:
response = await client.put(
f"http://{self.ip}:{self.port}/mcb/{command}",
headers={"Authorization": "Bearer " + self.jwt},
headers={"Authorization": "Bearer " + self.token},
timeout=settings.get("api_function_timeout", 5),
json=parameters,
)
else:
response = await client.get(
f"http://{self.ip}:{self.port}/mcb/{command}",
headers={"Authorization": "Bearer " + self.jwt},
headers={"Authorization": "Bearer " + self.token},
timeout=settings.get("api_function_timeout", 5),
)
json_data = response.json()
return json_data
except httpx.HTTPError:
pass
except json.JSONDecodeError:
pass
except TypeError:
await self.auth()
except (httpx.HTTPError, json.JSONDecodeError):
pass
async def multicommand(
self, *commands: str, ignore_errors: bool = False, allow_warning: bool = True
@@ -107,7 +105,7 @@ class GoldshellWebAPI(BaseWebAPI):
try:
response = await client.get(
f"http://{self.ip}:{self.port}/mcb/{command}",
headers={"Authorization": "Bearer " + self.jwt},
headers={"Authorization": "Bearer " + self.token},
timeout=settings.get("api_function_timeout", 5),
)
json_data = response.json()
@@ -120,19 +118,25 @@ class GoldshellWebAPI(BaseWebAPI):
await self.auth()
return data
async def pools(self):
async def pools(self) -> dict:
return await self.send_command("pools")
async def newpool(self, url: str, user: str, password: str):
return await self.send_command("newpool", url=url, user=user, pool_pwd=password)
async def delpool(self, url: str, user: str, password: str, dragid: int = 0):
async def newpool(self, url: str, user: str, password: str) -> dict:
# looks dumb, but cant pass `pass` since it is a built in type
return await self.send_command(
"delpool", url=url, user=user, pool_pwd=password, dragid=dragid
"newpool", **{"url": url, "user": user, "pass": password}
)
async def setting(self):
async def delpool(
self, url: str, user: str, password: str, dragid: int = 0
) -> dict:
# looks dumb, but cant pass `pass` since it is a built in type
return await self.send_command(
"delpool", **{"url": url, "user": user, "pass": password, "dragid": dragid}
)
async def setting(self) -> dict:
return await self.send_command("setting")
async def status(self):
async def status(self) -> dict:
return await self.send_command("status")

View File

@@ -13,9 +13,11 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from __future__ import annotations
import json
import warnings
from typing import Union
from typing import Any
import httpx
@@ -29,9 +31,9 @@ class InnosiliconWebAPI(BaseWebAPI):
super().__init__(ip)
self.username = "admin"
self.pwd = settings.get("default_innosilicon_web_password", "admin")
self.jwt = None
self.token = None
async def auth(self):
async def auth(self) -> str | None:
async with httpx.AsyncClient(transport=settings.transport()) as client:
try:
auth = await client.post(
@@ -42,24 +44,25 @@ class InnosiliconWebAPI(BaseWebAPI):
warnings.warn(f"Could not authenticate web token with miner: {self}")
else:
json_auth = auth.json()
self.jwt = json_auth.get("jwt")
return self.jwt
self.token = json_auth.get("jwt")
return self.token
async def send_command(
self,
command: Union[str, bytes],
command: str | bytes,
ignore_errors: bool = False,
allow_warning: bool = True,
**parameters: Union[str, int, bool],
privileged: bool = False,
**parameters: Any,
) -> dict:
if self.jwt is None:
if self.token is None:
await self.auth()
async with httpx.AsyncClient(transport=settings.transport()) as client:
for _ in range(settings.get("get_data_retries", 1)):
try:
response = await client.post(
f"http://{self.ip}:{self.port}/api/{command}",
headers={"Authorization": "Bearer " + self.jwt},
headers={"Authorization": "Bearer " + self.token},
timeout=settings.get("api_function_timeout", 5),
json=parameters,
)
@@ -79,9 +82,7 @@ class InnosiliconWebAPI(BaseWebAPI):
raise APIError(json_data["message"])
raise APIError("Innosilicon web api command failed.")
return json_data
except httpx.HTTPError:
pass
except json.JSONDecodeError:
except (httpx.HTTPError, json.JSONDecodeError):
pass
async def multicommand(
@@ -95,7 +96,7 @@ class InnosiliconWebAPI(BaseWebAPI):
try:
response = await client.post(
f"http://{self.ip}:{self.port}/api/{command}",
headers={"Authorization": "Bearer " + self.jwt},
headers={"Authorization": "Bearer " + self.token},
timeout=settings.get("api_function_timeout", 5),
)
json_data = response.json()
@@ -123,14 +124,14 @@ class InnosiliconWebAPI(BaseWebAPI):
async def type(self) -> dict:
return await self.send_command("type")
async def get_all(self):
async def get_all(self) -> dict:
return await self.send_command("getAll")
async def get_error_detail(self):
async def get_error_detail(self) -> dict:
return await self.send_command("getErrorDetail")
async def pools(self):
async def pools(self) -> dict:
return await self.send_command("pools")
async def poweroff(self):
async def poweroff(self) -> dict:
return await self.send_command("poweroff")

View File

@@ -13,9 +13,11 @@
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
from __future__ import annotations
import json
import warnings
from typing import Union
from typing import Any
import httpx
@@ -30,7 +32,7 @@ class VNishWebAPI(BaseWebAPI):
self.pwd = settings.get("default_vnish_web_password", "admin")
self.token = None
async def auth(self):
async def auth(self) -> str | None:
async with httpx.AsyncClient(transport=settings.transport()) as client:
try:
auth = await client.post(
@@ -51,11 +53,13 @@ class VNishWebAPI(BaseWebAPI):
async def send_command(
self,
command: Union[str, bytes],
command: str | bytes,
ignore_errors: bool = False,
allow_warning: bool = True,
**parameters: Union[str, int, bool],
privileged: bool = False,
**parameters: Any,
) -> dict:
post = privileged or not parameters == {}
if self.token is None:
await self.auth()
async with httpx.AsyncClient(transport=settings.transport()) as client:
@@ -65,15 +69,7 @@ class VNishWebAPI(BaseWebAPI):
if command.startswith("system"):
auth = "Bearer " + self.token
if parameters.get("post"):
parameters.pop("post")
response = await client.post(
f"http://{self.ip}:{self.port}/api/v1/{command}",
headers={"Authorization": auth},
timeout=settings.get("api_function_timeout", 5),
json=parameters,
)
elif not parameters == {}:
if post:
response = await client.post(
f"http://{self.ip}:{self.port}/api/v1/{command}",
headers={"Authorization": auth},
@@ -94,11 +90,7 @@ class VNishWebAPI(BaseWebAPI):
if json_data:
return json_data
return {"success": True}
except httpx.HTTPError:
pass
except json.JSONDecodeError:
pass
except AttributeError:
except (httpx.HTTPError, json.JSONDecodeError, AttributeError):
pass
async def multicommand(
@@ -111,40 +103,40 @@ class VNishWebAPI(BaseWebAPI):
return data
async def restart_vnish(self) -> dict:
return await self.send_command("mining/restart", post=True)
return await self.send_command("mining/restart", privileged=True)
async def reboot(self) -> dict:
return await self.send_command("system/reboot", post=True)
return await self.send_command("system/reboot", privileged=True)
async def pause_mining(self) -> dict:
return await self.send_command("mining/pause", post=True)
return await self.send_command("mining/pause", privileged=True)
async def resume_mining(self) -> dict:
return await self.send_command("mining/resume", post=True)
return await self.send_command("mining/resume", privileged=True)
async def stop_mining(self) -> dict:
return await self.send_command("mining/stop", post=True)
return await self.send_command("mining/stop", privileged=True)
async def start_mining(self) -> dict:
return await self.send_command("mining/start", post=True)
return await self.send_command("mining/start", privileged=True)
async def info(self):
async def info(self) -> dict:
return await self.send_command("info")
async def summary(self):
async def summary(self) -> dict:
return await self.send_command("summary")
async def chips(self):
async def chips(self) -> dict:
return await self.send_command("chips")
async def layout(self):
async def layout(self) -> dict:
return await self.send_command("layout")
async def status(self):
async def status(self) -> dict:
return await self.send_command("status")
async def settings(self):
async def settings(self) -> dict:
return await self.send_command("settings")
async def autotune_presets(self):
async def autotune_presets(self) -> dict:
return await self.send_command("autotune/presets")