added btminer.py file for the whatsminer API.

This commit is contained in:
UpstreamData
2021-12-20 09:57:04 -07:00
parent d4306586cf
commit 2fe8df0c36
3 changed files with 351 additions and 2 deletions

View File

@@ -126,6 +126,9 @@ class BaseMinerAPI:
if data[key][0]["STATUS"][0]["STATUS"] not in ["S", "I"]:
# this is an error
return False
elif "id" not in data.keys():
if data["STATUS"] not in ["S", "I"]:
return False
else:
# make sure the command succeeded
if data["STATUS"][0]["STATUS"] not in ("S", "I"):
@@ -145,11 +148,16 @@ class BaseMinerAPI:
else:
# no null byte
str_data = data.decode('utf-8')
# fix an error with a btminer return having an extra comma that breaks json.loads()
str_data = str_data.replace(",}", "}")
# fix an error with a btminer return having a newline that breaks json.loads()
str_data = str_data.replace("\n", "")
# fix an error with a bmminer return not having a specific comma that breaks json.loads()
str_data = str_data.replace("}{", "},{")
# parse the json
parsed_data = json.loads(str_data)
# handle bad json
except json.decoder.JSONDecodeError:
except json.decoder.JSONDecodeError as e:
print(e)
raise APIError(f"Decode Error: {data}")
return parsed_data

327
API/btminer.py Normal file
View File

@@ -0,0 +1,327 @@
from API import BaseMinerAPI, APIError
from passlib.hash import md5_crypt
import asyncio
import re
import json
import hashlib
import binascii
from Crypto.Cipher import AES
import base64
### IMPORTANT ###
# you need to change the password of the miners using
# the whatsminer tool, then you can set them back to
# admin with this tool, but they must be changed to
# something else and set back to admin with this or
# the privileged API will not work using admin as
# the password.
def _crypt(word: str, salt: str) -> str:
# compile a standard format for the salt
standard_salt = re.compile('\s*\$(\d+)\$([\w\./]*)\$')
# check if the salt matches
match = standard_salt.match(salt)
# if the matching fails, the salt is incorrect
if not match:
raise ValueError("salt format is not correct")
# save the matched salt in a new variable
new_salt = match.group(2)
# encrypt the word with the salt using md5
result = md5_crypt.hash(word, salt=new_salt)
return result
def _add_to_16(s: str) -> bytes:
"""Add null bytes to a string until the length is 16"""
while len(s) % 16 != 0:
s += '\0'
return str.encode(s) # return bytes
def parse_btminer_priviledge_data(token_data, data):
enc_data = data['enc']
aeskey = hashlib.sha256(token_data['host_passwd_md5'].encode()).hexdigest()
aeskey = binascii.unhexlify(aeskey.encode())
aes = AES.new(aeskey, AES.MODE_ECB)
ret_msg = json.loads(str(
aes.decrypt(base64.decodebytes(bytes(
enc_data, encoding='utf8'))).rstrip(b'\0').decode("utf8")))
return ret_msg
def create_privileged_cmd(token_data: dict, command: dict) -> bytes:
# add token to command
command['token'] = token_data['host_sign']
# encode host_passwd data and get hexdigest
aeskey = hashlib.sha256(token_data['host_passwd_md5'].encode()).hexdigest()
# unhexlify the encoded host_passwd
aeskey = binascii.unhexlify(aeskey.encode())
# create a new AES key
aes = AES.new(aeskey, AES.MODE_ECB)
# dump the command to json
api_json_str = json.dumps(command)
# encode the json command with the aes key
api_json_str_enc = str(base64.encodebytes(
aes.encrypt(_add_to_16(api_json_str))),
encoding='utf8').replace('\n', '')
# label the data as being encoded
data_enc = {'enc': 1, 'data': api_json_str_enc}
# dump the labeled data to json
api_packet_str = json.dumps(data_enc)
return api_packet_str.encode('utf-8')
class BTMinerAPI(BaseMinerAPI):
def __init__(self, ip, port=4028, pwd: str = "admin"):
super().__init__(ip, port)
self.admin_pwd = pwd
self.current_token = None
async def send_command(self, command: str | bytes, **kwargs) -> dict:
"""Send an API command to the miner and return the result."""
if isinstance(command, str):
command = json.dumps({"command": command}).encode("utf-8")
try:
# get reader and writer streams
reader, writer = await asyncio.open_connection(str(self.ip), self.port)
# handle OSError 121
except OSError as e:
if e.winerror == "121":
print("Semaphore Timeout has Expired.")
return {}
# send the command
writer.write(command)
await writer.drain()
# instantiate data
data = b""
# loop to receive all the data
try:
while True:
d = await reader.read(4096)
if not d:
break
data += d
except Exception as e:
print(e)
data = self.load_api_data(data)
# close the connection
writer.close()
await writer.wait_closed()
if 'enc' in data.keys():
try:
data = parse_btminer_priviledge_data(self.current_token, data)
except Exception as e:
print(e)
if not self.validate_command_output(data):
raise APIError(data["Msg"])
return data
async def get_token(self):
data = await self.send_command("get_token")
pwd = _crypt(self.admin_pwd, "$1$" + data["Msg"]["salt"] + '$')
pwd = pwd.split('$')
host_passwd_md5 = pwd[3]
tmp = _crypt(pwd[3] + data["Msg"]["time"], "$1$" + data["Msg"]["newsalt"] + '$')
tmp = tmp.split('$')
host_sign = tmp[3]
self.current_token = {'host_sign': host_sign, 'host_passwd_md5': host_passwd_md5}
return {'host_sign': host_sign, 'host_passwd_md5': host_passwd_md5}
#### privileged COMMANDS ####
# Please read the top of this file to learn
# how to configure the whatsminer API to
# use these commands.
async def update_pools(self,
pool_1: str, worker_1: str, passwd_1: str,
pool_2: str = None, worker_2: str = None, passwd_2: str = None,
pool_3: str = None, worker_3: str = None, passwd_3: str = None):
token_data = await self.get_token()
if pool_2 and pool_3:
command = {
"cmd": "update_pools",
"pool1": pool_1, "worker1": worker_1, "passwd1": passwd_1,
"pool2": pool_2, "worker2": worker_2, "passwd2": passwd_2,
"pool3": pool_3, "worker3": worker_3, "passwd3": passwd_3,
}
elif pool_2:
command = {
"cmd": "update_pools",
"pool1": pool_1, "worker1": worker_1, "passwd1": passwd_1,
"pool2": pool_2, "worker2": worker_2, "passwd2": passwd_2
}
else:
command = {
"cmd": "update_pools",
"pool1": pool_1, "worker1": worker_1, "passwd1": passwd_1,
}
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def restart_btminer(self):
command = {"cmd": "restart_btminer"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def power_off(self, respbefore: bool = True):
if respbefore:
command = {"cmd": "power_off", "respbefore": "true"}
else:
command = {"cmd": "power_off", "respbefore": "false"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def power_on(self):
command = {"cmd": "power_on"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def reset_led(self):
command = {"cmd": "set_led", "param": "auto"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def set_led(self, color: str = "red", period: int = 2000, duration: int = 1000, start: int = 0):
command = {"cmd": "set_led", "color": color, "period": period, "duration": duration, "start": start}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def set_low_power(self):
command = {"cmd": "set_low_power"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def update_firmware(self):
# to be determined if this will be added later
# requires a file stream in bytes
return NotImplementedError
async def reboot(self):
command = {"cmd": "reboot"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def factory_reset(self):
command = {"cmd": "factory_reset"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def update_pwd(self, old_pwd: str, new_pwd: str):
# check if password length is greater than 8 bytes
if len(new_pwd.encode('utf-8')) > 8:
return APIError(
f"New password too long, the max length is 8. Password size: {len(new_pwd.encode('utf-8'))}")
command = {"cmd": "update_pwd", "old": old_pwd, "new": new_pwd}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def set_target_freq(self, percent: int):
if not -10 < percent < 100:
return APIError(f"Frequency % is outside of the allowed range. Please set a % between -10 and 100")
command = {"cmd": "set_target_freq", "percent": str(percent)}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def enable_fast_boot(self):
command = {"cmd": "enable_btminer_fast_boot"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def disable_fast_boot(self):
command = {"cmd": "disable_btminer_fast_boot"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def enable_web_pools(self):
command = {"cmd": "enable_web_pools"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def disable_web_pools(self):
command = {"cmd": "disable_web_pools"}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def set_hostname(self, hostname: str):
command = {"cmd": "set_hostname", "hostname": hostname}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def set_power_pct(self, percent: int):
if not 0 < percent < 100:
return APIError(f"Power PCT % is outside of the allowed range. Please set a % between 0 and 100")
command = {"cmd": "set_power_pct", "percent": str(percent)}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
async def pre_power_on(self, complete: bool, msg: str):
if not msg == "wait for adjust temp" or "adjust complete" or "adjust continue":
return APIError(
'Message is incorrect, please choose one of '
'["wait for adjust temp", "adjust complete", "adjust continue"]'
)
if complete:
complete = "true"
else:
complete = "false"
command = {"cmd": "pre_power_on", "complete": complete, "msg": msg}
token_data = await self.get_token()
enc_command = create_privileged_cmd(token_data, command)
return await self.send_command(enc_command)
#### END privileged COMMANDS ####
async def summary(self):
return await self.send_command("summary")
async def pools(self):
return await self.send_command("pools")
async def devs(self):
return await self.send_command("devs")
async def edevs(self):
return await self.send_command("edevs")
async def devdetails(self):
return await self.send_command("devdetails")
async def get_psu(self):
return await self.send_command("get_psu")
async def version(self):
return await self.send_command("get_version")
async def status(self):
return await self.send_command("status")
async def get_miner_info(self):
return await self.send_command("get_miner_info")