tests: add tests for config and update tests.

This commit is contained in:
UpstreamData
2023-12-18 14:00:40 -07:00
parent b045abe76e
commit eb9b29aca1
4 changed files with 326 additions and 172 deletions

View File

@@ -16,7 +16,7 @@
import unittest import unittest
# from tests.miners_tests import MinerFactoryTest, MinersTest from tests.api_tests import *
from tests.network_tests import NetworkTest from tests.network_tests import NetworkTest
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -13,198 +13,136 @@
# See the License for the specific language governing permissions and - # See the License for the specific language governing permissions and -
# limitations under the License. - # limitations under the License. -
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
import asyncio import json
import time
import unittest import unittest
from unittest.mock import MagicMock, Mock, patch from unittest.mock import patch
from pyasic import APIError
from pyasic.API.bfgminer import BFGMinerAPI
from pyasic.API.bmminer import BMMinerAPI from pyasic.API.bmminer import BMMinerAPI
from pyasic.API.bosminer import BOSMinerAPI from pyasic.API.bosminer import BOSMinerAPI
from pyasic.API.btminer import BTMinerAPI from pyasic.API.btminer import BTMinerAPI
from pyasic.API.cgminer import CGMinerAPI from pyasic.API.cgminer import CGMinerAPI
from pyasic.API.luxminer import LUXMinerAPI
class TestBMMinerAPI(unittest.IsolatedAsyncioTestCase): class TestAPIBase(unittest.IsolatedAsyncioTestCase):
async def create_mock_connection(self, reader_data: bytes = b""):
mock_reader = Mock(asyncio.StreamReader)
mock_reader.read.side_effect = [reader_data, None]
mock_writer = Mock(asyncio.StreamWriter)
mock_writer.write.return_value = None
mock_writer.drain.return_value = None
mock_writer.get_extra_info.return_value = (self.ip, self.port)
mock_connection = MagicMock()
mock_connection.__aenter__.return_value = (mock_reader, mock_writer)
mock_connection.__aexit__.return_value = None
return mock_connection
def setUp(self): def setUp(self):
self.ip = "192.168.0.1" self.ip = "10.0.0.50"
self.port = 4028 self.port = 4028
self.api_str = ""
self.api = None
self.setUpData()
def setUpData(self):
pass
def get_error_value(self):
return json.dumps(
{
"STATUS": [
{"STATUS": "E", "When": time.time(), "Code": 7, "Msg": self.api_str}
],
"id": 1,
}
).encode("utf-8")
def get_success_value(self, command: str):
return json.dumps(
{
"STATUS": [
{
"STATUS": "S",
"When": time.time(),
"Code": 69,
"Msg": f"{self.api_str} {command}",
}
],
command.upper(): [{command: "test"}],
"id": 1,
}
).encode("utf-8")
@patch("pyasic.API.BaseMinerAPI._send_bytes") @patch("pyasic.API.BaseMinerAPI._send_bytes")
async def test_send_command(self, mock_send_bytes): async def test_command_error_raises_api_error(self, mock_send_bytes):
miner_api = BMMinerAPI(ip=self.ip, port=self.port) if self.api is None:
mock_send_bytes.return_value = b'{"STATUS":[{"STATUS":"S","When":1618486231,"Code":7,"Msg":"BMMiner"}], "SUMMARY":[{}], "id": 1}' return
response = await miner_api.send_command("summary")
self.assertIsInstance(response, dict) mock_send_bytes.return_value = self.get_error_value()
self.assertEqual(response["STATUS"][0]["STATUS"], "S") with self.assertRaises(APIError):
await self.api.send_command("summary")
@patch("pyasic.API.BaseMinerAPI._send_bytes") @patch("pyasic.API.BaseMinerAPI._send_bytes")
async def test_all_commands(self, mock_send_bytes): async def test_command_error_ignored_by_flag(self, mock_send_bytes):
miner_api = BMMinerAPI(ip=self.ip, port=self.port) if self.api is None:
for command in miner_api.commands: return
mock_send_bytes.return_value = (
b'{"STATUS":[{"STATUS":"S","When":1618486231,"Code":7,"Msg":"BMMiner"}], "' mock_send_bytes.return_value = self.get_error_value()
+ command.upper().encode("utf-8") try:
+ b'":[{}], "id": 1}' await self.api.send_command(
"summary", ignore_errors=True, allow_warning=False
) )
try: except APIError:
command_func = getattr(miner_api, command) self.fail(
except AttributeError: f"Expected ignore_errors flag to ignore error in {self.api_str} API"
pass
else:
try:
response = await command_func()
except TypeError:
# probably addpool or something
pass
else:
self.assertIsInstance(response, dict)
self.assertEqual(response["STATUS"][0]["STATUS"], "S")
async def test_init(self):
miner_api = BMMinerAPI(ip=self.ip, port=self.port)
self.assertEqual(str(miner_api.ip), self.ip)
self.assertEqual(miner_api.port, self.port)
self.assertEqual(str(miner_api), "BMMinerAPI: 192.168.0.1")
class TestBTMinerAPI(unittest.IsolatedAsyncioTestCase):
def setUp(self):
self.ip = "192.168.0.1"
self.port = 4028
@patch("pyasic.API.BaseMinerAPI._send_bytes")
async def test_send_command(self, mock_send_bytes):
mock_send_bytes.return_value = b'{"STATUS":[{"STATUS":"S","When":1618486231,"Code":7,"Msg":"BTMiner"}], "SUMMARY":[{"sumamry": 1}], "id": 1}'
miner_api = BTMinerAPI(ip=self.ip, port=self.port)
response = await miner_api.send_command("summary")
self.assertIsInstance(response, dict)
self.assertEqual(response["STATUS"][0]["STATUS"], "S")
@patch("pyasic.API.BaseMinerAPI._send_bytes")
async def test_all_commands(self, mock_send_bytes):
miner_api = BMMinerAPI(ip=self.ip, port=self.port)
for command in miner_api.commands:
mock_send_bytes.return_value = (
b'{"STATUS":[{"STATUS":"S","When":1618486231,"Code":7,"Msg":"BTMiner"}], "'
+ command.upper().encode("utf-8")
+ b'":[{}], "id": 1}'
) )
try:
command_func = getattr(miner_api, command) @patch("pyasic.API.BaseMinerAPI._send_bytes")
except AttributeError: async def test_all_read_command_success(self, mock_send_bytes):
pass if self.api is None:
else: return
commands = self.api.commands
for command in commands:
with self.subTest(msg=f"{self.api_str} {command}"):
api_func = getattr(self.api, command)
mock_send_bytes.return_value = self.get_success_value(command)
try: try:
response = await command_func() await api_func()
except APIError:
self.fail(f"Expected successful return from API function {command}")
except TypeError: except TypeError:
# probably addpool or something continue
pass except KeyError:
else: continue
self.assertIsInstance(response, dict)
self.assertEqual(response["STATUS"][0]["STATUS"], "S")
async def test_init(self):
miner_api = BTMinerAPI(ip=self.ip, port=self.port)
self.assertEqual(str(miner_api.ip), self.ip)
self.assertEqual(miner_api.port, self.port)
self.assertEqual(str(miner_api), "BTMinerAPI: 192.168.0.1")
class TestCGMinerAPI(unittest.IsolatedAsyncioTestCase): class TestBFGMinerAPI(TestAPIBase):
def setUp(self): def setUpData(self):
self.ip = "192.168.0.1" self.api = BFGMinerAPI(self.ip)
self.port = 4028 self.api_str = "BFGMiner"
@patch("pyasic.API.BaseMinerAPI._send_bytes")
async def test_send_command(self, mock_send_bytes):
mock_send_bytes.return_value = b'{"STATUS":[{"STATUS":"S","When":1618486231,"Code":7,"Msg":"CGMiner"}], "SUMMARY":[{"sumamry": 1}], "id": 1}'
miner_api = CGMinerAPI(ip=self.ip, port=self.port)
response = await miner_api.send_command("summary")
self.assertIsInstance(response, dict)
self.assertEqual(response["STATUS"][0]["STATUS"], "S")
@patch("pyasic.API.BaseMinerAPI._send_bytes")
async def test_all_commands(self, mock_send_bytes):
miner_api = BMMinerAPI(ip=self.ip, port=self.port)
for command in miner_api.commands:
mock_send_bytes.return_value = (
b'{"STATUS":[{"STATUS":"S","When":1618486231,"Code":7,"Msg":"CGMiner"}], "'
+ command.upper().encode("utf-8")
+ b'":[{}], "id": 1}'
)
try:
command_func = getattr(miner_api, command)
except AttributeError:
pass
else:
try:
response = await command_func()
except TypeError:
# probably addpool or something
pass
else:
self.assertIsInstance(response, dict)
self.assertEqual(response["STATUS"][0]["STATUS"], "S")
async def test_init(self):
miner_api = CGMinerAPI(ip=self.ip, port=self.port)
self.assertEqual(str(miner_api.ip), self.ip)
self.assertEqual(miner_api.port, self.port)
self.assertEqual(str(miner_api), "CGMinerAPI: 192.168.0.1")
class TestBOSMinerAPI(unittest.IsolatedAsyncioTestCase): class TestBMMinerAPI(TestAPIBase):
def setUp(self): def setUpData(self):
self.ip = "192.168.0.1" self.api = BMMinerAPI(self.ip)
self.port = 4028 self.api_str = "BMMiner"
@patch("pyasic.API.BaseMinerAPI._send_bytes")
async def test_send_command(self, mock_send_bytes):
mock_send_bytes.return_value = b'{"STATUS":[{"STATUS":"S","When":1618486231,"Code":7,"Msg":"BOSMiner"}], "SUMMARY":[{"sumamry": 1}], "id": 1}'
miner_api = BOSMinerAPI(ip=self.ip, port=self.port)
response = await miner_api.send_command("summary")
self.assertIsInstance(response, dict)
self.assertEqual(response["STATUS"][0]["STATUS"], "S")
@patch("pyasic.API.BaseMinerAPI._send_bytes") class TestBOSMinerAPI(TestAPIBase):
async def test_all_commands(self, mock_send_bytes): def setUpData(self):
miner_api = BMMinerAPI(ip=self.ip, port=self.port) self.api = BOSMinerAPI(self.ip)
for command in miner_api.commands: self.api_str = "BOSMiner"
mock_send_bytes.return_value = (
b'{"STATUS":[{"STATUS":"S","When":1618486231,"Code":7,"Msg":"BOSMiner"}], "'
+ command.upper().encode("utf-8")
+ b'":[{}], "id": 1}'
)
try:
command_func = getattr(miner_api, command)
except AttributeError:
pass
else:
try:
response = await command_func()
except TypeError:
# probably addpool or something
pass
else:
self.assertIsInstance(response, dict)
self.assertEqual(response["STATUS"][0]["STATUS"], "S")
async def test_init(self):
miner_api = BOSMinerAPI(ip=self.ip, port=self.port) class TestBTMinerAPI(TestAPIBase):
self.assertEqual(str(miner_api.ip), self.ip) def setUpData(self):
self.assertEqual(miner_api.port, self.port) self.api = BTMinerAPI(self.ip)
self.assertEqual(str(miner_api), "BOSMinerAPI: 192.168.0.1") self.api_str = "BTMiner"
class TestCGMinerAPI(TestAPIBase):
def setUpData(self):
self.api = CGMinerAPI(self.ip)
self.api_str = "CGMiner"
class TestLuxOSAPI(TestAPIBase):
def setUpData(self):
self.api = LUXMinerAPI(self.ip)
self.api_str = "LuxOS"
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -0,0 +1,216 @@
# ------------------------------------------------------------------------------
# Copyright 2022 Upstream Data Inc -
# -
# Licensed under the Apache License, Version 2.0 (the "License"); -
# you may not use this file except in compliance with the License. -
# You may obtain a copy of the License at -
# -
# http://www.apache.org/licenses/LICENSE-2.0 -
# -
# Unless required by applicable law or agreed to in writing, software -
# distributed under the License is distributed on an "AS IS" BASIS, -
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
# See the License for the specific language governing permissions and -
# limitations under the License. -
# ------------------------------------------------------------------------------
import unittest
from pyasic.config import (
FanModeConfig,
MinerConfig,
MiningModeConfig,
PoolConfig,
PowerScalingConfig,
PowerScalingShutdown,
TemperatureConfig,
)
class TestConfig(unittest.TestCase):
def setUp(self):
self.cfg = MinerConfig(
pools=PoolConfig.simple(
[
{
"url": "stratum+tcp://stratum.test.io:3333",
"user": "test.test",
"password": "123",
}
]
),
fan_mode=FanModeConfig.manual(speed=90, minimum_fans=2),
temperature=TemperatureConfig(target=70, danger=120),
mining_mode=MiningModeConfig.power_tuning(power=3000),
power_scaling=PowerScalingConfig.enabled(
power_step=100,
minimum_power=2000,
shutdown_enabled=PowerScalingShutdown.enabled(duration=3),
),
)
def test_dict_deserialize_and_serialize(self):
dict_config = self.cfg.as_dict()
loaded_cfg = MinerConfig.from_dict(dict_config)
self.assertEqual(loaded_cfg, self.cfg)
def test_dict_serialize_and_deserialize(self):
dict_config = {
"pools": {
"groups": [
{
"pools": [
{
"url": "stratum+tcp://stratum.test.io:3333",
"user": "test.test",
"password": "123",
}
],
"quota": 1,
"name": "B6SPD2",
}
]
},
"fan_mode": {"mode": "manual", "speed": 90, "minimum_fans": 2},
"temperature": {"target": 70, "hot": None, "danger": 120},
"mining_mode": {"mode": "power_tuning", "power": 3000},
"power_scaling": {
"mode": "enabled",
"power_step": 100,
"minimum_power": 2000,
"shutdown_enabled": {"mode": "enabled", "duration": 3},
},
}
loaded_config = MinerConfig.from_dict(dict_config)
dumped_config = loaded_config.as_dict()
self.assertEqual(dumped_config, dict_config)
def test_bosminer_deserialize_and_serialize(self):
bosminer_config = self.cfg.as_bosminer()
loaded_config = MinerConfig.from_bosminer(bosminer_config)
self.assertEqual(loaded_config, self.cfg)
def test_bosminer_serialize_and_deserialize(self):
bosminer_config = {
"temp_control": {
"mode": "manual",
"target_temp": 70,
"dangerous_temp": 120,
},
"fan_control": {"min_fans": 2, "speed": 90},
"autotuning": {"enabled": True, "psu_power_limit": 3000},
"group": [
{
"name": "W91Q1L",
"pool": [
{
"url": "statum+tcp://stratum.test.io:3333",
"user": "test.test",
"password": "123",
}
],
"quota": 1,
}
],
"power_scaling": {
"enabled": True,
"power_step": 100,
"min_psu_power_limit": 2000,
"shutdown_enabled": True,
"shutdown_duration": 3,
},
}
loaded_config = MinerConfig.from_bosminer(bosminer_config)
dumped_config = loaded_config.as_bosminer()
self.assertEqual(dumped_config, bosminer_config)
def test_am_modern_serialize(self):
correct_config = {
"bitmain-fan-ctrl": True,
"bitmain-fan-pwn": "90",
"freq-level": "100",
"miner-mode": "0",
"pools": [
{
"url": "stratum+tcp://stratum.test.io:3333",
"user": "test.test",
"pass": "123",
},
{"url": "", "user": "", "pass": ""},
{"url": "", "user": "", "pass": ""},
],
}
self.assertEqual(correct_config, self.cfg.as_am_modern())
def test_am_old_serialize(self):
correct_config = {
"_ant_pool1url": "stratum+tcp://stratum.test.io:3333",
"_ant_pool1user": "test.test",
"_ant_pool1pw": "123",
"_ant_pool2url": "",
"_ant_pool2user": "",
"_ant_pool2pw": "",
"_ant_pool3url": "",
"_ant_pool3user": "",
"_ant_pool3pw": "",
}
self.assertEqual(correct_config, self.cfg.as_am_old())
def test_wm_serialize(self):
correct_config = {
"mode": "power_tuning",
"power_tuning": {"wattage": 3000},
"pools": {
"pool_1": "stratum+tcp://stratum.test.io:3333",
"worker_1": "test.test",
"passwd_1": "123",
"pool_2": "",
"worker_2": "",
"passwd_2": "",
"pool_3": "",
"worker_3": "",
"passwd_3": "",
},
}
self.assertEqual(correct_config, self.cfg.as_wm())
def test_goldshell_serialize(self):
correct_config = {
"pools": [
{
"url": "stratum+tcp://stratum.test.io:3333",
"user": "test.test",
"pass": "123",
}
]
}
self.assertEqual(correct_config, self.cfg.as_goldshell())
def test_avalon_serialize(self):
correct_config = {"pools": "stratum+tcp://stratum.test.io:3333,test.test,123"}
self.assertEqual(correct_config, self.cfg.as_avalon())
def test_inno_serialize(self):
correct_config = {
"Pool1": "stratum+tcp://stratum.test.io:3333",
"UserName1": "test.test",
"Password1": "123",
"Pool2": "",
"UserName2": "",
"Password2": "",
"Pool3": "",
"UserName3": "",
"Password3": "",
}
self.assertEqual(correct_config, self.cfg.as_inno())
if __name__ == "__main__":
unittest.main()

View File

@@ -44,8 +44,8 @@ class NetworkTest(unittest.TestCase):
ipaddress.IPv4Address("192.168.1.60"), ipaddress.IPv4Address("192.168.1.60"),
] ]
self.assertTrue(net_1 == correct_net) self.assertEqual(net_1, correct_net)
self.assertTrue(net_2 == correct_net) self.assertEqual(net_2, correct_net)
def test_net(self): def test_net(self):
net_1_str = "192.168.1.0" net_1_str = "192.168.1.0"
@@ -64,8 +64,8 @@ class NetworkTest(unittest.TestCase):
ipaddress.IPv4Address("192.168.1.6"), ipaddress.IPv4Address("192.168.1.6"),
] ]
self.assertTrue(net_1 == correct_net) self.assertEqual(net_1, correct_net)
self.assertTrue(net_2 == correct_net) self.assertEqual(net_2, correct_net)
def test_net_defaults(self): def test_net_defaults(self):
net = MinerNetwork.from_subnet("192.168.1.1/24") net = MinerNetwork.from_subnet("192.168.1.1/24")