import os
from queue import Queue
from threading import Barrier, Event, Thread
from typing import Any, Callable, Generator, Iterable, Optional

from tqdm.auto import tqdm
class ActorExitException(Exception):
    """
    Raised when work is submitted to, or a run is started on, a
    :class:`Parallel` actor that has already terminated.

    The class object itself (not an instance) is also used internally as a
    sentinel value on the task and result queues to signal "stop".
    """
class Parallel(object):
    """
    Actor-model style executor that runs ``func`` over queued items on a
    pool of worker threads.

    https://en.wikipedia.org/wiki/Actor_model

    Every item put on the queue is passed to ``func`` by one of ``size``
    worker threads; non-``None`` return values are collected and exposed
    through :attr:`results`. Output orderings shown below are illustrative:
    the real order depends on thread scheduling.

    Usage:

    .. code:: python

        # define the worker function at module level for these examples
        import time
        from random import randint

        def func(param):
            time.sleep(randint(1, 2))
            print('param:', param)
            return param

    .. code:: python

        poi = Parallel(func)
        poi.put(1)
        poi.put(3)
        poi.run_as_async()
        poi.puts([5, 7, 9])
        # output (random order) ->
        # param: 1
        # param: 5
        # ...
        # param: 7

        # either: await with the `keep` flag -- blocks until poi.stop()
        # is called from another thread
        poi.await_at(keep=True)

        # or: plain await -- blocks until the queued tasks are consumed
        poi.await_at()

    .. code:: python

        # to make put() block once the queue is full, bound the queue size
        poi.queue.maxsize = poi.size

    .. code:: python

        poi = Parallel(func)
        poi.puts([1, 2, 3])
        poi.run_as_async()
        print('start')
        poi.await_at()
        print('end')
        # output ->
        # start
        # param: 2
        # param: 1
        # param: 3
        # end

    .. code:: python

        poi = Parallel(func)
        poi.puts([1, 2, 3])
        poi.run_as_await()
        print('start')
        poi.put(None)
        # output ->
        # param: 2
        # param: 1
        # param: 3
        # start
        # raise ActorExitException: This actor is terminated

    .. code:: python

        poi = Parallel(func)
        poi.run_as_async()
        print('start')
        poi.puts([1, 2, 3])
        # output raise ->
        # ActorExitException: This actor is terminated

    .. code:: python

        poi = Parallel(func, keep=True)
        poi.run_as_async()
        print('start')
        poi.puts([1, 2, 3])
        poi.await_at()
        print('end')
        # output ->
        # start
        # param: 2
        # param: 1
        # param: 3   <- blocks here until poi.stop()

    .. code:: python

        poi = Parallel(func, keep=True)
        poi.run_as_async()
        print('start')
        poi.puts([1, 2, 3])
        time.sleep(3)
        print('sleep timeout')
        poi.await_at(keep=False)
        print('end')
        # output ->
        # start
        # param: 2
        # param: 1
        # param: 3
        # sleep timeout
        # end

    .. code:: python

        poi = Parallel(func, preload=[1, 2, 3])
        print('start')
        poi.run_as_async()
        time.sleep(3)
        print('sleep timeout')
        poi.await_at(keep=True)
        print('end')
        # output ->
        # start
        # param: 2
        # param: 1
        # param: 3
        # sleep timeout
        # end

    .. code:: python

        poi = Parallel(func, preload=[1, 2, 3])
        print('start')
        poi.run_as_async()
        poi.await_at(keep=True)
        print('end')
        # output ->
        # start
        # param: 2
        # param: 1
        # param: 3   <- blocks here until poi.stop()

    .. code:: python

        poi = Parallel(func, preload=[1, 2, 3])
        # equivalent to:
        #     poi = Parallel(func)
        #     poi.puts([1, 2, 3])

    .. code:: python

        poi = Parallel(func, preload=[1, 2, 3])
        poi.run_as_async()
        results = list(poi.results)
        print('results', results)
        # poi.results is one shared generator, already exhausted above,
        # so this loop prints nothing
        for item in poi.results:
            print('item', item)
        print('tasks end')
        # output ->
        # results [2, 1, 3]
        # tasks end

    .. code:: python

        poi = Parallel(func, preload=[1, 2, 3])
        poi.run_as_async()
        for item in poi.results:
            print('item', item)
        print('tasks end')
        # output ->
        # item 2
        # item 1
        # item 3
        # tasks end

    .. code:: python

        # progress=True shows a tqdm progress bar
        poi = Parallel(func, preload=range(20), progress=True)
        poi.run_as_async()
        # output ->
        # 100%|███████████████████████| 20/20 [00:01<00:00, 20it/s]
    """

    def __init__(self, func: Callable[[Any], Any], preload: Optional[Iterable] = None,
                 keep=False, progress=False, size=(os.cpu_count() or 1) * 2 + 1):
        """
        Run ``func`` in parallel over queued items with multi-threading.

        :param func: target callable; it is called with exactly one queued item
        :param preload: items queued before the workers are started
        :param keep: whether worker threads stay alive while the queue is
            empty; with ``keep=False`` the workers exit once the queue drains
        :param progress: show a tqdm progress bar once the workers start
        :param size: number of worker threads; defaults to
            ``2 * os.cpu_count() + 1`` (``os.cpu_count()`` may return ``None``,
            in which case 1 CPU is assumed)
        """
        self.method = func
        self.keep = keep
        self.size = size
        self.queue = Queue()
        self.tasks = []
        self.progress = progress
        self.tqdm = None
        self._is_start = Event()
        self._is_stop = Event()
        self.callbacks = [self._tasks_done_notify]
        # Trips once all `size` workers have left their loop; its action marks
        # the actor stopped and runs self.callbacks.
        self._tasks_done = Barrier(size, action=self._tasks_done_callback)
        self._results = Queue()
        self._results_gen = None
        if preload is not None:
            for item in preload:
                self.queue.put(item)
        self.total = self.queue.qsize()
        self.count = 0

    def put(self, item):
        """
        Queue ``item`` for processing.

        :raises ActorExitException: if the actor has already stopped
        """
        if self.is_stop:
            raise ActorExitException('This actor is terminated')
        self.queue.put(item)
        # The exit sentinel is internal bookkeeping, not a real task.
        if item is not ActorExitException:
            self.total += 1
            # Compare with None: a tqdm bar with total == 0 is falsy.
            if self.tqdm is not None:
                self.tqdm.total = self.total
                self.tqdm.refresh()

    def puts(self, items):
        """Queue every element of ``items`` via :meth:`put`."""
        for item in items:
            self.put(item)

    def _consumer(self):
        """
        Worker loop: take items off the queue and run ``func`` on them.

        :raises ActorExitException: when the loop should end (queue empty
            without ``keep``, or an exit sentinel was received)
        """
        while True:
            # Without `keep`, an empty queue means there is no more work.
            if not self.keep and self.queue.empty():
                raise ActorExitException
            item = self.queue.get()
            self.queue.task_done()
            if item is ActorExitException:
                raise ActorExitException
            result = self.method(item)
            if result is not None:
                self._results.put(result)
            self.count += 1
            if self.tqdm is not None:
                self.tqdm.update()

    def _bootstrap(self):
        """Thread entry point: run the worker loop, then wait at the exit barrier."""
        try:
            self._consumer()
        except ActorExitException:
            # Normal termination signal from _consumer.
            pass
        finally:
            # A sibling may have passed the empty() check in _consumer and
            # then blocked in queue.get() after this worker took the last
            # item; re-queueing a sentinel wakes it so it can exit as well.
            if not self.keep and self.queue.empty():
                self.put(ActorExitException)
            self._tasks_done.wait()

    def _tasks_done_callback(self):
        """Barrier action: mark the actor stopped and run the registered callbacks."""
        self._is_stop.set()
        for callback in self.callbacks:
            if callable(callback):
                callback()

    def _tasks_done_notify(self):
        """Terminate the results stream and close the progress bar, if any."""
        self._results.put(ActorExitException)
        if self.tqdm is not None:
            self.tqdm.close()

    def _get_result(self):
        """Yield collected results until the end-of-stream sentinel arrives."""
        while True:
            result = self._results.get()
            self._results.task_done()
            if result is ActorExitException:
                return
            yield result

    @property
    def results(self) -> Generator:
        """
        Generator over the non-``None`` return values of ``func``.

        The same generator object is returned on every access, so it can be
        consumed only once; it finishes after all workers have exited.
        """
        if self._results_gen is None:
            self._results_gen = self._get_result()
        return self._results_gen

    @property
    def is_stop(self):
        """Whether the actor has terminated."""
        return self._is_stop.is_set()

    @property
    def is_start(self):
        """Whether the worker threads have been started."""
        return self._is_start.is_set()

    def run_as_async(self):
        """
        Start the worker threads without waiting for them; calling again
        after start is a no-op.

        :return: ``self``, for chaining
        :raises ActorExitException: if the actor has already stopped
        """
        if self.is_stop:
            raise ActorExitException('This actor is terminated')
        if self.is_start:
            return self
        self._is_start.set()
        if self.progress:
            self.tqdm = tqdm(total=self.total, leave=True, dynamic_ncols=True)
        for _ in range(self.size):
            task = Thread(name='parallel', target=self._bootstrap)
            self.tasks.append(task)
            task.start()
        return self

    def run_as_await(self):
        """Start the workers, block until they exit and return :attr:`results`."""
        self.run_as_async()
        return self.await_at()

    def await_at(self, keep: Optional[bool] = None):
        """
        Block until every worker thread has exited, then return :attr:`results`.

        :param keep: if a bool, replaces the actor's ``keep`` flag first.
            Switching from ``keep=True`` to ``False`` queues one exit sentinel
            per worker so idle workers blocked on the queue can exit.
        """
        if isinstance(keep, bool):
            if self.keep and not keep:
                self.puts([ActorExitException] * self.size)
            self.keep = keep
        for task in self.tasks:
            task.join()
        self._is_stop.set()
        # Discard whatever is still queued (e.g. leftover exit sentinels).
        while not self.queue.empty():
            self.queue.get_nowait()
            self.queue.task_done()
        return self.results

    def stop(self):
        """Turn ``keep`` off, wait for the workers to exit and return :attr:`results`."""
        return self.await_at(keep=False)

    def __call__(self, arg_iter: Optional[Iterable] = None):
        """Process every item of ``arg_iter`` and return the :attr:`results` generator."""
        # keep=True so workers don't exit before the items below are queued.
        self.keep = True
        self.run_as_async()
        if arg_iter is not None:
            self.puts(arg_iter)
        return self.stop()