jacinle.concurrency package

Submodules

jacinle.concurrency.counter module

class jacinle.concurrency.counter.Counter[source]

Bases: object

get()[source]
tick()[source]
class jacinle.concurrency.counter.CounterBasedEvent(target, tqdm=None)[source]

Bases: object

Thread-safe counter-based callback invoker. Whenever the counter is incremented, it is checked against the target value; once the target is reached, the event is set.

clear()[source]
is_set()[source]
tick()[source]
wait(timeout=None)[source]
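
The behaviour described above can be sketched with the standard library alone. This is a minimal illustration, not jacinle's implementation: the class name CounterEventSketch and the helper run_workers are hypothetical, the tqdm argument is left out, and resetting the count in clear() is an assumption.

```python
import threading


class CounterEventSketch:
    """Hypothetical stand-in for a counter-based event (not jacinle's code)."""

    def __init__(self, target):
        self._target = target
        self._count = 0
        self._lock = threading.Lock()
        self._event = threading.Event()

    def tick(self):
        # Increment under a lock so that concurrent ticks are not lost,
        # then set the event once the target has been reached.
        with self._lock:
            self._count += 1
            if self._count >= self._target:
                self._event.set()

    def is_set(self):
        return self._event.is_set()

    def wait(self, timeout=None):
        return self._event.wait(timeout)

    def clear(self):
        # Assumption: clearing also resets the count.
        with self._lock:
            self._count = 0
            self._event.clear()


def run_workers(nr_workers=4):
    """Start nr_workers threads that each tick once, then wait for the event."""
    done = CounterEventSketch(target=nr_workers)
    threads = [threading.Thread(target=done.tick) for _ in range(nr_workers)]
    for t in threads:
        t.start()
    fired = done.wait(timeout=5)
    for t in threads:
        t.join()
    return fired
```

Calling run_workers(4) blocks until all four threads have ticked, which is the typical use of such an event as a completion barrier.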
class jacinle.concurrency.counter.CounterBasedMonitor(counters=None, display_names=None, interval=1, printf=None)[source]

Bases: object

start()[source]
tick(name=None)[source]

jacinle.concurrency.event module

class jacinle.concurrency.event.MPLibExtension[source]

Bases: object

jacinle.concurrency.event.instantiate_mplib_ext(base_class)[source]
jacinle.concurrency.event.MTBooleanEvent

alias of jacinle.concurrency.event.instantiate_mplib_ext.<locals>.MultiThreadingImpl

jacinle.concurrency.event.MPBooleanEvent

alias of jacinle.concurrency.event.instantiate_mplib_ext.<locals>.MultiProcessingImpl

jacinle.concurrency.event.MTOrEvent(*events, mplib=threading)

Wait on several events together. See http://stackoverflow.com/questions/12317940/python-threading-can-i-sleep-on-two-threading-events-simultaneously

jacinle.concurrency.event.MPOrEvent(*events, mplib=multiprocessing)

Wait on several events together. See http://stackoverflow.com/questions/12317940/python-threading-can-i-sleep-on-two-threading-events-simultaneously
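
One way to wait on several events at once is to build a combined event that mirrors the logical OR of its inputs. The sketch below shows that idea for threading.Event only. The name or_event_sketch is hypothetical, and the code is not taken from jacinle, so the actual MTOrEvent and MPOrEvent may differ in detail.

```python
import threading


def or_event_sketch(*events):
    """Return an Event that is set while at least one of `events` is set."""
    combined = threading.Event()

    def refresh():
        # Recompute the OR of all input events.
        if any(e.is_set() for e in events):
            combined.set()
        else:
            combined.clear()

    def hook(event):
        original_set, original_clear = event.set, event.clear

        def set_and_refresh():
            original_set()
            refresh()

        def clear_and_refresh():
            original_clear()
            refresh()

        # Shadow set/clear on this instance only, so every later state
        # change also updates the combined event.
        event.set = set_and_refresh
        event.clear = clear_and_refresh

    for e in events:
        hook(e)
    refresh()
    return combined


def demo():
    """Wake up as soon as either of two events fires."""
    a, b = threading.Event(), threading.Event()
    either = or_event_sketch(a, b)
    timer = threading.Timer(0.05, b.set)
    timer.start()
    woke = either.wait(timeout=5)
    timer.join()
    return woke
```

A thread can then block on the single combined event instead of polling each input in turn.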

class jacinle.concurrency.event.MTCoordinatorEvent(nr_workers)[source]

Bases: object

broadcast()[source]
check()[source]
wait()[source]

jacinle.concurrency.future module

class jacinle.concurrency.future.FutureResult[source]

Bases: object

A thread-safe future implementation, intended only as a one-to-one pipe.

get()[source]
put(result)[source]
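
A one-to-one future with put() and get() can be sketched with a condition variable: one thread stores a result, the other blocks until it arrives. The class FutureSketch and the helper demo are hypothetical and only illustrate the pattern; jacinle's FutureResult may be implemented differently.

```python
import threading


class FutureSketch:
    """Hypothetical one-to-one future: one producer, one consumer."""

    def __init__(self):
        self._cond = threading.Condition()
        self._result = None
        self._ready = False

    def put(self, result):
        with self._cond:
            # One-to-one use: the previous result must have been consumed.
            assert not self._ready, "previous result has not been fetched"
            self._result = result
            self._ready = True
            self._cond.notify()

    def get(self):
        with self._cond:
            # Loop to guard against spurious wake-ups.
            while not self._ready:
                self._cond.wait()
            result, self._result = self._result, None
            self._ready = False
            return result


def demo():
    """Hand a value from a worker thread to the caller."""
    future = FutureSketch()
    worker = threading.Thread(target=future.put, args=(42,))
    worker.start()
    value = future.get()
    worker.join()
    return value
```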

jacinle.concurrency.packing module

jacinle.concurrency.packing.check_pickle()[source]
jacinle.concurrency.packing.loadb_pickle()

Read and return an object from the given pickle data.

The protocol version of the pickle is detected automatically, so no protocol argument is needed. Bytes past the pickled object’s representation are ignored.

Optional keyword arguments are fix_imports, encoding and errors, which are used to control compatibility support for pickle streams generated by Python 2. If fix_imports is True, pickle will try to map the old Python 2 names to the new names used in Python 3. The encoding and errors arguments tell pickle how to decode 8-bit string instances pickled by Python 2; these default to ‘ASCII’ and ‘strict’, respectively. The encoding can be ‘bytes’ to read these 8-bit string instances as bytes objects.

jacinle.concurrency.packing.dumpb_pickle()

Return the pickled representation of the object as a bytes object.

The optional protocol argument tells the pickler to use the given protocol; supported protocols are 0, 1, 2, 3 and 4. The default protocol is 3, a backward-incompatible protocol designed for Python 3.

Specifying a negative protocol version selects the highest protocol version supported. The higher the protocol used, the more recent the version of Python needed to read the pickle produced.

If fix_imports is True and protocol is less than 3, pickle will try to map the new Python 3 names to the old module names used in Python 2, so that the pickle data stream is readable with Python 2.
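
The two docstrings above match those of the standard library's pickle.loads and pickle.dumps, so the round trip can be illustrated with pickle directly. Whether loadb_pickle and dumpb_pickle are plain aliases of those functions is an assumption here, and the payload is made-up sample data.

```python
import pickle

# Made-up sample data for the round trip.
payload = {"step": 3, "values": [1.0, 2.5], "tag": "train"}

# A negative protocol selects the highest protocol this interpreter supports.
blob = pickle.dumps(payload, protocol=-1)

# For protocol 2 and above, the second byte of the stream stores the protocol
# number, which is how loads() detects it without being told.
detected_protocol = blob[1]

# Bytes past the pickled object's representation are ignored on load.
restored = pickle.loads(blob + b"trailing bytes")
```

Here restored compares equal to payload, and detected_protocol equals pickle.HIGHEST_PROTOCOL.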

jacinle.concurrency.packing.check_msgpack()[source]
jacinle.concurrency.packing.check_pyarrow()[source]
jacinle.concurrency.packing.loadb(bstr, *args, backend=None, **kwargs)[source]
jacinle.concurrency.packing.dumpb(obj, *args, backend=None, **kwargs)[source]
jacinle.concurrency.packing.get_available_backends()[source]
jacinle.concurrency.packing.get_default_backend()[source]
jacinle.concurrency.packing.set_default_backend(backend)[source]

jacinle.concurrency.pool module

class jacinle.concurrency.pool.Pool(nr_workers=None)[source]

Bases: object

Process(*args, **kwargs)[source]
Queue(*args, **kwargs)[source]
map(func, iterable, chunksize=1, sort=True, callback=None)[source]
start()[source]
terminate()[source]
try_start()[source]
class jacinle.concurrency.pool.TQDMPool(nr_workers=None)[source]

Bases: jacinle.concurrency.pool.Pool

map(func, iterable, chunksize=1, sort=True, total=None, desc='', callback=None, use_tqdm=True, update_interval=0.1, update_iters=1, **kwargs)[source]

jacinle.concurrency.process module

class jacinle.concurrency.process.JacProcess(*args, extra_env=None, seed=None, **kwargs)[source]

Bases: multiprocessing.context.Process

run()[source]

Method to be run in sub-process; can be overridden in sub-class

jacinle.concurrency.queue module

class jacinle.concurrency.queue.ListToFill(nr_target)[source]

Bases: list

append(*args, **kwargs)[source]

Append object to the end of the list.

jacinle.concurrency.queue.iter_queue(q, total=None)[source]
jacinle.concurrency.queue.sorted_iter(iter, id_func=None)[source]

jacinle.concurrency.shmarray module

Shared memory array implementation for numpy which delegates all the nasty stuff to multiprocessing.sharedctypes.

Copyright (c) 2010, David Baddeley All rights reserved.

jacinle.concurrency.shmarray.create(shape, dtype='d', alignment=32)[source]

Create an uninitialised shared array. Avoid object arrays: they will almost certainly break, because only the pointers are stored in shared memory, not the objects themselves.

jacinle.concurrency.shmarray.create_copy(a)[source]

Create a shared copy of an array.

jacinle.concurrency.shmarray.ones(shape, dtype='d')[source]

Create a shared array initialised to ones. Avoid object arrays: they will almost certainly break, because only the pointers are stored in shared memory, not the objects themselves.

class jacinle.concurrency.shmarray.shmarray[source]

Bases: numpy.ndarray

Subclass of ndarray with overridden pickling functions, which record dtype, shape, etc. but defer pickling of the underlying data to the original data source.

This class does not itself allocate the shared memory; allocation is done in create. To make a new shared memory array, use create, zeros, ones, or create_copy.

TODO - add argument checking to ensure that the user is passing reasonable values.

jacinle.concurrency.shmarray.zeros(shape, dtype='d')[source]

Create a shared array initialised to zeros. Avoid object arrays: they will almost certainly break, because only the pointers are stored in shared memory, not the objects themselves.
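
The underlying idea of allocating a buffer through multiprocessing.sharedctypes and viewing it with a shape can be sketched without NumPy. The function shared_zeros_sketch is hypothetical and returns a memoryview rather than the numpy.ndarray subclass this module provides. It only shows that the shaped view and the raw buffer refer to the same memory.

```python
from multiprocessing.sharedctypes import RawArray


def shared_zeros_sketch(rows, cols, typecode="d"):
    """Allocate a zero-filled shared buffer and return it with a 2-D view."""
    # RawArray allocates from shared memory and zero-fills the buffer
    # when given an integer size.
    raw = RawArray(typecode, rows * cols)
    # Cast to bytes first, then to the element type with the desired shape.
    # No data is copied; the view aliases the shared buffer.
    view = memoryview(raw).cast("B").cast(typecode, shape=[rows, cols])
    return raw, view


raw, view = shared_zeros_sketch(2, 3)
view[1, 2] = 7.5  # write through the shaped view
last_item = raw[5]  # the same element, read from the flat shared buffer
```

Only plain numeric typecodes make sense here, for the same reason the docstrings above warn against object arrays: a buffer of pointers would share the pointers, not the objects they refer to.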

jacinle.concurrency.zmq_utils module

jacinle.concurrency.zmq_utils.bind_to_random_ipc(sock, name)[source]
jacinle.concurrency.zmq_utils.graceful_close(sock)[source]
jacinle.concurrency.zmq_utils.iter_recv(meth, sock)[source]
jacinle.concurrency.zmq_utils.json_dumpb(x)
jacinle.concurrency.zmq_utils.json_loadb(x)
jacinle.concurrency.zmq_utils.pull_pyobj(sock, flag=<Flag.DONTWAIT: 1>)[source]
jacinle.concurrency.zmq_utils.push_pyobj(sock, data, flag=<Flag.DONTWAIT: 1>)[source]
jacinle.concurrency.zmq_utils.req_recv_json(sock, flag=0, loader=<function <lambda>>)[source]
jacinle.concurrency.zmq_utils.req_send_and_recv(sock, *payloads)[source]
jacinle.concurrency.zmq_utils.req_send_json(sock, *payloads, flag=0, dumper=<function <lambda>>)[source]
jacinle.concurrency.zmq_utils.router_recv_json(sock, flag=<Flag.DONTWAIT: 1>, loader=<function <lambda>>)[source]
jacinle.concurrency.zmq_utils.router_send_json(sock, identifier, *payloads, flag=0, dumper=<function <lambda>>)[source]
jacinle.concurrency.zmq_utils.uid()[source]