updated btminer API to use cryptography instead of pycryptodome because it's painful to set up, and updated requirements.txt
This commit is contained in:
@@ -56,7 +56,7 @@ class BaseMinerAPI:
|
|||||||
data = None
|
data = None
|
||||||
try:
|
try:
|
||||||
data = await self.send_command(command)
|
data = await self.send_command(command)
|
||||||
except APIError as e:
|
except APIError:
|
||||||
try:
|
try:
|
||||||
data = {}
|
data = {}
|
||||||
# S19 handler, try again
|
# S19 handler, try again
|
||||||
@@ -70,8 +70,6 @@ class BaseMinerAPI:
|
|||||||
if data:
|
if data:
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def send_command(self, command: str, parameters: str or int or bool = None) -> dict:
|
async def send_command(self, command: str, parameters: str or int or bool = None) -> dict:
|
||||||
"""Send an API command to the miner and return the result."""
|
"""Send an API command to the miner and return the result."""
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import re
|
|||||||
import json
|
import json
|
||||||
import hashlib
|
import hashlib
|
||||||
import binascii
|
import binascii
|
||||||
from Crypto.Cipher import AES
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||||
import base64
|
import base64
|
||||||
|
|
||||||
|
|
||||||
@@ -42,13 +42,17 @@ def _add_to_16(s: str) -> bytes:
|
|||||||
|
|
||||||
|
|
||||||
def parse_btminer_priviledge_data(token_data, data):
|
def parse_btminer_priviledge_data(token_data, data):
|
||||||
|
# get the encoded data from the dict
|
||||||
enc_data = data['enc']
|
enc_data = data['enc']
|
||||||
|
# get the aes key from the token data
|
||||||
aeskey = hashlib.sha256(token_data['host_passwd_md5'].encode()).hexdigest()
|
aeskey = hashlib.sha256(token_data['host_passwd_md5'].encode()).hexdigest()
|
||||||
|
# unhexlify the aes key
|
||||||
aeskey = binascii.unhexlify(aeskey.encode())
|
aeskey = binascii.unhexlify(aeskey.encode())
|
||||||
aes = AES.new(aeskey, AES.MODE_ECB)
|
# create the required decryptor
|
||||||
ret_msg = json.loads(str(
|
aes = Cipher(algorithms.AES(aeskey), modes.ECB())
|
||||||
aes.decrypt(base64.decodebytes(bytes(
|
decryptor = aes.decryptor()
|
||||||
enc_data, encoding='utf8'))).rstrip(b'\0').decode("utf8")))
|
# decode the message with the decryptor
|
||||||
|
ret_msg = json.loads(decryptor.update(base64.decodebytes(bytes(enc_data, encoding='utf8'))).rstrip(b'\0').decode("utf8"))
|
||||||
return ret_msg
|
return ret_msg
|
||||||
|
|
||||||
|
|
||||||
@@ -60,13 +64,12 @@ def create_privileged_cmd(token_data: dict, command: dict) -> bytes:
|
|||||||
# unhexlify the encoded host_passwd
|
# unhexlify the encoded host_passwd
|
||||||
aeskey = binascii.unhexlify(aeskey.encode())
|
aeskey = binascii.unhexlify(aeskey.encode())
|
||||||
# create a new AES key
|
# create a new AES key
|
||||||
aes = AES.new(aeskey, AES.MODE_ECB)
|
aes = Cipher(algorithms.AES(aeskey), modes.ECB())
|
||||||
|
encryptor = aes.encryptor()
|
||||||
# dump the command to json
|
# dump the command to json
|
||||||
api_json_str = json.dumps(command)
|
api_json_str = json.dumps(command)
|
||||||
# encode the json command with the aes key
|
# encode the json command with the aes key
|
||||||
api_json_str_enc = str(base64.encodebytes(
|
api_json_str_enc = base64.encodebytes(encryptor.update(_add_to_16(api_json_str))).decode("utf-8").replace("\n", "")
|
||||||
aes.encrypt(_add_to_16(api_json_str))),
|
|
||||||
encoding='utf8').replace('\n', '')
|
|
||||||
# label the data as being encoded
|
# label the data as being encoded
|
||||||
data_enc = {'enc': 1, 'data': api_json_str_enc}
|
data_enc = {'enc': 1, 'data': api_json_str_enc}
|
||||||
# dump the labeled data to json
|
# dump the labeled data to json
|
||||||
|
|||||||
BIN
requirements.txt
BIN
requirements.txt
Binary file not shown.
Reference in New Issue
Block a user