add MinerData().as_influx() to write miner data as influx db line format data.

This commit is contained in:
UpstreamData
2022-08-24 13:34:57 -06:00
parent 54206da449
commit 74ebffb4fc

View File

@@ -14,7 +14,8 @@
from typing import Union, List from typing import Union, List
from dataclasses import dataclass, field, asdict from dataclasses import dataclass, field, asdict
from datetime import datetime from datetime import datetime, timezone
import time
from .error_codes import X19Error, WhatsminerError, BraiinsOSError from .error_codes import X19Error, WhatsminerError, BraiinsOSError
@@ -105,7 +106,7 @@ class MinerData:
efficiency: int = field(init=False) efficiency: int = field(init=False)
def __post_init__(self): def __post_init__(self):
self.datetime = datetime.now() self.datetime = datetime.now(timezone.utc).astimezone()
def __getitem__(self, item): def __getitem__(self, item):
try: try:
@@ -116,6 +117,9 @@ class MinerData:
def __setitem__(self, key, value): def __setitem__(self, key, value):
return setattr(self, key, value) return setattr(self, key, value)
def __iter__(self):
return iter([item for item in self.__dict__])
@property @property
def total_chips(self): # noqa - Skip PyCharm inspection def total_chips(self): # noqa - Skip PyCharm inspection
return self.right_chips + self.center_chips + self.left_chips return self.right_chips + self.center_chips + self.left_chips
@@ -172,3 +176,31 @@ class MinerData:
def asdict(self): def asdict(self):
return asdict(self) return asdict(self)
def as_influxdb(self, measurement_name: str = "miner_data"):
tag_data = [measurement_name]
field_data = []
tags = ["ip", "mac", "model", "hostname"]
for attribute in self:
if attribute in tags:
tag_data.append(f"{attribute}={self[attribute]}")
continue
if isinstance(self[attribute], str):
field_data.append(f'{attribute}="{self[attribute]}"')
continue
if isinstance(self[attribute], bool):
field_data.append(f"{attribute}={str(self[attribute]).lower()}")
continue
if isinstance(self[attribute], int):
field_data.append(f"{attribute}={self[attribute]}")
continue
if attribute == "fault_light" and not self[attribute]:
field_data.append(f"{attribute}=false")
continue
tags_str = ",".join(tag_data)
field_str = ",".join(field_data)
timestamp = str(int(time.mktime(self.datetime.timetuple()) * 1e9))
return " ".join([tags_str, field_str, timestamp])