lump.multithreading module

exception lump.multithreading.AlreadyRunningError[source]

Bases: lump.multithreading.MultiThreadError

class lump.multithreading.MultiThread(processor=None, n_workers=5, queue_buffer_size=None, pbar=None, pass_thread_id=True)[source]

Bases: object

Simple wrapper class for basic multithreading

>>> from collections import namedtuple
>>> def noop(*args, **kwargs):
...    pass
>>> def refl(*args, **kwargs):
...    return args, kwargs
>>> t = MultiThread(noop, n_workers=10)
>>> t.extend(range(0, 1000))
>>> t.run()
[]
>>> total = 0
>>> n_workers = 3
>>> workers = [0] * n_workers
>>> def proc(n, thread_id, *args, **kwargs):
...    global total, workers
...    from time import sleep
...    sleep(.01)
...    workers[thread_id] += 1
...    total += n
...    return n
...
>>> t = MultiThread(proc, n_workers=n_workers)
>>> t.extend(range(0, 100))
>>> results = t.run()
>>> set(results) == set(range(0, 100))
True
>>> total
4950
>>> sum(workers)
100
>>> 0 in workers
False
>>> t = MultiThread(refl, n_workers=2)
>>> t.extend(range(0, 10))
>>> res = t.run('test', 'arg2', kw=True)
>>> len(res)
10
>>> all(len(a) == 3 for a, k in res)
True
>>> all(a[0] == 'test' for a, k in res)
True
>>> all(a[1] == 'arg2' for a, k in res)
True
>>> all(a[2] in range(0, 10) for a, k in res)
True
>>> all(k['kw'] is True for a, k in res)
True
>>> all(k['thread_id'] in [0, 1] for a, k in res)
True
>>> from time import sleep
>>> t = MultiThread(lambda k, **kwargs: '%s%.s' % (k, sleep(.2)))
>>> t.append('before')
>>> t.result
>>> t.start()
>>> t.result
[]
>>> sleep(.3)
>>> t.result
['before']
>>> t.append('after')
>>> t.result
['before']
>>> t.wait() #doctest: +ELLIPSIS
<lump.multithreading.MultiThread object at 0x...>
>>> t.result
['before', 'after']
append(*args)[source]
extend(iterable)[source]
run(*args, **kwargs) → list[source]
run_with_iter(iterable, *args, **kwargs)[source]
running
start(*args, **kwargs)[source]
wait()[source]
exception lump.multithreading.MultiThreadError[source]

Bases: BaseException

lump.multithreading.multithreaded(*args, class_method_with_self=False, pre_start=False, **kwargs)[source]

Decorator version for multithreading

>>> from time import sleep
>>> from collections import namedtuple
>>> @multithreaded(2)
... def refl(*args, **kwargs):
...    return args, kwargs
>>> @multithreaded(10)
... def proc1(*args, **kwargs):
...   pass
>>> proc1(range(0, 100))
[]
>>> total = 0
>>> n_workers = 3
>>> workers = [0] * n_workers
>>>
>>> @multithreaded(n_workers)
... def proc(n, thread_id, *args, **kwargs):
...    global total, workers
...    from time import sleep
...    sleep(.01)
...    workers[thread_id] += 1
...    total += n
...    return n
...
>>> type(proc._multithread) is MultiThread
True
>>> results = proc(range(0, 100))
>>> set(results) == set(range(0, 100))
True
>>> total
4950
>>> sum(workers)
100
>>> 0 in workers
False
>>> res = refl(range(0, 10), 'test', 'arg2', kw=True)
>>> len(res)
10
>>> type(res)
<class 'list'>
>>> res[0][0][0]
'test'
>>> all(len(a) == 3 for a, k in res)
True
>>> all(a[0] == 'test' for a, k in res)
True
>>> all(a[1] == 'arg2' for a, k in res)
True
>>> all(a[2] in range(0, 10) for a, k in res)
True
>>> all(k['kw'] is True for a, k in res)
True
>>> all(k['thread_id'] in [0, 1] for a, k in res)
True

See also

singlethreaded()

lump.multithreading.multithreadedmethod(*args, class_method_with_self=True, pre_start=False, **kwargs)

Decorator version for multithreading

