Add upgrade_firmware for BOS miner.
This commit is contained in:
@@ -1,6 +1,16 @@
|
|||||||
from pyasic import settings
|
from pyasic import settings
|
||||||
from pyasic.ssh.base import BaseSSH
|
from pyasic.ssh.base import BaseSSH
|
||||||
|
import logging
|
||||||
|
import requests
|
||||||
|
import os
|
||||||
|
|
||||||
|
# Set up logging
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
logger.setLevel(logging.DEBUG)
|
||||||
|
handler = logging.StreamHandler()
|
||||||
|
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||||
|
handler.setFormatter(formatter)
|
||||||
|
logger.addHandler(handler)
|
||||||
|
|
||||||
class BOSMinerSSH(BaseSSH):
|
class BOSMinerSSH(BaseSSH):
|
||||||
def __init__(self, ip: str):
|
def __init__(self, ip: str):
|
||||||
@@ -94,14 +104,70 @@ class BOSMinerSSH(BaseSSH):
|
|||||||
"""
|
"""
|
||||||
return await self.send_command("cat /sys/class/leds/'Red LED'/delay_off")
|
return await self.send_command("cat /sys/class/leds/'Red LED'/delay_off")
|
||||||
|
|
||||||
async def upgrade_firmware(self, file: str):
|
async def upgrade_firmware(self, file_location: str = None):
|
||||||
"""Upgrade the firmware of the BrainOS miner using a specified file.
|
"""
|
||||||
|
Upgrade the firmware of the BOSMiner device.
|
||||||
This function upgrades the firmware of the BrainOS miner using a specified file.
|
|
||||||
It downloads the firmware file to a temporary location and performs the upgrade.
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
file (str): The URL or local path of the firmware file.
|
file_location (str): The local file path of the firmware to be uploaded. If not provided, the firmware will be downloaded from the internal server.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: Confirmation message after upgrading the firmware.
|
||||||
"""
|
"""
|
||||||
pass
|
try:
|
||||||
|
logger.info("Starting firmware upgrade process.")
|
||||||
|
|
||||||
|
if file_location is None:
|
||||||
|
# Check for cached firmware file
|
||||||
|
cached_file_location = "/tmp/cached_firmware.tar.gz"
|
||||||
|
if os.path.exists(cached_file_location):
|
||||||
|
logger.info("Cached firmware file found. Checking version.")
|
||||||
|
# Compare cached firmware version with the latest version on the server
|
||||||
|
response = requests.get("http://firmware.pyasic.org/latest")
|
||||||
|
response.raise_for_status()
|
||||||
|
latest_version = response.json().get("version")
|
||||||
|
cached_version = self._get_fw_ver()
|
||||||
|
|
||||||
|
if cached_version == latest_version:
|
||||||
|
logger.info("Cached firmware version matches the latest version. Using cached file.")
|
||||||
|
file_location = cached_file_location
|
||||||
|
else:
|
||||||
|
logger.info("Cached firmware version does not match the latest version. Downloading new version.")
|
||||||
|
firmware_url = response.json().get("url")
|
||||||
|
if not firmware_url:
|
||||||
|
raise ValueError("Firmware URL not found in the server response.")
|
||||||
|
firmware_response = requests.get(firmware_url)
|
||||||
|
firmware_response.raise_for_status()
|
||||||
|
with open(cached_file_location, "wb") as firmware_file:
|
||||||
|
firmware_file.write(firmware_response.content)
|
||||||
|
file_location = cached_file_location
|
||||||
|
else:
|
||||||
|
logger.info("No cached firmware file found. Downloading new version.")
|
||||||
|
response = requests.get("http://firmware.pyasic.org/latest")
|
||||||
|
response.raise_for_status()
|
||||||
|
firmware_url = response.json().get("url")
|
||||||
|
if not firmware_url:
|
||||||
|
raise ValueError("Firmware URL not found in the server response.")
|
||||||
|
firmware_response = requests.get(firmware_url)
|
||||||
|
firmware_response.raise_for_status()
|
||||||
|
with open(cached_file_location, "wb") as firmware_file:
|
||||||
|
firmware_file.write(firmware_response.content)
|
||||||
|
file_location = cached_file_location
|
||||||
|
|
||||||
|
# Upload the firmware file to the BOSMiner device
|
||||||
|
logger.info(f"Uploading firmware file from {file_location} to the device.")
|
||||||
|
await self.send_command(f"scp {file_location} root@{self.ip}:/tmp/firmware.tar.gz")
|
||||||
|
|
||||||
|
# Extract the firmware file
|
||||||
|
logger.info("Extracting the firmware file on the device.")
|
||||||
|
await self.send_command("tar -xzf /tmp/firmware.tar.gz -C /tmp")
|
||||||
|
|
||||||
|
# Run the firmware upgrade script
|
||||||
|
logger.info("Running the firmware upgrade script on the device.")
|
||||||
|
result = await self.send_command("sh /tmp/upgrade_firmware.sh")
|
||||||
|
|
||||||
|
logger.info("Firmware upgrade process completed successfully.")
|
||||||
|
return result
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"An error occurred during the firmware upgrade process: {e}")
|
||||||
|
raise
|
||||||
24
tests/miners_tests/test_braiins_os.py
Normal file
24
tests/miners_tests/test_braiins_os.py
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
import pytest
|
||||||
|
from unittest.mock import patch, mock_open
|
||||||
|
from pyasic.ssh.braiins_os import BOSMinerSSH
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def bosminer_ssh():
|
||||||
|
return BOSMinerSSH(ip="192.168.1.100")
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_upgrade_firmware_with_valid_file_location(bosminer_ssh):
|
||||||
|
with patch("pyasic.ssh.braiins_os.os.path.exists") as mock_exists, \
|
||||||
|
patch("pyasic.ssh.braiins_os.open", mock_open(read_data="data")) as mock_file, \
|
||||||
|
patch("pyasic.ssh.braiins_os.requests.get") as mock_get, \
|
||||||
|
patch.object(bosminer_ssh, "send_command") as mock_send_command:
|
||||||
|
|
||||||
|
mock_exists.return_value = False
|
||||||
|
file_location = "/path/to/firmware.tar.gz"
|
||||||
|
|
||||||
|
result = await bosminer_ssh.upgrade_firmware(file_location=file_location)
|
||||||
|
|
||||||
|
mock_send_command.assert_any_call(f"scp {file_location} root@{bosminer_ssh.ip}:/tmp/firmware.tar.gz")
|
||||||
|
mock_send_command.assert_any_call("tar -xzf /tmp/firmware.tar.gz -C /tmp")
|
||||||
|
mock_send_command.assert_any_call("sh /tmp/upgrade_firmware.sh")
|
||||||
|
assert result is not None
|
||||||
Reference in New Issue
Block a user