From 26d9562c181c1fdfd4da870df698e7ecd717c3a9 Mon Sep 17 00:00:00 2001 From: 1e9abhi1e10 <2311abhiptdr@gmail.com> Date: Wed, 29 May 2024 17:52:01 +0530 Subject: [PATCH] Add structured directories for firmware update --- pyasic/miners/base.py | 119 +-------------------------------------- pyasic/ssh/braiins_os.py | 52 ++--------------- pyasic/updater/bos.py | 111 ++++++++++++++++++++++++++++++++++++ 3 files changed, 118 insertions(+), 164 deletions(-) create mode 100644 pyasic/updater/bos.py diff --git a/pyasic/miners/base.py b/pyasic/miners/base.py index 29699129..4a8b5840 100644 --- a/pyasic/miners/base.py +++ b/pyasic/miners/base.py @@ -17,10 +17,6 @@ import asyncio import ipaddress import warnings from typing import List, Optional, Protocol, Tuple, Type, TypeVar, Union -from pathlib import Path -import re -import httpx -import hashlib from pyasic.config import MinerConfig from pyasic.data import Fan, HashBoard, MinerData @@ -527,7 +523,6 @@ class MinerProtocol(Protocol): class BaseMiner(MinerProtocol): def __init__(self, ip: str) -> None: self.ip = ip - self.firmware_manager = FirmwareManager("http://feeds.braiins-os.com") if self.expected_chips is None and self.raw_model is not None: warnings.warn( @@ -544,116 +539,4 @@ class BaseMiner(MinerProtocol): self.ssh = self._ssh_cls(ip) -AnyMiner = TypeVar("AnyMiner", bound=BaseMiner) - - -class FirmwareManager: - class FirmwareManager: - def __init__(self, remote_server_url: str): - """ - Initialize a FirmwareManager instance. - - Args: - remote_server_url (str): The URL of the remote server to fetch firmware information. - """ - self.remote_server_url = remote_server_url - self.version_extractors = {} - - # Register version extractor for braiins_os - self.register_version_extractor("braiins_os", self.extract_braiins_os_version) - - def extract_braiins_os_version(self, firmware_file: Path) -> str: - """ - Extract the firmware version from the filename for braiins_os miners. - - Args: - firmware_file (Path): The firmware file to extract the version from. - - Returns: - str: The extracted firmware version. - - Raises: - ValueError: If the version is not found in the filename. - """ - match = re.search(r"firmware_v(\d+\.\d+\.\d+)\.tar\.gz", firmware_file.name) - if match: - return match.group(1) - raise ValueError("Firmware version not found in the filename.") - - async def get_latest_firmware_info(self) -> dict: - """ - Fetch the latest firmware information from the remote server. - - Returns: - dict: The latest firmware information, including version and SHA256 hash. - - Raises: - httpx.HTTPStatusError: If the HTTP request fails. - """ - async with httpx.AsyncClient() as client: - response = await client.get(f"{self.remote_server_url}/latest") - response.raise_for_status() - return response.json() - - async def download_firmware(self, url: str, file_path: Path): - """ - Download the firmware file from the specified URL and save it to the given file path. - - Args: - url (str): The URL to download the firmware from. - file_path (Path): The file path to save the downloaded firmware. - - Raises: - httpx.HTTPStatusError: If the HTTP request fails. - """ - async with httpx.AsyncClient() as client: - response = await client.get(url) - response.raise_for_status() - with file_path.open("wb") as firmware_file: - firmware_file.write(response.content) - - def calculate_sha256(self, file_path: Path) -> str: - """ - Calculate the SHA256 hash of the specified file. - - Args: - file_path (Path): The file path of the file to calculate the hash for. - - Returns: - str: The SHA256 hash of the file. - """ - sha256 = hashlib.sha256() - with file_path.open("rb") as f: - for chunk in iter(lambda: f.read(4096), b""): - sha256.update(chunk) - return sha256.hexdigest() - - def register_version_extractor(self, miner_type: str, extractor_func): - """ - Register a custom firmware version extraction function for a specific miner type. - - Args: - miner_type (str): The type of miner. - extractor_func (function): The function to extract the firmware version from the firmware file. - """ - self.version_extractors[miner_type] = extractor_func - - def get_firmware_version(self, miner_type: str, firmware_file: Path) -> str: - """ - Extract the firmware version from the firmware file using the registered extractor function for the miner type. - - Args: - miner_type (str): The type of miner. - firmware_file (Path): The firmware file to extract the version from. - - Returns: - str: The firmware version. - - Raises: - ValueError: If no extractor function is registered for the miner type or if the version is not found. - """ - if miner_type not in self.version_extractors: - raise ValueError(f"No version extractor registered for miner type: {miner_type}") - - extractor_func = self.version_extractors[miner_type] - return extractor_func(firmware_file) +AnyMiner = TypeVar("AnyMiner", bound=BaseMiner) \ No newline at end of file diff --git a/pyasic/ssh/braiins_os.py b/pyasic/ssh/braiins_os.py index ca5827d6..618c0e3c 100644 --- a/pyasic/ssh/braiins_os.py +++ b/pyasic/ssh/braiins_os.py @@ -4,8 +4,9 @@ from pyasic.ssh.base import BaseSSH import logging import httpx from pathlib import Path +import os import hashlib -from pyasic.miners.base import FirmwareManager +from pyasic.updater.bos import FirmwareManager def calculate_sha256(file_path): sha256 = hashlib.sha256() @@ -135,14 +136,12 @@ class BOSMinerSSH(BaseSSH): """ return await self.send_command("cat /sys/class/leds/'Red LED'/delay_off") - async def upgrade_firmware(self, file_location: str = None, custom_url: str = None, override_validation: bool = False): + async def upgrade_firmware(self, file_location: str): """ Upgrade the firmware of the BOSMiner device. Args: - file_location (str): The local file path of the firmware to be uploaded. If not provided, the firmware will be downloaded from the internal server. - custom_url (str): Custom URL to download the firmware from. - override_validation (bool): Whether to override SHA256 validation. + file_location (str): The local file path of the firmware to be uploaded. Returns: str: Confirmation message after upgrading the firmware. @@ -150,44 +149,8 @@ class BOSMinerSSH(BaseSSH): try: logger.info("Starting firmware upgrade process.") - if file_location is None: - # Check for cached firmware file - with tempfile.NamedTemporaryFile(delete=False) as temp_firmware_file: - cached_file_location = Path(temp_firmware_file.name) - if cached_file_location.exists(): - logger.info("Cached firmware file found. Checking version.") - # Compare cached firmware version with the latest version on the server - latest_firmware_info = await self.firmware_manager.get_latest_firmware_info() - latest_version = latest_firmware_info.get("version") - latest_hash = latest_firmware_info.get("sha256") - cached_version = self.firmware_manager.get_firmware_version("braiins_os", cached_file_location) - if cached_version == latest_version: - logger.info("Cached firmware version matches the latest version. Using cached file.") - file_location = str(cached_file_location) - else: - logger.info("Cached firmware version does not match the latest version. Downloading new version.") - firmware_url = custom_url or latest_firmware_info.get("url") - if not firmware_url: - raise ValueError("Firmware URL not found in the server response.") - await self.firmware_manager.download_firmware(firmware_url, cached_file_location) - if not override_validation: - downloaded_hash = self.firmware_manager.calculate_sha256(cached_file_location) - if downloaded_hash != latest_hash: - raise ValueError("SHA256 hash validation failed for the downloaded firmware file.") - file_location = str(cached_file_location) - else: - logger.info("No cached firmware file found. Downloading new version.") - latest_firmware_info = await self.firmware_manager.get_latest_firmware_info() - firmware_url = custom_url or latest_firmware_info.get("url") - latest_hash = latest_firmware_info.get("sha256") - if not firmware_url: - raise ValueError("Firmware URL not found in the server response.") - await self.firmware_manager.download_firmware(firmware_url, cached_file_location) - if not override_validation: - downloaded_hash = self.firmware_manager.calculate_sha256(cached_file_location) - if downloaded_hash != latest_hash: - raise ValueError("SHA256 hash validation failed for the downloaded firmware file.") - file_location = str(cached_file_location) + if not file_location: + raise ValueError("File location must be provided for firmware upgrade.") # Upload the firmware file to the BOSMiner device logger.info(f"Uploading firmware file from {file_location} to the device.") @@ -203,9 +166,6 @@ class BOSMinerSSH(BaseSSH): logger.info("Firmware upgrade process completed successfully.") return result - except httpx.HTTPStatusError as e: - logger.error(f"HTTP error occurred during the firmware upgrade process: {e}") - raise except FileNotFoundError as e: logger.error(f"File not found during the firmware upgrade process: {e}") raise diff --git a/pyasic/updater/bos.py b/pyasic/updater/bos.py new file mode 100644 index 00000000..75f84dc8 --- /dev/null +++ b/pyasic/updater/bos.py @@ -0,0 +1,111 @@ +import httpx +from pathlib import Path +import re +import hashlib + +class FirmwareManager: + def __init__(self): + """ + Initialize a FirmwareManager instance. + """ + self.remote_server_url = "http://feeds.braiins-os.com" + self.version_extractors = {} + + # Register version extractor for braiins_os + self.register_version_extractor("braiins_os", self.extract_braiins_os_version) + + def extract_braiins_os_version(self, firmware_file: Path) -> str: + """ + Extract the firmware version from the filename for braiins_os miners. + + Args: + firmware_file (Path): The firmware file to extract the version from. + + Returns: + str: The extracted firmware version. + + Raises: + ValueError: If the version is not found in the filename. + """ + match = re.search(r"(am1_s9|am2_x17|am3_bbb)/firmware_v(\d+\.\d+\.\d+)\.tar", firmware_file.name) + if match: + return match.group(2) + raise ValueError("Firmware version not found in the filename.") + + async def get_latest_firmware_info(self) -> dict: + """ + Fetch the latest firmware information from the remote server. + + Returns: + dict: The latest firmware information, including version and SHA256 hash. + + Raises: + httpx.HTTPStatusError: If the HTTP request fails. + """ + async with httpx.AsyncClient() as client: + response = await client.get(f"{self.remote_server_url}/latest") + response.raise_for_status() + return response.json() + + async def download_firmware(self, url: str, file_path: Path): + """ + Download the firmware file from the specified URL and save it to the given file path. + + Args: + url (str): The URL to download the firmware from. + file_path (Path): The file path to save the downloaded firmware. + + Raises: + httpx.HTTPStatusError: If the HTTP request fails. + """ + async with httpx.AsyncClient() as client: + response = await client.get(url) + response.raise_for_status() + with file_path.open("wb") as firmware_file: + firmware_file.write(response.content) + + def calculate_sha256(self, file_path: Path) -> str: + """ + Calculate the SHA256 hash of the specified file. + + Args: + file_path (Path): The file path of the file to calculate the hash for. + + Returns: + str: The SHA256 hash of the file. + """ + sha256 = hashlib.sha256() + with file_path.open("rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + sha256.update(chunk) + return sha256.hexdigest() + + def register_version_extractor(self, miner_type: str, extractor_func): + """ + Register a custom firmware version extraction function for a specific miner type. + + Args: + miner_type (str): The type of miner. + extractor_func (function): The function to extract the firmware version from the firmware file. + """ + self.version_extractors[miner_type] = extractor_func + + def get_firmware_version(self, miner_type: str, firmware_file: Path) -> str: + """ + Extract the firmware version from the firmware file using the registered extractor function for the miner type. + + Args: + miner_type (str): The type of miner. + firmware_file (Path): The firmware file to extract the version from. + + Returns: + str: The firmware version. + + Raises: + ValueError: If no extractor function is registered for the miner type or if the version is not found. + """ + if miner_type not in self.version_extractors: + raise ValueError(f"No version extractor registered for miner type: {miner_type}") + + extractor_func = self.version_extractors[miner_type] + return extractor_func(firmware_file) \ No newline at end of file