Source code for lump.multithreading

from queue import Queue
from threading import Thread
from functools import partial
from itertools import chain
from .null import Null

import logging
logger = logging.getLogger(__name__)


[docs]class MultiThreadError(BaseException): pass
[docs]class AlreadyRunningError(MultiThreadError): pass
[docs]class MultiThread: """ Simple wrapper class for basic multithreading >>> from collections import namedtuple >>> def noop(*args, **kwargs): ... pass >>> def refl(*args, **kwargs): ... return args, kwargs >>> t = MultiThread(noop, n_workers=10) >>> t.extend(range(0, 1000)) >>> t.run() [] >>> total = 0 >>> n_workers = 3 >>> workers = [0] * n_workers >>> def proc(n, thread_id, *args, **kwargs): ... global total, workers ... from time import sleep ... sleep(.01) ... workers[thread_id] += 1 ... total += n ... return n ... >>> t = MultiThread(proc, n_workers=n_workers) >>> t.extend(range(0, 100)) >>> results = t.run() >>> set(results) == set(range(0, 100)) True >>> total 4950 >>> sum(workers) 100 >>> 0 in workers False >>> t = MultiThread(refl, n_workers=2) >>> t.extend(range(0, 10)) >>> res = t.run('test', 'arg2', kw=True) >>> len(res) 10 >>> all(len(a) == 3 for a, k in res) True >>> all(a[0] == 'test' for a, k in res) True >>> all(a[1] == 'arg2' for a, k in res) True >>> all(a[2] in range(0, 10) for a, k in res) True >>> all(k['kw'] is True for a, k in res) True >>> all(k['thread_id'] in [0, 1] for a, k in res) True >>> from time import sleep >>> t = MultiThread(lambda k, **kwargs: '%s%.s' % (k, sleep(.2))) >>> t.append('before') >>> t.result >>> t.start() >>> t.result [] >>> sleep(.3) >>> t.result ['before'] >>> t.append('after') >>> t.result ['before'] >>> t.wait() #doctest: +ELLIPSIS <lump.multithreading.MultiThread object at 0x...> >>> t.result ['before', 'after'] """ def __init__(self, processor=None, n_workers=5, queue_buffer_size=None, pbar=None, pass_thread_id=True): self.processor = processor if queue_buffer_size is None: queue_buffer_size = 0 self.q = Queue(maxsize=queue_buffer_size) self.n_workers = n_workers self.pbar = pbar self.logger = logger self.result = None self.pass_thread_id = pass_thread_id def _worker(self, *args, **kwargs): self.logger.info('Worker started') processor = partial(self.processor, *args, **kwargs) while True: args = self.q.get() result = None try: result = processor(*args) except Exception as e: if self.logger: self.logger.exception(e) if result is not None: self.result.append(result) self.q.task_done() if self.pbar is not None: self.pbar.update(1)
[docs] def append(self, *args): # self.logger.debug('Append 1 item to queue') self.q.put(args)
[docs] def extend(self, iterable): for row in iterable: self.append(row)
[docs] def wait(self): self.q.join() return self
[docs] def start(self, *args, **kwargs): if self.running: raise AlreadyRunningError("Attempted to run while already running") self.result = [] self.logger.debug('Starting threaded run, %d workers', self.n_workers) for i in range(self.n_workers): if self.pass_thread_id: kwargs['thread_id'] = i t = Thread(target=partial(self._worker, *args, **kwargs), daemon=True) t.start()
@property def running(self): return self.result is not None
[docs] def run_with_iter(self, iterable, *args, **kwargs): self.logger.debug('run') self.start(*args, **kwargs) self.logger.debug('started, add iterable') self.extend(iterable) self.logger.debug('waiting to finish') self.wait() result = self.result self.result = None self.logger.debug('Threaded run done, %d results', len(result)) return result
[docs] def run(self, *args, **kwargs) -> list: self.logger.debug('run') self.start(*args, **kwargs) self.logger.debug('started, waiting now') self.wait() result = self.result self.result = None self.logger.debug('Threaded run done, %d results', len(result)) return result
[docs]def singlethreaded(*args, pass_thread_id=True, class_method_with_self=False, pre_start=False, pbar=None, **kwargs): """ To easily disable the multithreading, and just run sequentially (just replace ``@multithreaded`` with ``@singlethreaded``) >>> from collections import namedtuple >>> @singlethreaded(2) ... def refl(*args, **kwargs): ... return args, kwargs >>> @singlethreaded(10) ... def proc1(*args, **kwargs): ... pass >>> proc1(range(0, 100)) [] >>> total = 0 >>> workers = [0] >>> >>> @singlethreaded() ... def proc(n, thread_id, *args, **kwargs): ... global total, workers ... from time import sleep ... debug = 'n=%d, thread_id=%d, args=%s %s' % (n, thread_id, args, kwargs) ... # print(debug) ... # raise Exception(debug) ... sleep(.01) ... if thread_id >= len(workers): ... raise IndexError("thread_id %d not found: %s" % (thread_id, debug)) ... workers[thread_id] += 1 ... total += n ... return n ... >>> results = proc(range(0, 100)) >>> set(results) == set(range(0, 100)) True >>> total 4950 >>> workers [100] >>> sum(workers) 100 >>> 0 in workers False >>> res = refl(range(0, 10), 'test', 'arg2', kw=True) >>> len(res) 10 >>> type(res) <class 'list'> >>> res[0][0][0] 'test' >>> all(len(a) == 3 for a, k in res) True >>> all(a[0] == 'test' for a, k in res) True >>> all(a[1] == 'arg2' for a, k in res) True >>> all(a[2] in range(0, 10) for a, k in res) True >>> all(k['kw'] is True for a, k in res) True >>> all(k['thread_id'] in [0, 1] for a, k in res) True .. seealso:: :func:`multithreaded` """ del args, kwargs def _decorator(func): def _(alist, *args, **kwargs): args = list(args) if class_method_with_self: args[0], alist = alist, args[0] results = [] for arow in alist: try: if class_method_with_self: nargs = chain([args[0]], [arow], args[1:]) else: nargs = chain(args, [arow]) nargs = list(nargs) if pass_thread_id: kwargs['thread_id'] = 0 res = func(*nargs, **kwargs) if res is not None: results.append(res) except Exception as e: logger.exception(e) finally: if pbar: pbar.update(1) return results _._multithread = Null return _ return _decorator
singlethreadedmethod = partial(singlethreaded, class_method_with_self=True)
[docs]def multithreaded(*args, class_method_with_self=False, pre_start=False, **kwargs): """ Decorator version for multithreading >>> from time import sleep >>> from collections import namedtuple >>> @multithreaded(2) ... def refl(*args, **kwargs): ... return args, kwargs >>> @multithreaded(10) ... def proc1(*args, **kwargs): ... pass >>> proc1(range(0, 100)) [] >>> total = 0 >>> n_workers = 3 >>> workers = [0] * n_workers >>> >>> @multithreaded(n_workers) ... def proc(n, thread_id, *args, **kwargs): ... global total, workers ... from time import sleep ... sleep(.01) ... workers[thread_id] += 1 ... total += n ... return n ... >>> type(proc._multithread) is MultiThread True >>> results = proc(range(0, 100)) >>> set(results) == set(range(0, 100)) True >>> total 4950 >>> sum(workers) 100 >>> 0 in workers False >>> res = refl(range(0, 10), 'test', 'arg2', kw=True) >>> len(res) 10 >>> type(res) <class 'list'> >>> res[0][0][0] 'test' >>> all(len(a) == 3 for a, k in res) True >>> all(a[0] == 'test' for a, k in res) True >>> all(a[1] == 'arg2' for a, k in res) True >>> all(a[2] in range(0, 10) for a, k in res) True >>> all(k['kw'] is True for a, k in res) True >>> all(k['thread_id'] in [0, 1] for a, k in res) True .. seealso:: :func:`singlethreaded` """ # Add 'None' processor a = [None] a.extend(args) args = a mt = MultiThread(*args, **kwargs) del args, kwargs def _decorator(func): mt.processor = func def _(alist, *args, **kwargs): if class_method_with_self: args = list(args) args[0], alist = alist, args[0] if not pre_start: mt.extend(alist) return mt.run(*args, **kwargs) return mt.run_with_iter(alist, *args, **kwargs) _._multithread = mt return _ return _decorator
multithreadedmethod = partial(multithreaded, class_method_with_self=True)