>>> from time import sleep
>>> from collections import namedtuple
>>> @multithreaded(2)
... def refl(*args, **kwargs):
...    return args, kwargs
>>> @multithreaded(10)
... def proc1(*args, **kwargs):
...   pass
>>> proc1(range(0, 100))
[]
>>> total = 0
>>> n_workers = 3
>>> workers = [0] * n_workers
>>>
>>> @multithreaded(n_workers)
... def proc(n, thread_id, *args, **kwargs):
...    global total, workers
...    from time import sleep
...    sleep(.01)
...    workers[thread_id] += 1
...    total += n
...    return n
...
>>> type(proc._multithread) is MultiThread
True
>>> results = proc(range(0, 100))
>>> set(results) == set(range(0, 100))
True
>>> total
4950
>>> sum(workers)
100
>>> 0 in workers
False
>>> res = refl(range(0, 10), 'test', 'arg2', kw=True)
>>> len(res)
10
>>> type(res)
<class 'list'>
>>> res[0][0][0]
'test'
>>> all(len(a) == 3 for a, k in res)
True
>>> all(a[0] == 'test' for a, k in res)
True
>>> all(a[1] == 'arg2' for a, k in res)
True
>>> all(a[2] in range(0, 10) for a, k in res)
True
>>> all(k['kw'] is True for a, k in res)
True
>>> all(k['thread_id'] in [0, 1] for a, k in res)
True

See also

singlethreaded()

lump.multithreading.singlethreaded(*args, pass_thread_id=True, class_method_with_self=False, pre_start=False, pbar=None, **kwargs)[source]

Use this to easily disable multithreading and run sequentially instead: simply replace @multithreaded with @singlethreaded.

>>> from collections import namedtuple
>>> @singlethreaded(2)
... def refl(*args, **kwargs):
...    return args, kwargs
>>> @singlethreaded(10)
... def proc1(*args, **kwargs):
...   pass
>>> proc1(range(0, 100))
[]
>>> total = 0
>>> workers = [0]
>>>
>>> @singlethreaded()
... def proc(n, thread_id, *args, **kwargs):
...    global total, workers
...    from time import sleep
...    debug = 'n=%d, thread_id=%d, args=%s %s' % (n, thread_id, args, kwargs)
...    # print(debug)
...    # raise Exception(debug)
...    sleep(.01)
...    if thread_id >= len(workers):
...        raise IndexError("thread_id %d not found: %s" % (thread_id, debug))
...    workers[thread_id] += 1
...    total += n
...    return n
...
>>> results = proc(range(0, 100))
>>> set(results) == set(range(0, 100))
True
>>> total
4950
>>> workers
[100]
>>> sum(workers)
100
>>> 0 in workers
False
>>> res = refl(range(0, 10), 'test', 'arg2', kw=True)
>>> len(res)
10
>>> type(res)
<class 'list'>
>>> res[0][0][0]
'test'
>>> all(len(a) == 3 for a, k in res)
True
>>> all(a[0] == 'test' for a, k in res)
True
>>> all(a[1] == 'arg2' for a, k in res)
True
>>> all(a[2] in range(0, 10) for a, k in res)
True
>>> all(k['kw'] is True for a, k in res)
True
>>> all(k['thread_id'] in [0, 1] for a, k in res)
True

See also

multithreaded()

lump.multithreading.singlethreadedmethod(*args, pass_thread_id=True, class_method_with_self=True, pre_start=False, pbar=None, **kwargs)

Use this to easily disable multithreading and run sequentially instead: simply replace @multithreaded with @singlethreaded.

>>> from collections import namedtuple
>>> @singlethreaded(2)
... def refl(*args, **kwargs):
...    return args, kwargs
>>> @singlethreaded(10)
... def proc1(*args, **kwargs):
...   pass
>>> proc1(range(0, 100))
[]
>>> total = 0
>>> workers = [0]
>>>
>>> @singlethreaded()
... def proc(n, thread_id, *args, **kwargs):
...    global total, workers
...    from time import sleep
...    debug = 'n=%d, thread_id=%d, args=%s %s' % (n, thread_id, args, kwargs)
...    # print(debug)
...    # raise Exception(debug)
...    sleep(.01)
...    if thread_id >= len(workers):
...        raise IndexError("thread_id %d not found: %s" % (thread_id, debug))
...    workers[thread_id] += 1
...    total += n
...    return n
...
>>> results = proc(range(0, 100))
>>> set(results) == set(range(0, 100))
True
>>> total
4950
>>> workers
[100]
>>> sum(workers)
100
>>> 0 in workers
False
>>> res = refl(range(0, 10), 'test', 'arg2', kw=True)
>>> len(res)
10
>>> type(res)
<class 'list'>
>>> res[0][0][0]
'test'
>>> all(len(a) == 3 for a, k in res)
True
>>> all(a[0] == 'test' for a, k in res)
True
>>> all(a[1] == 'arg2' for a, k in res)
True
>>> all(a[2] in range(0, 10) for a, k in res)
True
>>> all(k['kw'] is True for a, k in res)
True
>>> all(k['thread_id'] in [0, 1] for a, k in res)
True

See also

multithreaded()