lump.multithreading module¶
-
class
lump.multithreading.
MultiThread
(processor=None, n_workers=5, queue_buffer_size=None, pbar=None, pass_thread_id=True)[source]¶ Bases:
object
Simple wrapper class for basic multithreading
>>> from collections import namedtuple >>> def noop(*args, **kwargs): ... pass >>> def refl(*args, **kwargs): ... return args, kwargs >>> t = MultiThread(noop, n_workers=10) >>> t.extend(range(0, 1000)) >>> t.run() [] >>> total = 0 >>> n_workers = 3 >>> workers = [0] * n_workers >>> def proc(n, thread_id, *args, **kwargs): ... global total, workers ... from time import sleep ... sleep(.01) ... workers[thread_id] += 1 ... total += n ... return n ... >>> t = MultiThread(proc, n_workers=n_workers) >>> t.extend(range(0, 100)) >>> results = t.run() >>> set(results) == set(range(0, 100)) True >>> total 4950 >>> sum(workers) 100 >>> 0 in workers False >>> t = MultiThread(refl, n_workers=2) >>> t.extend(range(0, 10)) >>> res = t.run('test', 'arg2', kw=True) >>> len(res) 10 >>> all(len(a) == 3 for a, k in res) True >>> all(a[0] == 'test' for a, k in res) True >>> all(a[1] == 'arg2' for a, k in res) True >>> all(a[2] in range(0, 10) for a, k in res) True >>> all(k['kw'] is True for a, k in res) True >>> all(k['thread_id'] in [0, 1] for a, k in res) True >>> from time import sleep >>> t = MultiThread(lambda k, **kwargs: '%s%.s' % (k, sleep(.2))) >>> t.append('before') >>> t.result >>> t.start() >>> t.result [] >>> sleep(.3) >>> t.result ['before'] >>> t.append('after') >>> t.result ['before'] >>> t.wait() #doctest: +ELLIPSIS <lump.multithreading.MultiThread object at 0x...> >>> t.result ['before', 'after']
-
running
¶
-
-
exception
lump.multithreading.
MultiThreadError
[source]¶ Bases:
BaseException
-
lump.multithreading.
multithreaded
(*args, class_method_with_self=False, pre_start=False, **kwargs)[source]¶ Decorator version for multithreading
>>> from time import sleep >>> from collections import namedtuple >>> @multithreaded(2) ... def refl(*args, **kwargs): ... return args, kwargs >>> @multithreaded(10) ... def proc1(*args, **kwargs): ... pass >>> proc1(range(0, 100)) [] >>> total = 0 >>> n_workers = 3 >>> workers = [0] * n_workers >>> >>> @multithreaded(n_workers) ... def proc(n, thread_id, *args, **kwargs): ... global total, workers ... from time import sleep ... sleep(.01) ... workers[thread_id] += 1 ... total += n ... return n ... >>> type(proc._multithread) is MultiThread True >>> results = proc(range(0, 100)) >>> set(results) == set(range(0, 100)) True >>> total 4950 >>> sum(workers) 100 >>> 0 in workers False >>> res = refl(range(0, 10), 'test', 'arg2', kw=True) >>> len(res) 10 >>> type(res) <class 'list'> >>> res[0][0][0] 'test' >>> all(len(a) == 3 for a, k in res) True >>> all(a[0] == 'test' for a, k in res) True >>> all(a[1] == 'arg2' for a, k in res) True >>> all(a[2] in range(0, 10) for a, k in res) True >>> all(k['kw'] is True for a, k in res) True >>> all(k['thread_id'] in [0, 1] for a, k in res) True
See also
-
lump.multithreading.
multithreadedmethod
(*args, class_method_with_self=True, pre_start=False, **kwargs)¶ Decorator version for multithreading
>>> from time import sleep >>> from collections import namedtuple >>> @multithreaded(2) ... def refl(*args, **kwargs): ... return args, kwargs >>> @multithreaded(10) ... def proc1(*args, **kwargs): ... pass >>> proc1(range(0, 100)) [] >>> total = 0 >>> n_workers = 3 >>> workers = [0] * n_workers >>> >>> @multithreaded(n_workers) ... def proc(n, thread_id, *args, **kwargs): ... global total, workers ... from time import sleep ... sleep(.01) ... workers[thread_id] += 1 ... total += n ... return n ... >>> type(proc._multithread) is MultiThread True >>> results = proc(range(0, 100)) >>> set(results) == set(range(0, 100)) True >>> total 4950 >>> sum(workers) 100 >>> 0 in workers False >>> res = refl(range(0, 10), 'test', 'arg2', kw=True) >>> len(res) 10 >>> type(res) <class 'list'> >>> res[0][0][0] 'test' >>> all(len(a) == 3 for a, k in res) True >>> all(a[0] == 'test' for a, k in res) True >>> all(a[1] == 'arg2' for a, k in res) True >>> all(a[2] in range(0, 10) for a, k in res) True >>> all(k['kw'] is True for a, k in res) True >>> all(k['thread_id'] in [0, 1] for a, k in res) True
See also
-
lump.multithreading.
singlethreaded
(*args, pass_thread_id=True, class_method_with_self=False, pre_start=False, pbar=None, **kwargs)[source]¶ To easily disable the multithreading, and just run sequentially (just replace
@multithreaded
with@singlethreaded
)>>> from collections import namedtuple >>> @singlethreaded(2) ... def refl(*args, **kwargs): ... return args, kwargs >>> @singlethreaded(10) ... def proc1(*args, **kwargs): ... pass >>> proc1(range(0, 100)) [] >>> total = 0 >>> workers = [0] >>> >>> @singlethreaded() ... def proc(n, thread_id, *args, **kwargs): ... global total, workers ... from time import sleep ... debug = 'n=%d, thread_id=%d, args=%s %s' % (n, thread_id, args, kwargs) ... # print(debug) ... # raise Exception(debug) ... sleep(.01) ... if thread_id >= len(workers): ... raise IndexError("thread_id %d not found: %s" % (thread_id, debug)) ... workers[thread_id] += 1 ... total += n ... return n ... >>> results = proc(range(0, 100)) >>> set(results) == set(range(0, 100)) True >>> total 4950 >>> workers [100] >>> sum(workers) 100 >>> 0 in workers False >>> res = refl(range(0, 10), 'test', 'arg2', kw=True) >>> len(res) 10 >>> type(res) <class 'list'> >>> res[0][0][0] 'test' >>> all(len(a) == 3 for a, k in res) True >>> all(a[0] == 'test' for a, k in res) True >>> all(a[1] == 'arg2' for a, k in res) True >>> all(a[2] in range(0, 10) for a, k in res) True >>> all(k['kw'] is True for a, k in res) True >>> all(k['thread_id'] in [0, 1] for a, k in res) True
See also
-
lump.multithreading.
singlethreadedmethod
(*args, pass_thread_id=True, class_method_with_self=True, pre_start=False, pbar=None, **kwargs)¶ To easily disable the multithreading, and just run sequentially (just replace
@multithreaded
with@singlethreaded
)>>> from collections import namedtuple >>> @singlethreaded(2) ... def refl(*args, **kwargs): ... return args, kwargs >>> @singlethreaded(10) ... def proc1(*args, **kwargs): ... pass >>> proc1(range(0, 100)) [] >>> total = 0 >>> workers = [0] >>> >>> @singlethreaded() ... def proc(n, thread_id, *args, **kwargs): ... global total, workers ... from time import sleep ... debug = 'n=%d, thread_id=%d, args=%s %s' % (n, thread_id, args, kwargs) ... # print(debug) ... # raise Exception(debug) ... sleep(.01) ... if thread_id >= len(workers): ... raise IndexError("thread_id %d not found: %s" % (thread_id, debug)) ... workers[thread_id] += 1 ... total += n ... return n ... >>> results = proc(range(0, 100)) >>> set(results) == set(range(0, 100)) True >>> total 4950 >>> workers [100] >>> sum(workers) 100 >>> 0 in workers False >>> res = refl(range(0, 10), 'test', 'arg2', kw=True) >>> len(res) 10 >>> type(res) <class 'list'> >>> res[0][0][0] 'test' >>> all(len(a) == 3 for a, k in res) True >>> all(a[0] == 'test' for a, k in res) True >>> all(a[1] == 'arg2' for a, k in res) True >>> all(a[2] in range(0, 10) for a, k in res) True >>> all(k['kw'] is True for a, k in res) True >>> all(k['thread_id'] in [0, 1] for a, k in res) True
See also