Move upgrade_firmware to pyasic.rpc.btminer.py

This commit is contained in:
1e9abhi1e10
2024-06-07 06:08:52 +05:30
parent 4d45b6e50f
commit 18e6fc2a3c
2 changed files with 20 additions and 74 deletions

View File

@@ -14,12 +14,6 @@
# limitations under the License. - # limitations under the License. -
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
from pyasic.miners.backends.btminer import BTMiner from pyasic.miners.backends.btminer import BTMiner
import asyncio
import json
import struct
import logging
from pathlib import Path
import aiofiles
class M6X(BTMiner): class M6X(BTMiner):
@@ -36,65 +30,3 @@ class M3X(BTMiner):
class M2X(BTMiner): class M2X(BTMiner):
pass pass
class Whatsminer(BTMiner):
async def upgrade_firmware(self, file: Path, token: str):
"""
Upgrade the firmware of the Whatsminer device.
Args:
file (Path): The local file path of the firmware to be uploaded.
token (str): The authentication token for the firmware upgrade.
Returns:
str: Confirmation message after upgrading the firmware.
"""
try:
logging.info("Starting firmware upgrade process for Whatsminer.")
if not file:
raise ValueError("File location must be provided for firmware upgrade.")
# Read the firmware file contents
async with aiofiles.open(file, "rb") as f:
upgrade_contents = await f.read()
# Establish a TCP connection to the miner
reader, writer = await asyncio.open_connection(self.ip, self.port)
# Send the update_firmware command
command = json.dumps({"token": token, "cmd": "update_firmware"})
writer.write(command.encode())
await writer.drain()
# Wait for the miner to respond with "ready"
response = await reader.read(1024)
response_json = json.loads(response.decode())
if response_json.get("Msg") != "ready":
raise Exception("Miner is not ready for firmware upgrade.")
# Send the firmware file size and data
file_size = struct.pack("<I", len(upgrade_contents))
writer.write(file_size)
writer.write(upgrade_contents)
await writer.drain()
# Close the connection
writer.close()
await writer.wait_closed()
logging.info("Firmware upgrade process completed successfully for Whatsminer.")
return "Firmware upgrade completed successfully."
except FileNotFoundError as e:
logging.error(f"File not found during the firmware upgrade process: {e}")
raise
except ValueError as e:
logging.error(f"Validation error occurred during the firmware upgrade process: {e}")
raise
except OSError as e:
logging.error(f"OS error occurred during the firmware upgrade process: {e}")
raise
except Exception as e:
logging.error(f"An unexpected error occurred during the firmware upgrade process: {e}", exc_info=True)
raise

View File

@@ -23,6 +23,7 @@ import json
import logging import logging
import re import re
from typing import Literal, Union from typing import Literal, Union
import struct
import httpx import httpx
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
@@ -559,11 +560,24 @@ class BTMinerRPCAPI(BaseMinerRPCAPI):
""" """
return await self.send_privileged_command("set_normal_power") return await self.send_privileged_command("set_normal_power")
async def update_firmware(self): # noqa - static async def upgrade_firmware(self, firmware: bytes):
"""Not implemented.""" """Upgrade the firmware running on the miner and using the firmware passed in bytes.
# to be determined if this will be added later
# requires a file stream in bytes Parameters:
return NotImplementedError firmware (bytes): The firmware binary data to be uploaded.
Returns:
bool: A boolean indicating the success of the firmware upgrade.
Raises:
APIError: If the miner is not ready for firmware update.
"""
ready = await self.send_privileged_command("upgrade_firmware")
if not ready.get("Msg") == "ready":
raise APIError(f"Not ready for firmware update: {self}")
file_size = struct.pack("<I", len(firmware))
await self._send_bytes(file_size + firmware)
return True
async def reboot(self, timeout: int = 10) -> dict: async def reboot(self, timeout: int = 10) -> dict:
"""Reboot the miner using the API. """Reboot the miner using the API.