added basic network functionality

This commit is contained in:
UpstreamData
2021-10-07 14:33:53 -06:00
parent e3bc84455d
commit 5f835021d1
4 changed files with 85 additions and 17 deletions

View File

@@ -1,5 +1,6 @@
import asyncio
import json
import ipaddress
class APIError(Exception):
@@ -17,17 +18,17 @@ class APIError(Exception):
class BaseMinerAPI:
def __init__(self, ip, port):
def __init__(self, ip: str, port: int):
self.port = port
self.ip = ip
self.ip = ipaddress.ip_address(ip)
async def multicommand(self, *commands: str) -> dict:
command = "+".join(commands)
return await self.send_command(command)
async def send_command(self, command, parameters: dict = None) -> dict:
async def send_command(self, command: str, parameters: str = None) -> dict:
# get reader and writer streams
reader, writer = await asyncio.open_connection(self.ip, self.port)
reader, writer = await asyncio.open_connection(str(self.ip), self.port)
# create the command
cmd = {"command": command}

View File

@@ -48,19 +48,29 @@ class BOSMinerAPI(BaseMinerAPI):
return await self.send_command("lcd")
async def switchpool(self, n: int) -> dict:
return await self.send_command("switchpool", parameters=n)
# BOS has not implemented this yet, they will in the future
return NotImplementedError
# return await self.send_command("switchpool", parameters=n)
async def enablepool(self, n: int) -> dict:
return await self.send_command("enablepool", parameters=n)
# BOS has not implemented this yet, they will in the future
return NotImplementedError
# return await self.send_command("enablepool", parameters=n)
async def disablepool(self, n: int) -> dict:
return await self.send_command("disablepool", parameters=n)
# BOS has not implemented this yet, they will in the future
return NotImplementedError
# return await self.send_command("disablepool", parameters=n)
async def addpool(self, url: str, username: str, password: str) -> dict:
return await self.send_command("addpool", parameters=f"{url}, {username}, {password}")
# BOS has not implemented this yet, they will in the future
return NotImplementedError
# return await self.send_command("addpool", parameters=f"{url}, {username}, {password}")
async def removepool(self, n: int) -> dict:
return await self.send_command("removepool", parameters=n)
# BOS has not implemented this yet, they will in the future
return NotImplementedError
# return await self.send_command("removepool", parameters=n)
async def fans(self) -> dict:
return await self.send_command("fans")
@@ -78,4 +88,4 @@ class BOSMinerAPI(BaseMinerAPI):
return await self.send_command("pause")
async def resume(self) -> dict:
return await self.send_command("resume")
return await self.send_command("resume")