remove board util

This commit is contained in:
UpstreamData
2022-05-13 15:35:11 -06:00
parent 9c2de26182
commit 2bf059df01
12 changed files with 21 additions and 65 deletions

View File

@@ -0,0 +1,29 @@
from tools.bad_board_util_old.ui import ui
import asyncio
import sys
import logging
from logger import logger
logger.info("Initializing logger for Board Util.")
# Fix bug with some whatsminers and asyncio because of a socket not being shut down:
if (
sys.version_info[0] == 3
and sys.version_info[1] >= 8
and sys.platform.startswith("win")
):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
def main():
logging.info("Starting Board Util.")
loop = asyncio.new_event_loop()
loop.run_until_complete(ui())
logging.info("Closing Board Util.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,29 @@
from tools.bad_board_util_old.layout import window
def disable_buttons(func):
button_list = [
"scan",
"import_iplist",
"export_iplist",
"select_all_ips",
"refresh_data",
"open_in_web",
"save_report_button",
"light",
]
# handle the inner function that the decorator is wrapping
async def inner(*args, **kwargs):
# disable the buttons
for button in button_list:
window[button].Update(disabled=True)
# call the original wrapped function
await func(*args, **kwargs)
# re-enable the buttons after the wrapped function completes
for button in button_list:
window[button].Update(disabled=False)
return inner

View File

@@ -0,0 +1,104 @@
import ipaddress
import os
import re
import xlsxwriter
import aiofiles
from tools.bad_board_util_old.func.ui import update_ui_with_data
from tools.bad_board_util_old.layout import window
from tools.bad_board_util_old.func.decorators import disable_buttons
from miners.miner_factory import MinerFactory
@disable_buttons
async def save_report(file_location):
data = {}
workbook = xlsxwriter.Workbook(file_location)
sheet = workbook.add_worksheet()
for line in window["ip_table"].Values:
data[line[0]] = {
"Model": line[1],
"Total Chips": line[2],
"Left Chips": line[3],
"Center Chips": line[5],
"Right Chips": line[7],
"Nominal": 1,
}
async for miner in MinerFactory().get_miner_generator([key for key in data.keys()]):
if miner:
data[miner.ip]["Nominal"] = miner.nominal
list_data = []
for ip in data.keys():
new_data = data[ip]
new_data["IP"] = ip
list_data.append(new_data)
data = sorted(data, reverse=True, key=lambda x: x["Total Chips"])
headers = [
"IP",
"Miner Model",
"Total Chip Count",
"Left Board Chips",
"Center Board Chips",
"Right Board Chips",
]
print(data)
row = 0
col = 0
for item in headers:
sheet.write(row, col, item)
col += 1
row = 1
for line in data:
col = 0
for point in line:
sheet.write(row, col, point)
col += 1
row += 1
workbook.close()
async def import_iplist(file_location):
await update_ui_with_data("status", "Importing")
if not os.path.exists(file_location):
return
else:
ip_list = []
async with aiofiles.open(file_location, mode="r") as file:
async for line in file:
ips = [
x.group()
for x in re.finditer(
"^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)",
line,
)
]
for ip in ips:
if ip not in ip_list:
ip_list.append(ipaddress.ip_address(ip))
ip_list.sort()
window["ip_table"].update([[str(ip), "", "", "", ""] for ip in ip_list])
await update_ui_with_data("ip_count", str(len(ip_list)))
await update_ui_with_data("status", "")
async def export_iplist(file_location, ip_list_selected):
await update_ui_with_data("status", "Exporting")
if not os.path.exists(file_location):
return
else:
if ip_list_selected is not None and not ip_list_selected == []:
async with aiofiles.open(file_location, mode="w") as file:
for item in ip_list_selected:
await file.write(str(item) + "\n")
else:
async with aiofiles.open(file_location, mode="w") as file:
for item in window["ip_table"].Values:
await file.write(str(item[0]) + "\n")
await update_ui_with_data("status", "")

View File

@@ -0,0 +1,308 @@
import asyncio
import ipaddress
import warnings
from tools.bad_board_util_old.func.ui import (
update_ui_with_data,
update_prog_bar,
set_progress_bar_len,
)
from tools.bad_board_util_old.layout import window
from miners.miner_factory import MinerFactory
from tools.bad_board_util_old.func.decorators import disable_buttons
@disable_buttons
async def miner_light(ips: list):
await asyncio.gather(*[flip_light(ip) for ip in ips])
async def flip_light(ip):
ip_list = window["ip_table"].Widget
miner = await MinerFactory().get_miner(ip)
index = [item[0] for item in window["ip_table"].Values].index(ip)
index_tags = ip_list.item(index + 1)["tags"]
if "light" not in index_tags and "light+bad" not in index_tags:
tag = "light"
if "bad" in index_tags:
index_tags.remove("bad")
tag = "light+bad"
index_tags.append(tag)
ip_list.item(index + 1, tags=index_tags)
await miner.fault_light_on()
else:
if "light+bad" in index_tags:
index_tags.remove("light+bad")
index_tags.append("bad")
if "light" in index_tags:
index_tags.remove("light")
ip_list.item(index + 1, tags=index_tags)
await miner.fault_light_off()
@disable_buttons
async def scan_network(network):
await update_ui_with_data("status", "Scanning")
await update_ui_with_data("ip_count", "")
window["ip_table"].update([])
network_size = len(network)
miner_generator = network.scan_network_generator()
await set_progress_bar_len(2 * network_size)
progress_bar_len = 0
asyncio.create_task(update_prog_bar(progress_bar_len))
miners = []
async for miner in miner_generator:
if miner:
miners.append(miner)
progress_bar_len += 1
asyncio.create_task(update_prog_bar(progress_bar_len))
progress_bar_len += network_size - len(miners)
asyncio.create_task(update_prog_bar(progress_bar_len))
get_miner_genenerator = MinerFactory().get_miner_generator(miners)
all_miners = []
async for found_miner in get_miner_genenerator:
all_miners.append(found_miner)
all_miners.sort(key=lambda x: x.ip)
window["ip_table"].update([[str(miner.ip)] for miner in all_miners])
progress_bar_len += 1
asyncio.create_task(update_prog_bar(progress_bar_len))
await update_ui_with_data("ip_count", str(len(all_miners)))
await update_ui_with_data("status", "")
@disable_buttons
async def refresh_data(ip_list: list):
await update_ui_with_data("status", "Getting Data")
ips = [ipaddress.ip_address(ip) for ip in ip_list]
if len(ips) == 0:
ips = [
ipaddress.ip_address(ip)
for ip in [item[0] for item in window["ip_table"].Values]
]
await set_progress_bar_len(len(ips))
progress_bar_len = 0
asyncio.create_task(update_prog_bar(progress_bar_len))
reset_table_values = []
for item in window["ip_table"].Values:
if item[0] in ip_list:
reset_table_values.append([item[0]])
else:
reset_table_values.append(item)
window["ip_table"].update(reset_table_values)
progress_bar_len = 0
data_gen = asyncio.as_completed([get_formatted_data(miner) for miner in ips])
ip_table_data = window["ip_table"].Values
ordered_all_ips = [item[0] for item in ip_table_data]
row_colors = []
for all_data in data_gen:
data_point = await all_data
if data_point["IP"] in ordered_all_ips:
ip_table_index = ordered_all_ips.index(data_point["IP"])
board_left = ""
board_center = ""
board_right = ""
if data_point["data"]:
if 0 in data_point["data"].keys():
board_left = " ".join(
[chain["chip_status"] for chain in data_point["data"][0]]
).replace("o", "")
if 1 in data_point["data"].keys():
board_center = " ".join(
[chain["chip_status"] for chain in data_point["data"][1]]
).replace("o", "")
if 2 in data_point["data"].keys():
board_right = " ".join(
[chain["chip_status"] for chain in data_point["data"][2]]
).replace("o", "")
data = [
data_point["IP"],
data_point["model"],
len(board_left),
board_left,
len(board_center),
board_center,
len(board_right),
board_right,
]
ip_table_data[ip_table_index] = data
window["ip_table"].update(ip_table_data)
progress_bar_len += 1
asyncio.create_task(update_prog_bar(progress_bar_len))
await update_ui_with_data("status", "")
@disable_buttons
async def scan_and_get_data(network):
# update status and reset the table
await update_ui_with_data("status", "Scanning")
await update_ui_with_data("ip_count", "")
await update_ui_with_data("ip_table", [])
# set progress bar length to network size
network_size = len(network)
await set_progress_bar_len(3 * network_size)
progress_bar_len = 0
miners = []
# scan the network for miners using a generator
async for miner in network.scan_network_generator():
# the generator will either return None or an IP address
if miner:
miners.append(miner)
# add to the progress bar length after scanning an address
progress_bar_len += 1
asyncio.create_task(update_prog_bar(progress_bar_len))
# add progress for the miners that we aren't going to identify
progress_bar_len += network_size - len(miners)
asyncio.create_task(update_prog_bar(progress_bar_len))
all_miners = []
# identify different miner instances using the miner factory generator
async for found_miner in MinerFactory().get_miner_generator(miners):
# miner factory generator will always return a miner
all_miners.append(found_miner)
# sort the list of miners by IP address
all_miners.sort(key=lambda x: x.ip)
# add the new miner to the table
window["ip_table"].update([[str(miner.ip)] for miner in all_miners])
# update progress bar
progress_bar_len += 1
asyncio.create_task(update_prog_bar(progress_bar_len))
# update the count of found miners
await update_ui_with_data("ip_count", str(len(all_miners)))
# update progress bar for miners we wont get data for
progress_bar_len += network_size - len(miners)
asyncio.create_task(update_prog_bar(progress_bar_len))
# get the list of IP addresses from the table
ip_table_data = window["ip_table"].Values
ordered_all_ips = [item[0] for item in ip_table_data]
await update_ui_with_data("status", "Getting Data")
row_colors = []
# create an in place generator for getting data
for all_data in asyncio.as_completed(
[get_formatted_data(miner) for miner in miners]
):
# wait for a generator item to return
data_point = await all_data
# make sure the IP is one we have
# this will likely never fail, but a good failsafe
if data_point["IP"] in ordered_all_ips:
# get the index of the IP in the table
ip_table_index = ordered_all_ips.index(data_point["IP"])
board_left = ""
board_center = ""
board_right = ""
# make sure we have data, some miners don't allow getting board data
if data_point["data"]:
# check if the 0th board (L board) is in the data
if 0 in data_point["data"].keys():
board_left = " ".join(
[chain["chip_status"] for chain in data_point["data"][0]]
).replace("o", "")
else:
# if the board isn't in data, highlight it red
row_colors.append((ip_table_index, "bad"))
# check if the 1st board (C board) is in the data
if 1 in data_point["data"].keys():
board_center = " ".join(
[chain["chip_status"] for chain in data_point["data"][1]]
).replace("o", "")
else:
# if the board isn't in data, highlight it red
row_colors.append((ip_table_index, "bad"))
# check if the 2nd board (R board) is in the data
if 2 in data_point["data"].keys():
board_right = " ".join(
[chain["chip_status"] for chain in data_point["data"][2]]
).replace("o", "")
else:
# if the board isn't in data, highlight it red
row_colors.append((ip_table_index, "bad"))
# check if the miner has all nominal chips
if False in [
# True/False if the miner is nominal
chain["nominal"]
# for each board in the miner
for board in [
data_point["data"][key] for key in data_point["data"].keys()
]
# for each chain in each board in the miner
for chain in board
]:
# if the miner doesn't have all chips, highlight it red
row_colors.append((ip_table_index, "bad"))
else:
# the row is bad if we have no data
row_colors.append((ip_table_index, "bad"))
# split the chip data into thirds
board_left_chips = "\n".join(split_chips(board_left, 3))
board_center_chips = "\n".join(split_chips(board_center, 3))
board_right_chips = "\n".join(split_chips(board_right, 3))
# create data for the table
data = [
data_point["IP"],
data_point["model"],
(len(board_left) + len(board_center) + len(board_right)),
len(board_left),
board_left_chips,
len(board_center),
board_center_chips,
len(board_right),
board_right_chips,
]
# put the data at the index of the IP address
ip_table_data[ip_table_index] = data
window["ip_table"].update(ip_table_data)
# configure "bad" tag to highlight red
table = window["ip_table"].Widget
# set tags on the row if they have been set
for row in row_colors:
table.item(row[0] + 1, tags=row[1])
# add to the progress bar
progress_bar_len += 1
asyncio.create_task(update_prog_bar(progress_bar_len))
# reset status
await update_ui_with_data("status", "")
def split_chips(string, number_of_splits):
k, m = divmod(len(string), number_of_splits)
return (
string[i * k + min(i, m) : (i + 1) * k + min(i + 1, m)]
for i in range(number_of_splits)
)
async def get_formatted_data(ip: ipaddress.ip_address):
miner = await MinerFactory().get_miner(ip)
model = await miner.get_model()
warnings.filterwarnings("ignore")
board_data = await miner.get_board_info()
data = {"IP": str(ip), "model": str(model), "data": board_data}
return data

View File

@@ -0,0 +1,392 @@
import datetime
from base64 import b64decode
from io import BytesIO
from reportlab.lib.pagesizes import letter, inch
from reportlab.lib.utils import ImageReader
from reportlab.lib.styles import ParagraphStyle, TA_CENTER
from reportlab.platypus import (
SimpleDocTemplate,
KeepInFrame,
Table,
Image,
Paragraph,
TableStyle,
PageBreak,
Spacer,
)
from reportlab.lib import colors
import ipaddress
import numpy as np
import matplotlib.dates
import matplotlib.pyplot as plt
from svglib.svglib import svg2rlg
from matplotlib import cm
from matplotlib.ticker import FormatStrFormatter
from miners.miner_factory import MinerFactory
from tools.bad_board_util_old.func.decorators import disable_buttons
from tools.bad_board_util_old.img import IMAGE_SELECTION_MATRIX, LOGO
from tools.bad_board_util_old.layout import window
IP_STYLE = ParagraphStyle(
"IP Style",
alignment=TA_CENTER,
fontSize=7,
fontName="Helvetica-Bold",
)
TITLE_STYLE = ParagraphStyle(
"Title",
alignment=TA_CENTER,
fontSize=20,
spaceAfter=40,
fontName="Helvetica-Bold",
)
def add_first_page_number(canvas, doc):
canvas.saveState()
canvas.drawString(letter[0] - 60, 20, "Page " + str(doc.page))
canvas.restoreState()
def add_page_header(canvas, doc):
canvas.saveState()
canvas.drawCentredString(
(letter[0] / 16) * 14,
letter[1] - 57,
datetime.datetime.now().strftime("%Y-%b-%d"),
)
img_dec = b64decode(LOGO)
img = BytesIO(img_dec)
img.seek(0)
canvas.drawImage(
ImageReader(img),
30,
letter[1] - 65,
150,
35,
)
canvas.drawString(letter[0] - 60, 20, "Page " + str(doc.page))
canvas.restoreState()
@disable_buttons
async def save_report(file_location):
p1_logo, p1_title = create_first_page()
data = {}
for line in window["ip_table"].Values:
data[line[0]] = {
"Model": line[1],
"Total Chips": line[2],
"Left Chips": line[3],
"Center Chips": line[5],
"Right Chips": line[7],
"Nominal": 1,
}
async for miner in MinerFactory().get_miner_generator([key for key in data.keys()]):
if miner:
data[str(miner.ip)]["Nominal"] = miner.nominal_chips
list_data = []
for ip in data.keys():
new_data = data[ip]
new_data["IP"] = ip
list_data.append(new_data)
list_data = sorted(
list_data, reverse=False, key=lambda x: ipaddress.ip_address(x["IP"])
)
image_selection_data = {}
for miner in list_data:
miner_bad_boards = ""
if miner["Left Chips"] < miner["Nominal"]:
miner_bad_boards += "l"
if miner["Center Chips"] < miner["Nominal"]:
miner_bad_boards += "c"
if miner["Right Chips"] < miner["Nominal"]:
miner_bad_boards += "r"
image_selection_data[miner["IP"]] = miner_bad_boards
doc = SimpleDocTemplate(
file_location,
pagesize=letter,
topMargin=1 * inch,
leftMargin=1 * inch,
rightMargin=1 * inch,
bottomMargin=1 * inch,
title=f"Board Report {datetime.datetime.now().strftime('%Y/%b/%d')}",
)
pie_chart, board_table = create_boards_pie_chart(image_selection_data)
table_data = get_table_data(image_selection_data)
miner_img_table = Table(
table_data,
colWidths=0.8 * inch,
# repeatRows=1,
# rowHeights=[4 * inch],
)
miner_img_table.setStyle(
TableStyle(
[
("SPAN", (0, 0), (-1, 0)),
("LEFTPADDING", (0, 0), (-1, -1), 0),
("RIGHTPADDING", (0, 0), (-1, -1), 0),
("BOTTOMPADDING", (0, 1), (-1, -1), 0),
("TOPPADDING", (0, 1), (-1, -1), 0),
("BOTTOMPADDING", (0, 0), (-1, 0), 20),
("TOPPADDING", (0, 0), (-1, 0), 20),
]
)
)
elements = []
elements.append(p1_logo)
elements.append(p1_title)
elements.append(PageBreak())
elements.append(pie_chart)
elements.append(Spacer(0, 60))
elements.append(board_table)
elements.append(PageBreak())
elements.append(miner_img_table)
elements.append(PageBreak())
elements.append(
Paragraph(
"Board Data",
style=TITLE_STYLE,
)
)
elements.append(create_data_table(list_data))
elements.append(PageBreak())
doc.build(
elements,
onFirstPage=add_first_page_number,
onLaterPages=add_page_header,
)
def create_boards_pie_chart(data):
labels = ["All Working", "1 Bad Board", "2 Bad Boards", "3 Bad Boards"]
num_bad_boards = [0, 0, 0, 0]
for item in data.keys():
num_bad_boards[len(data[item])] += 1
cmap = plt.get_cmap("Blues")
cs = cmap(np.linspace(0.2, 0.8, num=len(num_bad_boards)))
fig1, ax = plt.subplots()
ax.pie(
num_bad_boards,
labels=labels,
autopct="%1.2f%%",
shadow=True,
startangle=180,
colors=cs,
pctdistance=0.8,
)
ax.axis("equal")
ax.set_title("Broken Boards", fontsize=24, pad=20)
imgdata = BytesIO()
fig1.savefig(imgdata, format="svg")
imgdata.seek(0) # rewind the data
drawing = svg2rlg(imgdata)
imgdata.close()
plt.close("all")
pie_chart = KeepInFrame(375, 375, [Image(drawing)], hAlign="CENTER")
table_data = [labels, num_bad_boards]
t = Table(table_data)
table_style = TableStyle(
[
# ("FONTSIZE", (0, 0), (-1, -1), 13),
# line for below titles
("LINEBELOW", (0, 0), (-1, 0), 2, colors.black),
("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
# line for above totals
("LINEABOVE", (0, -1), (-1, -1), 2, colors.black),
# line for beside unit #
("LINEAFTER", (0, 0), (0, -1), 2, colors.black),
# gridlines and outline of table
("INNERGRID", (0, 0), (-1, -1), 0.25, colors.black),
("BOX", (0, 0), (-1, -1), 2, colors.black),
("LEFTPADDING", (0, 0), (-1, -1), 3),
("RIGHTPADDING", (0, 0), (-1, -1), 3),
("BOTTOMPADDING", (0, 0), (-1, -1), 3),
("TOPPADDING", (0, 0), (-1, -1), 3),
]
)
t.setStyle(table_style)
# zebra stripes on table
for each in range(len(table_data)):
if each % 2 == 0:
bg_color = colors.whitesmoke
else:
bg_color = colors.lightgrey
t.setStyle(TableStyle([("BACKGROUND", (0, each), (-1, each), bg_color)]))
return pie_chart, t
def create_first_page():
title_style = ParagraphStyle(
"Title",
alignment=TA_CENTER,
fontSize=50,
spaceAfter=40,
spaceBefore=150,
fontName="Helvetica-Bold",
)
img_dec = b64decode(LOGO)
img = BytesIO(img_dec)
img.seek(0)
logo = KeepInFrame(450, 105, [Image(img)])
title = Paragraph("Board Report", style=title_style)
return logo, title
def create_data_table(data):
left_bad_boards = 0
right_bad_boards = 0
center_bad_boards = 0
table_data = []
for miner in data:
miner_bad_boards = 0
if miner["Left Chips"] < miner["Nominal"]:
miner_bad_boards += 1
left_bad_boards += 1
if miner["Center Chips"] < miner["Nominal"]:
miner_bad_boards += 1
right_bad_boards += 1
if miner["Right Chips"] < miner["Nominal"]:
miner_bad_boards += 1
center_bad_boards += 1
table_data.append(
[
miner["IP"],
miner["Total Chips"],
miner["Left Chips"],
miner["Center Chips"],
miner["Right Chips"],
miner_bad_boards,
]
)
table_data.append(
[
"Total",
sum([miner[1] for miner in table_data]),
sum([miner[2] for miner in table_data]),
sum([miner[3] for miner in table_data]),
sum([miner[4] for miner in table_data]),
sum([miner[5] for miner in table_data]),
]
)
table_data[:0] = (
[
"IP",
"Total Chips",
"Left Board Chips",
"Center Board Chips",
"Right Board Chips",
"Failed Boards",
],
)
# create the table
t = Table(table_data, repeatRows=1)
# generate a basic table style
table_style = TableStyle(
[
("FONTSIZE", (0, 0), (-1, -1), 8),
# line for below titles
("LINEBELOW", (0, 0), (-1, 0), 2, colors.black),
("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
# line for above totals
("LINEABOVE", (0, -1), (-1, -1), 2, colors.black),
("FONTNAME", (0, -1), (-1, -1), "Helvetica-Bold"),
# line for beside unit #
("LINEAFTER", (0, 0), (0, -1), 2, colors.black),
("FONTNAME", (0, 0), (0, -1), "Helvetica-Bold"),
# gridlines and outline of table
("INNERGRID", (0, 0), (-1, -1), 0.25, colors.black),
("BOX", (0, 0), (-1, -1), 2, colors.black),
]
)
for (
row,
values,
) in enumerate(table_data):
if not row == 0 and not row == (len(table_data) - 1):
failed_boards = values[5]
if not failed_boards == 0:
table_style.add("TEXTCOLOR", (5, row), (5, row), colors.red)
# set the styles to the table
t.setStyle(table_style)
# zebra stripes on table
for each in range(len(table_data)):
if each % 2 == 0:
bg_color = colors.whitesmoke
else:
bg_color = colors.lightgrey
t.setStyle(TableStyle([("BACKGROUND", (0, each), (-1, each), bg_color)]))
return t
def get_table_data(data):
table_elems = [[Paragraph("Hashboard Visual Representation", style=TITLE_STYLE)]]
table_row = []
table_style = TableStyle(
[
("LEFTPADDING", (0, 0), (-1, -1), 0),
("RIGHTPADDING", (0, 0), (-1, -1), 0),
("BOTTOMPADDING", (0, 0), (-1, -1), 0),
("BOX", (0, 0), (-1, -1), 2, colors.black),
]
)
table_width = 0.8 * inch
for ip in data.keys():
img_dec = b64decode(IMAGE_SELECTION_MATRIX[data[ip]])
img = BytesIO(img_dec)
img.seek(0)
image = KeepInFrame(table_width, table_width, [Image(img)])
ip_para = Paragraph(ip, style=IP_STYLE)
table_row.append(
Table([[ip_para], [image]], colWidths=table_width, style=table_style)
)
# table_row.append(image)
# table_row_txt.append(ip_para)
if len(table_row) > 7:
# table_elems.append(table_row_txt)
# table_elems.append(table_row)
table_elems.append(table_row)
# table_row_txt = []
table_row = []
if not table_row == []:
table_elems.append(table_row)
return table_elems

View File

@@ -0,0 +1,96 @@
import ipaddress
import re
from tools.bad_board_util_old.layout import window
import pyperclip
def table_select_all():
window["ip_table"].update(
select_rows=([row for row in range(len(window["ip_table"].Values))])
)
def copy_from_table(table):
selection = table.selection()
copy_values = []
for each in selection:
try:
# value = table.item(each)["values"][0]
table_values = table.item(each)["values"]
ip = table_values[0]
model = table_values[1]
total = str(table_values[2])
l_brd_chips = str(table_values[3])
c_brd_chips = str(table_values[5])
r_brd_chips = str(table_values[7])
all_values = [ip, model, total, l_brd_chips, c_brd_chips, r_brd_chips]
value = ", ".join(all_values)
copy_values.append(str(value))
except Exception as e:
print("Copy Error:", e)
copy_string = "\n".join(copy_values)
pyperclip.copy(copy_string)
async def update_ui_with_data(key, message, append=False):
if append:
message = window[key].get_text() + message
window[key].update(message)
async def update_prog_bar(amount):
window["progress"].Update(amount)
percent_done = 100 * (amount / window["progress"].maxlen)
window["progress_percent"].Update(f"{round(percent_done, 2)} %")
if percent_done == 100:
window["progress_percent"].Update("")
async def set_progress_bar_len(amount):
window["progress"].Update(0, max=amount)
window["progress"].maxlen = amount
window["progress_percent"].Update("0.0 %")
async def sort_data(index: int or str):
if window["scan"].Disabled:
return
await update_ui_with_data("status", "Sorting Data")
data_list = window["ip_table"].Values
table = window["ip_table"].Widget
all_data = []
for idx, item in enumerate(data_list):
all_data.append({"data": item, "tags": table.item(int(idx) + 1)["tags"]})
# ip addresses
if re.match(
"^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)",
str(all_data[0]["data"][index]),
):
new_list = sorted(
all_data, key=lambda x: ipaddress.ip_address(x["data"][index])
)
if all_data == new_list:
new_list = sorted(
all_data,
reverse=True,
key=lambda x: ipaddress.ip_address(x["data"][index]),
)
# everything else, model, chips
else:
new_list = sorted(all_data, key=lambda x: x["data"][index])
if all_data == new_list:
new_list = sorted(all_data, reverse=True, key=lambda x: x["data"][index])
new_data = []
for item in new_list:
new_data.append(item["data"])
await update_ui_with_data("ip_table", new_data)
for idx, item in enumerate(new_list):
table.item(idx + 1, tags=item["tags"])
await update_ui_with_data("status", "")

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,90 @@
import asyncio
import sys
import PySimpleGUI as sg
from tools.bad_board_util_old.layout import window
from tools.bad_board_util_old.func.miners import (
refresh_data,
scan_and_get_data,
miner_light,
)
from tools.bad_board_util_old.func.files import import_iplist, export_iplist
from tools.bad_board_util_old.func.pdf import save_report
from tools.bad_board_util_old.func.ui import (
sort_data,
copy_from_table,
table_select_all,
)
from network import MinerNetwork
import webbrowser
async def ui():
window.read(timeout=0)
table = window["ip_table"].Widget
table.bind("<Control-Key-c>", lambda x: copy_from_table(table))
table.bind("<Control-Key-a>", lambda x: table_select_all())
# light tag shows red row for fault lights
table.tag_configure("bad", foreground="white", background="orange")
table.tag_configure("light", foreground="white", background="red")
table.tag_configure("light+bad", foreground="white", background="red")
while True:
event, value = window.read(timeout=0)
if event in (None, "Close", sg.WIN_CLOSED):
sys.exit()
if isinstance(event, tuple):
if len(window["ip_table"].Values) > 0:
if event[0] == "ip_table":
if event[2][0] == -1:
await sort_data(event[2][1])
if event == "open_in_web":
for row in value["ip_table"]:
webbrowser.open("http://" + window["ip_table"].Values[row][0])
if event == "scan":
if len(value["miner_network"].split("/")) > 1:
network = value["miner_network"].split("/")
miner_network = MinerNetwork(ip_addr=network[0], mask=network[1])
else:
miner_network = MinerNetwork(value["miner_network"])
asyncio.create_task(scan_and_get_data(miner_network))
if event == "save_report":
if not value["save_report"] == "":
asyncio.create_task(save_report(value["save_report"]))
window["save_report"].update("")
if event == "select_all_ips":
if len(value["ip_table"]) == len(window["ip_table"].Values):
window["ip_table"].update(select_rows=())
else:
window["ip_table"].update(
select_rows=([row for row in range(len(window["ip_table"].Values))])
)
if event == "light":
if len(window["ip_table"].Values) > 0:
asyncio.create_task(
miner_light(
[
window["ip_table"].Values[item][0]
for item in value["ip_table"]
]
)
)
if event == "import_iplist":
asyncio.create_task(import_iplist(value["file_iplist"]))
if event == "export_iplist":
asyncio.create_task(
export_iplist(
value["file_iplist"],
[window["ip_table"].Values[item][0] for item in value["ip_table"]],
)
)
if event == "refresh_data":
asyncio.create_task(
refresh_data(
[window["ip_table"].Values[item][0] for item in value["ip_table"]]
)
)
if event == "__TIMEOUT__":
await asyncio.sleep(0)