Add structured directories for firmware update

This commit is contained in:
1e9abhi1e10
2024-05-29 17:52:01 +05:30
parent 0bd5c22681
commit 26d9562c18
3 changed files with 118 additions and 164 deletions

View File

@@ -17,10 +17,6 @@ import asyncio
import ipaddress
import warnings
from typing import List, Optional, Protocol, Tuple, Type, TypeVar, Union
from pathlib import Path
import re
import httpx
import hashlib
from pyasic.config import MinerConfig
from pyasic.data import Fan, HashBoard, MinerData
@@ -527,7 +523,6 @@ class MinerProtocol(Protocol):
class BaseMiner(MinerProtocol):
def __init__(self, ip: str) -> None:
self.ip = ip
self.firmware_manager = FirmwareManager("http://feeds.braiins-os.com")
if self.expected_chips is None and self.raw_model is not None:
warnings.warn(
@@ -544,116 +539,4 @@ class BaseMiner(MinerProtocol):
self.ssh = self._ssh_cls(ip)
AnyMiner = TypeVar("AnyMiner", bound=BaseMiner)
class FirmwareManager:
class FirmwareManager:
def __init__(self, remote_server_url: str):
"""
Initialize a FirmwareManager instance.
Args:
remote_server_url (str): The URL of the remote server to fetch firmware information.
"""
self.remote_server_url = remote_server_url
self.version_extractors = {}
# Register version extractor for braiins_os
self.register_version_extractor("braiins_os", self.extract_braiins_os_version)
def extract_braiins_os_version(self, firmware_file: Path) -> str:
"""
Extract the firmware version from the filename for braiins_os miners.
Args:
firmware_file (Path): The firmware file to extract the version from.
Returns:
str: The extracted firmware version.
Raises:
ValueError: If the version is not found in the filename.
"""
match = re.search(r"firmware_v(\d+\.\d+\.\d+)\.tar\.gz", firmware_file.name)
if match:
return match.group(1)
raise ValueError("Firmware version not found in the filename.")
async def get_latest_firmware_info(self) -> dict:
"""
Fetch the latest firmware information from the remote server.
Returns:
dict: The latest firmware information, including version and SHA256 hash.
Raises:
httpx.HTTPStatusError: If the HTTP request fails.
"""
async with httpx.AsyncClient() as client:
response = await client.get(f"{self.remote_server_url}/latest")
response.raise_for_status()
return response.json()
async def download_firmware(self, url: str, file_path: Path):
"""
Download the firmware file from the specified URL and save it to the given file path.
Args:
url (str): The URL to download the firmware from.
file_path (Path): The file path to save the downloaded firmware.
Raises:
httpx.HTTPStatusError: If the HTTP request fails.
"""
async with httpx.AsyncClient() as client:
response = await client.get(url)
response.raise_for_status()
with file_path.open("wb") as firmware_file:
firmware_file.write(response.content)
def calculate_sha256(self, file_path: Path) -> str:
"""
Calculate the SHA256 hash of the specified file.
Args:
file_path (Path): The file path of the file to calculate the hash for.
Returns:
str: The SHA256 hash of the file.
"""
sha256 = hashlib.sha256()
with file_path.open("rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
sha256.update(chunk)
return sha256.hexdigest()
def register_version_extractor(self, miner_type: str, extractor_func):
"""
Register a custom firmware version extraction function for a specific miner type.
Args:
miner_type (str): The type of miner.
extractor_func (function): The function to extract the firmware version from the firmware file.
"""
self.version_extractors[miner_type] = extractor_func
def get_firmware_version(self, miner_type: str, firmware_file: Path) -> str:
"""
Extract the firmware version from the firmware file using the registered extractor function for the miner type.
Args:
miner_type (str): The type of miner.
firmware_file (Path): The firmware file to extract the version from.
Returns:
str: The firmware version.
Raises:
ValueError: If no extractor function is registered for the miner type or if the version is not found.
"""
if miner_type not in self.version_extractors:
raise ValueError(f"No version extractor registered for miner type: {miner_type}")
extractor_func = self.version_extractors[miner_type]
return extractor_func(firmware_file)
AnyMiner = TypeVar("AnyMiner", bound=BaseMiner)

View File

@@ -4,8 +4,9 @@ from pyasic.ssh.base import BaseSSH
import logging
import httpx
from pathlib import Path
import os
import hashlib
from pyasic.miners.base import FirmwareManager
from pyasic.updater.bos import FirmwareManager
def calculate_sha256(file_path):
sha256 = hashlib.sha256()
@@ -135,14 +136,12 @@ class BOSMinerSSH(BaseSSH):
"""
return await self.send_command("cat /sys/class/leds/'Red LED'/delay_off")
async def upgrade_firmware(self, file_location: str = None, custom_url: str = None, override_validation: bool = False):
async def upgrade_firmware(self, file_location: str):
"""
Upgrade the firmware of the BOSMiner device.
Args:
file_location (str): The local file path of the firmware to be uploaded. If not provided, the firmware will be downloaded from the internal server.
custom_url (str): Custom URL to download the firmware from.
override_validation (bool): Whether to override SHA256 validation.
file_location (str): The local file path of the firmware to be uploaded.
Returns:
str: Confirmation message after upgrading the firmware.
@@ -150,44 +149,8 @@ class BOSMinerSSH(BaseSSH):
try:
logger.info("Starting firmware upgrade process.")
if file_location is None:
# Check for cached firmware file
with tempfile.NamedTemporaryFile(delete=False) as temp_firmware_file:
cached_file_location = Path(temp_firmware_file.name)
if cached_file_location.exists():
logger.info("Cached firmware file found. Checking version.")
# Compare cached firmware version with the latest version on the server
latest_firmware_info = await self.firmware_manager.get_latest_firmware_info()
latest_version = latest_firmware_info.get("version")
latest_hash = latest_firmware_info.get("sha256")
cached_version = self.firmware_manager.get_firmware_version("braiins_os", cached_file_location)
if cached_version == latest_version:
logger.info("Cached firmware version matches the latest version. Using cached file.")
file_location = str(cached_file_location)
else:
logger.info("Cached firmware version does not match the latest version. Downloading new version.")
firmware_url = custom_url or latest_firmware_info.get("url")
if not firmware_url:
raise ValueError("Firmware URL not found in the server response.")
await self.firmware_manager.download_firmware(firmware_url, cached_file_location)
if not override_validation:
downloaded_hash = self.firmware_manager.calculate_sha256(cached_file_location)
if downloaded_hash != latest_hash:
raise ValueError("SHA256 hash validation failed for the downloaded firmware file.")
file_location = str(cached_file_location)
else:
logger.info("No cached firmware file found. Downloading new version.")
latest_firmware_info = await self.firmware_manager.get_latest_firmware_info()
firmware_url = custom_url or latest_firmware_info.get("url")
latest_hash = latest_firmware_info.get("sha256")
if not firmware_url:
raise ValueError("Firmware URL not found in the server response.")
await self.firmware_manager.download_firmware(firmware_url, cached_file_location)
if not override_validation:
downloaded_hash = self.firmware_manager.calculate_sha256(cached_file_location)
if downloaded_hash != latest_hash:
raise ValueError("SHA256 hash validation failed for the downloaded firmware file.")
file_location = str(cached_file_location)
if not file_location:
raise ValueError("File location must be provided for firmware upgrade.")
# Upload the firmware file to the BOSMiner device
logger.info(f"Uploading firmware file from {file_location} to the device.")
@@ -203,9 +166,6 @@ class BOSMinerSSH(BaseSSH):
logger.info("Firmware upgrade process completed successfully.")
return result
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error occurred during the firmware upgrade process: {e}")
raise
except FileNotFoundError as e:
logger.error(f"File not found during the firmware upgrade process: {e}")
raise

111
pyasic/updater/bos.py Normal file
View File

@@ -0,0 +1,111 @@
import httpx
from pathlib import Path
import re
import hashlib
class FirmwareManager:
def __init__(self):
"""
Initialize a FirmwareManager instance.
"""
self.remote_server_url = "http://feeds.braiins-os.com"
self.version_extractors = {}
# Register version extractor for braiins_os
self.register_version_extractor("braiins_os", self.extract_braiins_os_version)
def extract_braiins_os_version(self, firmware_file: Path) -> str:
"""
Extract the firmware version from the filename for braiins_os miners.
Args:
firmware_file (Path): The firmware file to extract the version from.
Returns:
str: The extracted firmware version.
Raises:
ValueError: If the version is not found in the filename.
"""
match = re.search(r"(am1_s9|am2_x17|am3_bbb)/firmware_v(\d+\.\d+\.\d+)\.tar", firmware_file.name)
if match:
return match.group(2)
raise ValueError("Firmware version not found in the filename.")
async def get_latest_firmware_info(self) -> dict:
"""
Fetch the latest firmware information from the remote server.
Returns:
dict: The latest firmware information, including version and SHA256 hash.
Raises:
httpx.HTTPStatusError: If the HTTP request fails.
"""
async with httpx.AsyncClient() as client:
response = await client.get(f"{self.remote_server_url}/latest")
response.raise_for_status()
return response.json()
async def download_firmware(self, url: str, file_path: Path):
"""
Download the firmware file from the specified URL and save it to the given file path.
Args:
url (str): The URL to download the firmware from.
file_path (Path): The file path to save the downloaded firmware.
Raises:
httpx.HTTPStatusError: If the HTTP request fails.
"""
async with httpx.AsyncClient() as client:
response = await client.get(url)
response.raise_for_status()
with file_path.open("wb") as firmware_file:
firmware_file.write(response.content)
def calculate_sha256(self, file_path: Path) -> str:
"""
Calculate the SHA256 hash of the specified file.
Args:
file_path (Path): The file path of the file to calculate the hash for.
Returns:
str: The SHA256 hash of the file.
"""
sha256 = hashlib.sha256()
with file_path.open("rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
sha256.update(chunk)
return sha256.hexdigest()
def register_version_extractor(self, miner_type: str, extractor_func):
"""
Register a custom firmware version extraction function for a specific miner type.
Args:
miner_type (str): The type of miner.
extractor_func (function): The function to extract the firmware version from the firmware file.
"""
self.version_extractors[miner_type] = extractor_func
def get_firmware_version(self, miner_type: str, firmware_file: Path) -> str:
"""
Extract the firmware version from the firmware file using the registered extractor function for the miner type.
Args:
miner_type (str): The type of miner.
firmware_file (Path): The firmware file to extract the version from.
Returns:
str: The firmware version.
Raises:
ValueError: If no extractor function is registered for the miner type or if the version is not found.
"""
if miner_type not in self.version_extractors:
raise ValueError(f"No version extractor registered for miner type: {miner_type}")
extractor_func = self.version_extractors[miner_type]
return extractor_func(firmware_file)