bug: fix some tasks not being cancelled properly in miner factory.

This commit is contained in:
UpstreamData
2023-12-13 10:18:28 -07:00
parent 497ffb5bc0
commit 4201905fdd

View File

@@ -389,24 +389,18 @@ MINER_CLASSES = {
async def concurrent_get_first_result(tasks: list, verification_func: Callable): async def concurrent_get_first_result(tasks: list, verification_func: Callable):
while True: res = None
await asyncio.sleep(0) for fut in asyncio.as_completed(tasks):
if len(tasks) == 0: res = await fut
return if verification_func(res):
for task in tasks: break
if task.done(): for t in tasks:
try: t.cancel()
result = await task try:
except asyncio.CancelledError: await t
for t in tasks: except asyncio.CancelledError:
t.cancel() pass
raise return res
else:
if not verification_func(result):
continue
for t in tasks:
t.cancel()
return result
class MinerFactory: class MinerFactory:
@@ -453,7 +447,7 @@ class MinerFactory:
task, timeout=settings.get("factory_get_timeout", 3) task, timeout=settings.get("factory_get_timeout", 3)
) )
except asyncio.TimeoutError: except asyncio.TimeoutError:
task.cancel() continue
else: else:
if miner_type is not None: if miner_type is not None:
break break
@@ -481,7 +475,7 @@ class MinerFactory:
task, timeout=settings.get("factory_get_timeout", 3) task, timeout=settings.get("factory_get_timeout", 3)
) )
except asyncio.TimeoutError: except asyncio.TimeoutError:
task.cancel() pass
boser_enabled = None boser_enabled = None
if miner_type == MinerTypes.BRAIINS_OS: if miner_type == MinerTypes.BRAIINS_OS:
@@ -507,19 +501,30 @@ class MinerFactory:
return await concurrent_get_first_result(tasks, lambda x: x is not None) return await concurrent_get_first_result(tasks, lambda x: x is not None)
async def _get_miner_web(self, ip: str): async def _get_miner_web(self, ip: str):
urls = [f"http://{ip}/", f"https://{ip}/"] tasks = []
async with httpx.AsyncClient( try:
transport=settings.transport(verify=False) urls = [f"http://{ip}/", f"https://{ip}/"]
) as session: async with httpx.AsyncClient(
tasks = [asyncio.create_task(self._web_ping(session, url)) for url in urls] transport=settings.transport(verify=False)
) as session:
tasks = [
asyncio.create_task(self._web_ping(session, url)) for url in urls
]
text, resp = await concurrent_get_first_result( text, resp = await concurrent_get_first_result(
tasks, tasks,
lambda x: x[0] is not None lambda x: x[0] is not None
and self._parse_web_type(x[0], x[1]) is not None, and self._parse_web_type(x[0], x[1]) is not None,
) )
if text is not None: if text is not None:
return self._parse_web_type(text, resp) return self._parse_web_type(text, resp)
except asyncio.CancelledError:
for t in tasks:
t.cancel()
try:
await t
except asyncio.CancelledError:
pass
@staticmethod @staticmethod
async def _web_ping( async def _web_ping(
@@ -565,15 +570,27 @@ class MinerFactory:
return MinerTypes.INNOSILICON return MinerTypes.INNOSILICON
async def _get_miner_socket(self, ip: str): async def _get_miner_socket(self, ip: str):
commands = ["version", "devdetails"] tasks = []
tasks = [asyncio.create_task(self._socket_ping(ip, cmd)) for cmd in commands] try:
commands = ["version", "devdetails"]
tasks = [
asyncio.create_task(self._socket_ping(ip, cmd)) for cmd in commands
]
data = await concurrent_get_first_result( data = await concurrent_get_first_result(
tasks, lambda x: x is not None and self._parse_socket_type(x) is not None tasks,
) lambda x: x is not None and self._parse_socket_type(x) is not None,
if data is not None: )
d = self._parse_socket_type(data) if data is not None:
return d d = self._parse_socket_type(data)
return d
except asyncio.CancelledError:
for t in tasks:
t.cancel()
try:
await t
except asyncio.CancelledError:
pass
@staticmethod @staticmethod
async def _socket_ping(ip: str, cmd: str) -> Optional[str]: async def _socket_ping(ip: str, cmd: str) -> Optional[str]: