Improved file structure and add requested changes
This commit is contained in:
@@ -37,6 +37,9 @@ from pyasic.ssh.braiins_os import BOSMinerSSH
|
|||||||
from pyasic.web.braiins_os import BOSerWebAPI, BOSMinerWebAPI
|
from pyasic.web.braiins_os import BOSerWebAPI, BOSMinerWebAPI
|
||||||
from pyasic.web.braiins_os.proto.braiins.bos.v1 import SaveAction
|
from pyasic.web.braiins_os.proto.braiins.bos.v1 import SaveAction
|
||||||
|
|
||||||
|
import aiofiles
|
||||||
|
import base64
|
||||||
|
|
||||||
BOSMINER_DATA_LOC = DataLocations(
|
BOSMINER_DATA_LOC = DataLocations(
|
||||||
**{
|
**{
|
||||||
str(DataOptions.MAC): DataFunction(
|
str(DataOptions.MAC): DataFunction(
|
||||||
@@ -570,6 +573,48 @@ class BOSMiner(BraiinsOSFirmware):
|
|||||||
except LookupError:
|
except LookupError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
async def upgrade_firmware(self, file: Path):
|
||||||
|
"""
|
||||||
|
Upgrade the firmware of the BOSMiner device.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file (Path): The local file path of the firmware to be uploaded.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: Confirmation message after upgrading the firmware.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
self.logger.info("Starting firmware upgrade process.")
|
||||||
|
|
||||||
|
if not file:
|
||||||
|
raise ValueError("File location must be provided for firmware upgrade.")
|
||||||
|
|
||||||
|
# Read the firmware file contents
|
||||||
|
async with aiofiles.open(file, "rb") as f:
|
||||||
|
upgrade_contents = await f.read()
|
||||||
|
|
||||||
|
# Encode the firmware contents in base64
|
||||||
|
encoded_contents = base64.b64encode(upgrade_contents).decode('utf-8')
|
||||||
|
|
||||||
|
# Upload the firmware file to the BOSMiner device
|
||||||
|
self.logger.info(f"Uploading firmware file from {file} to the device.")
|
||||||
|
await self.ssh.send_command(f"echo {encoded_contents} | base64 -d > /tmp/firmware.tar && sysupgrade /tmp/firmware.tar")
|
||||||
|
|
||||||
|
self.logger.info("Firmware upgrade process completed successfully.")
|
||||||
|
return "Firmware upgrade completed successfully."
|
||||||
|
except FileNotFoundError as e:
|
||||||
|
self.logger.error(f"File not found during the firmware upgrade process: {e}")
|
||||||
|
raise
|
||||||
|
except ValueError as e:
|
||||||
|
self.logger.error(f"Validation error occurred during the firmware upgrade process: {e}")
|
||||||
|
raise
|
||||||
|
except OSError as e:
|
||||||
|
self.logger.error(f"OS error occurred during the firmware upgrade process: {e}")
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.error(f"An unexpected error occurred during the firmware upgrade process: {e}", exc_info=True)
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
BOSER_DATA_LOC = DataLocations(
|
BOSER_DATA_LOC = DataLocations(
|
||||||
**{
|
**{
|
||||||
|
|||||||
@@ -1,23 +1,5 @@
|
|||||||
from pyasic import settings
|
from pyasic import settings
|
||||||
from pyasic.ssh.base import BaseSSH
|
from pyasic.ssh.base import BaseSSH
|
||||||
import logging
|
|
||||||
import hashlib
|
|
||||||
from pyasic.updater.bos import FirmwareManager
|
|
||||||
|
|
||||||
def calculate_sha256(file_path):
|
|
||||||
sha256 = hashlib.sha256()
|
|
||||||
with open(file_path, "rb") as f:
|
|
||||||
for chunk in iter(lambda: f.read(4096), b""):
|
|
||||||
sha256.update(chunk)
|
|
||||||
return sha256.hexdigest()
|
|
||||||
|
|
||||||
# Set up logging
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
logger.setLevel(logging.DEBUG)
|
|
||||||
handler = logging.StreamHandler()
|
|
||||||
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
||||||
handler.setFormatter(formatter)
|
|
||||||
logger.addHandler(handler)
|
|
||||||
|
|
||||||
|
|
||||||
class BOSMinerSSH(BaseSSH):
|
class BOSMinerSSH(BaseSSH):
|
||||||
@@ -30,27 +12,6 @@ class BOSMinerSSH(BaseSSH):
|
|||||||
"""
|
"""
|
||||||
super().__init__(ip)
|
super().__init__(ip)
|
||||||
self.pwd = settings.get("default_bosminer_ssh_password", "root")
|
self.pwd = settings.get("default_bosminer_ssh_password", "root")
|
||||||
self.firmware_manager = FirmwareManager()
|
|
||||||
|
|
||||||
def get_firmware_version(self, firmware_file):
|
|
||||||
"""
|
|
||||||
Extract the firmware version from the firmware file.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
firmware_file (file): The firmware file to extract the version from.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: The firmware version.
|
|
||||||
"""
|
|
||||||
import re
|
|
||||||
|
|
||||||
# Extract the version from the filename using a regular expression
|
|
||||||
filename = firmware_file.name
|
|
||||||
match = re.search(r"firmware_v(\d+\.\d+\.\d+)\.tar\.gz", filename)
|
|
||||||
if match:
|
|
||||||
return match.group(1)
|
|
||||||
else:
|
|
||||||
raise ValueError("Firmware version not found in the filename.")
|
|
||||||
|
|
||||||
async def get_board_info(self):
|
async def get_board_info(self):
|
||||||
"""
|
"""
|
||||||
@@ -132,46 +93,3 @@ class BOSMinerSSH(BaseSSH):
|
|||||||
str: Status of the LED.
|
str: Status of the LED.
|
||||||
"""
|
"""
|
||||||
return await self.send_command("cat /sys/class/leds/'Red LED'/delay_off")
|
return await self.send_command("cat /sys/class/leds/'Red LED'/delay_off")
|
||||||
|
|
||||||
async def upgrade_firmware(self, file_location: str):
|
|
||||||
"""
|
|
||||||
Upgrade the firmware of the BOSMiner device.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
file_location (str): The local file path of the firmware to be uploaded.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: Confirmation message after upgrading the firmware.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
logger.info("Starting firmware upgrade process.")
|
|
||||||
|
|
||||||
if not file_location:
|
|
||||||
raise ValueError("File location must be provided for firmware upgrade.")
|
|
||||||
|
|
||||||
# Upload the firmware file to the BOSMiner device
|
|
||||||
logger.info(f"Uploading firmware file from {file_location} to the device.")
|
|
||||||
await self.send_command(f"scp {file_location} root@{self.ip}:/tmp/firmware.tar.gz")
|
|
||||||
|
|
||||||
# Extract the firmware file
|
|
||||||
logger.info("Extracting the firmware file on the device.")
|
|
||||||
await self.send_command("tar -xzf /tmp/firmware.tar.gz -C /tmp")
|
|
||||||
|
|
||||||
# Run the firmware upgrade script
|
|
||||||
logger.info("Running the firmware upgrade script on the device.")
|
|
||||||
result = await self.send_command("sh /tmp/upgrade_firmware.sh")
|
|
||||||
|
|
||||||
logger.info("Firmware upgrade process completed successfully.")
|
|
||||||
return result
|
|
||||||
except FileNotFoundError as e:
|
|
||||||
logger.error(f"File not found during the firmware upgrade process: {e}")
|
|
||||||
raise
|
|
||||||
except ValueError as e:
|
|
||||||
logger.error(f"Validation error occurred during the firmware upgrade process: {e}")
|
|
||||||
raise
|
|
||||||
except OSError as e:
|
|
||||||
logger.error(f"OS error occurred during the firmware upgrade process: {e}")
|
|
||||||
raise
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"An unexpected error occurred during the firmware upgrade process: {e}", exc_info=True)
|
|
||||||
raise
|
|
||||||
42
pyasic/updater/__init__.py
Normal file
42
pyasic/updater/__init__.py
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
from pyasic.updater.bos import FirmwareManager
|
||||||
|
|
||||||
|
class FirmwareUpdater:
|
||||||
|
def __init__(self, firmware, raw_model, ssh_client):
|
||||||
|
"""
|
||||||
|
Initialize a FirmwareUpdater instance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
firmware: The firmware type of the miner.
|
||||||
|
raw_model: The raw model of the miner.
|
||||||
|
ssh_client: The SSH client to use for sending commands to the device.
|
||||||
|
"""
|
||||||
|
self.firmware = firmware
|
||||||
|
self.raw_model = raw_model
|
||||||
|
self.ssh_client = ssh_client
|
||||||
|
self.manager = self._get_manager()
|
||||||
|
|
||||||
|
def _get_manager(self):
|
||||||
|
"""
|
||||||
|
Get the appropriate firmware manager based on the firmware type and raw model.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The firmware manager instance.
|
||||||
|
"""
|
||||||
|
if self.firmware == "braiins_os":
|
||||||
|
return FirmwareManager(self.ssh_client)
|
||||||
|
# Add more conditions here for different firmware types and raw models
|
||||||
|
else:
|
||||||
|
raise ValueError(f"Unsupported firmware type: {self.firmware}")
|
||||||
|
|
||||||
|
async def upgrade_firmware(self, file: Path):
|
||||||
|
"""
|
||||||
|
Upgrade the firmware of the miner.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file (Path): The local file path of the firmware to be uploaded.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: Confirmation message after upgrading the firmware.
|
||||||
|
"""
|
||||||
|
return await self.manager.upgrade_firmware(file)
|
||||||
@@ -2,14 +2,28 @@ import httpx
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import re
|
import re
|
||||||
import hashlib
|
import hashlib
|
||||||
|
import aiofiles
|
||||||
|
import logging
|
||||||
|
|
||||||
class FirmwareManager:
|
class FirmwareManager:
|
||||||
def __init__(self):
|
def __init__(self, ssh_client):
|
||||||
"""
|
"""
|
||||||
Initialize a FirmwareManager instance.
|
Initialize a FirmwareManager instance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
ssh_client: The SSH client to use for sending commands to the device.
|
||||||
"""
|
"""
|
||||||
self.remote_server_url = "http://feeds.braiins-os.com"
|
self.remote_server_url = "http://feeds.braiins-os.com"
|
||||||
self.version_extractors = {}
|
self.version_extractors = {}
|
||||||
|
self.ssh = ssh_client
|
||||||
|
|
||||||
|
# Set up logging
|
||||||
|
self.logger = logging.getLogger(__name__)
|
||||||
|
self.logger.setLevel(logging.DEBUG)
|
||||||
|
handler = logging.StreamHandler()
|
||||||
|
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||||
|
handler.setFormatter(formatter)
|
||||||
|
self.logger.addHandler(handler)
|
||||||
|
|
||||||
# Register version extractor for braiins_os
|
# Register version extractor for braiins_os
|
||||||
self.register_version_extractor("braiins_os", self.extract_braiins_os_version)
|
self.register_version_extractor("braiins_os", self.extract_braiins_os_version)
|
||||||
@@ -109,3 +123,46 @@ class FirmwareManager:
|
|||||||
|
|
||||||
extractor_func = self.version_extractors[miner_type]
|
extractor_func = self.version_extractors[miner_type]
|
||||||
return extractor_func(firmware_file)
|
return extractor_func(firmware_file)
|
||||||
|
|
||||||
|
async def upgrade_firmware(self, file: Path):
|
||||||
|
"""
|
||||||
|
Upgrade the firmware of the BOSMiner device.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file (Path): The local file path of the firmware to be uploaded.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: Confirmation message after upgrading the firmware.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
self.logger.info("Starting firmware upgrade process.")
|
||||||
|
|
||||||
|
if not file:
|
||||||
|
raise ValueError("File location must be provided for firmware upgrade.")
|
||||||
|
|
||||||
|
# Read the firmware file contents
|
||||||
|
async with aiofiles.open(file, "rb") as f:
|
||||||
|
upgrade_contents = await f.read()
|
||||||
|
|
||||||
|
# Encode the firmware contents in base64
|
||||||
|
import base64
|
||||||
|
encoded_contents = base64.b64encode(upgrade_contents).decode('utf-8')
|
||||||
|
|
||||||
|
# Upload the firmware file to the BOSMiner device
|
||||||
|
self.logger.info(f"Uploading firmware file from {file} to the device.")
|
||||||
|
await self.ssh.send_command(f"echo {encoded_contents} | base64 -d > /tmp/firmware.tar && sysupgrade /tmp/firmware.tar")
|
||||||
|
|
||||||
|
self.logger.info("Firmware upgrade process completed successfully.")
|
||||||
|
return "Firmware upgrade completed successfully."
|
||||||
|
except FileNotFoundError as e:
|
||||||
|
self.logger.error(f"File not found during the firmware upgrade process: {e}")
|
||||||
|
raise
|
||||||
|
except ValueError as e:
|
||||||
|
self.logger.error(f"Validation error occurred during the firmware upgrade process: {e}")
|
||||||
|
raise
|
||||||
|
except OSError as e:
|
||||||
|
self.logger.error(f"OS error occurred during the firmware upgrade process: {e}")
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.error(f"An unexpected error occurred during the firmware upgrade process: {e}", exc_info=True)
|
||||||
|
raise
|
||||||
|
|||||||
Reference in New Issue
Block a user