onyx.dataflow.join – Join processing elements, that is, elements which can be sent data from multiple streams simultaneously.

Join elements handle more than one incoming stream simultaneously. That is, their process functions may be called by multiple upstream callers, possibly running in different threads. Several types are available, including the SerializingJoin, which just sends whatever comes in to the sendee (but which handles the locking necessary to be called from multiple threads), the RoundRobinJoin, which enforces a round-robin protocol on callers of process functions, and two flavors of slot-filling join, which produce sorted tuples of processed events.
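
To make the wiring concrete before the class-by-class details, here is a minimal sketch (not from the module's own test suite) that merges two plain threads into a single stream with a SerializingJoin. It assumes only the constructor and process() API documented below, and compares sets because the interleaving across threads is nondeterministic.

>>> from threading import Thread
>>> merged = list()
>>> sj = SerializingJoin(sendee=merged.append)
>>> def feed(lo, hi):
...     # each worker thread pushes its own range through the shared join
...     for i in xrange(lo, hi):
...         sj.process(i)
>>> t0 = Thread(target=feed, args=(0, 5))
>>> t1 = Thread(target=feed, args=(10, 15))
>>> t0.start(); t1.start()
>>> t0.join(); t1.join()
>>> set(merged) == set(xrange(0, 5)) | set(xrange(10, 15))
True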

class onyx.dataflow.join.AccumulatingSlotFillingJoin(sendee=None, sending=True)

Bases: onyx.dataflow.streamprocess.ProcessorBase

A join processor that fills the slots of a resulting event with accumulated events from callers of its process functions.

This element sends bundles of processed events to the sendee, taking multiple events from each prearranged incoming stream to make a complete bundle. Note that this element doesn't have the usual process() function. Instead, clients should call create_process_function(), which will dynamically allocate a process function to be called in the usual way. Each call to create_process_function() is passed an immutable tag, which must be different from all previous tags. When all process functions have been allocated, call the start() function, which will allow processing to begin. Calls to the allocated process functions will be used to fill in an (outer) tuple of event (inner) tuples. The outer tuple's order is the sorted order of the tags. Calls made for tags which are already filled will accumulate in an inner tuple until a complete outer tuple is emitted. This means that there will always be one inner tuple of length 1. Note that this element does not use its own thread, so each event pushed through will be using the thread which called the allocated process function that completed the tuple.

>>> result = list()
>>> sfj0 = AccumulatingSlotFillingJoin(result.append)
>>> proc0 = sfj0.create_process_function(2)
>>> proc1 = sfj0.create_process_function(1)
>>> proc2 = sfj0.create_process_function(3)
>>> sfj0.sorted_tag_set
[1, 2, 3]
>>> sfj0.start()

get_process_function() returns the process function associated with a particular tag, or None if no function has that tag.

>>> sfj0.get_process_function(1) is proc1  
True
>>> sfj0.get_process_function('a') is None
True

Low-level, single-threaded (deterministic) processing. This way of driving things is not suggested for real use; you should use a real source of some kind to feed this join; see below.

>>> for i in xrange(5):
...     proc0(i)
...     proc1(i+10)
...     proc1(i+20)
...     proc2(i+100)
>>> result
[((10, 20), (0,), (100,)), ((11, 21), (1,), (101,)), ((12, 22), (2,), (102,)), ((13, 23), (3,), (103,)), ((14, 24), (4,), (104,))]
>>> del result[:]

Here’s a multithreaded test. Note that we’re reusing the processor constructed above, and that all slots are currently empty. These IteratorSources each use their own thread and will be calling their sendees asynchronously.

>>> is0 = IteratorSource(xrange(0,5), sendee=proc0)
>>> is1 = IteratorSource(xrange(10,15), sendee=proc1)
>>> is2 = IteratorSource(xrange(100,105), sendee=proc2)

We start by turning on is1, but this only fills one slot, so there’s no possibility of output

>>> is1.start()
>>> is1.wait_iter_stopped()
True
>>> result
[]

Now turn on is0. There’s still no possible output, since we can’t fill an entire tuple.

>>> is0.start()
>>> is0.wait_iter_stopped()
True
>>> result
[]

Finally turn is2 on, and wait until we get our first (and only) output

>>> is2.start()
>>> while not result:
...   sleep(1/64) # polling
>>> result
[((10, 11, 12, 13, 14), (0, 1, 2, 3, 4), (100,))]

Note that this is all we’re going to get, since is0 and is1 are now empty and no triple can be filled. XXX So we might want some kind of ‘flush’ operation.

>>> is0.is_iter_stopped and is1.is_iter_stopped
True
>>> is0.stop(flush=True)
>>> is1.stop(flush=True)
>>> is0.wait()
>>> is1.wait()

Note: unless and until there's more API support for deterministic assessment of Source status, we can't make calls that show that result will not grow. We just know it won't, given the current setup, so the following test will always pass.

>>> result
[((10, 11, 12, 13, 14), (0, 1, 2, 3, 4), (100,))]
create_process_function(new_tag)
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information; see onyx.util.debugprint.DebugPrint

get_process_function(tag)
graph

Return a graph for this processor. By default this is just a single node whose label is the label of the processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(value)

Subclasses must override the process(value) function.

The function accepts a single value. If, after processing the single value, it has a new result or results, the function must call send(result) for each result that is calculated. For a given call to process(), the function can call send() zero, one, or multiple times depending on the semantics that the processor type implements.

send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

sorted_tag_set
start()
static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().
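
To illustrate the process()/send() contract described above (zero, one, or multiple sends per call to process()), here is a minimal sketch of a hypothetical ProcessorBase subclass. The name Pairer is invented for this example, and the sketch assumes ProcessorBase.__init__() accepts the sendee and sending arguments shown in the class signatures on this page.

>>> from onyx.dataflow.streamprocess import ProcessorBase
>>> class Pairer(ProcessorBase):
...     # Emits inputs in pairs: send() is called zero times on the
...     # first value of each pair and once on the second.
...     def __init__(self, sendee=None, sending=True):
...         super(Pairer, self).__init__(sendee=sendee, sending=sending)
...         self._pending = list()
...     def process(self, value):
...         self._pending.append(value)
...         if len(self._pending) == 2:
...             self.send(tuple(self._pending))
...             del self._pending[:]
>>> out = list()
>>> pairer = Pairer(sendee=out.append)
>>> for i in xrange(5): pairer.process(i)
>>> out
[(0, 1), (2, 3)]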

class onyx.dataflow.join.BlockingSlotFillingJoin(sendee=None, sending=True)

Bases: onyx.dataflow.streamprocess.ProcessorBase

A join processor that fills the slots of a resulting event with single events from callers of its process functions.

This element sends bundles of processed events to the sendee, taking one event from each prearranged incoming stream to make a complete bundle. Note that this element doesn't have the usual process() function. Instead, clients should call create_process_function(), which will dynamically allocate a process function to be called in the usual way. Each call to create_process_function() is passed an immutable tag, which must be different from all previous tags. When all process functions have been allocated, call the start() function, which will allow processing to begin. Calls to the allocated process functions will be used to fill in a tuple of events whose order is the same as the sorted order of the tags. Calls made for tags which are already filled will block until a complete tuple is emitted and an empty slot is again available. Note that this element does not use its own thread, so each event pushed through will be using the thread which called the allocated process function that completed the tuple.

>>> result = list()
>>> sfj0 = BlockingSlotFillingJoin(sendee=result.append)
>>> proc0 = sfj0.create_process_function(2)
>>> proc1 = sfj0.create_process_function(1)
>>> proc2 = sfj0.create_process_function(3)
>>> sfj0.sorted_tag_set
[1, 2, 3]
>>> sfj0.start()

get_process_function() returns the process function associated with a particular tag, or None if no function has that tag.

>>> sfj0.get_process_function(1) is proc1
True
>>> sfj0.get_process_function('a') is None
True

Here’s a test with a single thread. This means we must make the calls to the process functions in complete groups since if one slot is filled twice before the others are filled once, our process will hang! This way of setting things up is not suggested for real use; you probably want to use a real source of some kind to feed this join.

>>> for i in xrange(5):
...     proc0(i)
...     proc1(i+10)
...     proc2(i+100)
>>> result
[(10, 0, 100), (11, 1, 101), (12, 2, 102), (13, 3, 103), (14, 4, 104)]
>>> del result[:]

Here’s a multithreaded test. Note that we’re reusing the processor constructed above, and that there are no slots currently filled. These IteratorSources each use their own thread and will be calling their sendees asynchronously.

>>> is0 = IteratorSource(xrange(0,5), sendee=proc0)
>>> is1 = IteratorSource(xrange(10,15), sendee=proc1)
>>> is2 = IteratorSource(xrange(100,110), sendee=proc2)

We start by turning on is1, but this only fills one slot, so there’s no output

>>> is1.start()
>>> result
[]

Now turn on is0. There’s still no output, since we haven’t filled an entire tuple.

>>> is0.start()
>>> result
[]

Finally turn is2 on, whereupon things run to completion.

>>> is2.start()

We block until the two iterators that will be exhausted have stopped; then we stop those IteratorSources, leaving each queue to be emptied by its IteratorSource; then we wait for the queues to be emptied.

>>> is0.wait_iter_stopped() and is1.wait_iter_stopped()
True
>>> is0.stop(flush=True)
>>> is1.stop(flush=True)
>>> is0.wait()
>>> is1.wait()
>>> result
[(10, 0, 100), (11, 1, 101), (12, 2, 102), (13, 3, 103), (14, 4, 104)]
>>> is2.is_iter_stopped
False

Note that even though is2 has another 5 events, there are no corresponding is0 and is1 events, so no tuples can be made. XXX So we might want some sort of flush operation.

create_process_function(new_tag)
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information; see onyx.util.debugprint.DebugPrint

get_process_function(tag)
graph

Return a graph for this processor. By default this is just a single node whose label is the label of the processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(value)

Subclasses must override the process(value) function.

The function accepts a single value. If, after processing the single value, it has a new result or results, the function must call send(result) for each result that is calculated. For a given call to process(), the function can call send() zero, one, or multiple times depending on the semantics that the processor type implements.

send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

sorted_tag_set
start()
static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().

class onyx.dataflow.join.GatingJoin(sendee=None, sending=True)

Bases: onyx.dataflow.streamprocess.ProcessorBase

A join processor which gates one input on the other. Process functions are obtained by calls to get_gating_process_function() and get_gated_process_function(). Events sent to the gated process function are queued and that function returns immediately. Events sent to the gating process function are sent on to the sendee, and when that send returns, the current queue of gated events is also sent.

>>> collector = list()
>>> gj0 = GatingJoin(sendee=collector.append)
>>> gater = gj0.get_gating_process_function()
>>> gated = gj0.get_gated_process_function()
>>> for i in xrange(10): gated(i)
>>> gater('a')
>>> while len(collector) < 11:
...    sleep(1/1024)
>>> collector
['a', 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

It’s an error to send another event to the gating input without calling reset() first.

>>> gater('b')
Traceback (most recent call last):
...
ValueError: gating event already seen; use reset() before sending another gating event
>>> gj0.reset()
>>> for i in xrange(100, 110): gated(i)
>>> gater('c')
>>> while len(collector) < 22:
...    sleep(1/1024)
>>> collector
['a', 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'c', 100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information; see onyx.util.debugprint.DebugPrint

get_gated_process_function()

Call this function to get the process function that is controlled by the gate. Events sent to this function will be queued until the gate is opened by a call to the gating process function. See get_gating_process_function()

get_gating_process_function()

Call this function to get the process function that controls the gate. Send an event to this function to open the gate and allow events from the gated function to be passed along. Note that the gating process function can't be called again until there's been a call to reset(). See get_gated_process_function()

graph

Return a graph for this processor. By default this is just a single node whose label is the label of the processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(value)

Subclasses must override the process(value) function.

The function accepts a single value. If, after processing the single value, it has a new result or results, the function must call send(result) for each result that is calculated. For a given call to process(), the function can call send() zero, one, or multiple times depending on the semantics that the processor type implements.

reset()

Call this function to reset the gate to closed.

send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().

class onyx.dataflow.join.RoundRobinJoin(sendee=None, sending=True)

Bases: onyx.dataflow.streamprocess.ProcessorBase

A join processor that enforces a round-robin protocol on callers of its process functions.

This element sends all events processed to the sendee, taking one event at a time from each prearranged incoming stream. Note that this element doesn’t have the usual process() function. Instead, clients should call create_process_function() which will dynamically allocate a process function to be called in the usual way. Each client who calls create_process_function() is implicitly given a position in the round. When all positions have been allocated, call the start() function, which will allow processing to begin. The first allocated function goes first, and so on. Calls to the allocated process functions will block until it’s that caller’s turn, at which point their event will be sent to the sendee. Note that this element does not use its own thread, so each event pushed through will be using the thread which called the allocated process function.

>>> result = list()
>>> rrj0 = RoundRobinJoin(sendee=result.append)

Here’s a test with a single thread. This means we’d better make the calls to the process functions in the right order or we’ll hang our process! This way of setting things up is not suggested for real use; you probably want to use a real source of some kind to feed this join.

>>> proc0 = rrj0.create_process_function()
>>> proc1 = rrj0.create_process_function()
>>> proc2 = rrj0.create_process_function()
>>> rrj0.num_registered
3
>>> rrj0.start()
>>> for i in xrange(5):
...     proc0(i)
...     proc1(i+10)
...     proc2(i+100)
>>> result
[0, 10, 100, 1, 11, 101, 2, 12, 102, 3, 13, 103, 4, 14, 104]
>>> del result[:]

Here’s a multithreaded test. Note that we’re reusing the processor constructed above, and that it’s currently proc0’s turn. These IteratorSources each use their own thread and will be calling their sendees asynchronously. The RoundRobinJoin enforces turn-taking between the threads.

>>> is0 = IteratorSource(xrange(0,5), sendee=proc0)
>>> is1 = IteratorSource(xrange(10,15), sendee=proc1)
>>> is2 = IteratorSource(xrange(100,105), sendee=proc2)

We start by turning on is1, but it's not its turn, so there's no output.

>>> is1.start()
>>> result
[]

Now turn on is0. It's its turn, so it gets one event through, then is1 gets one turn, then things lock up again since is2 still isn't on. So, we wait until result gets those two items.

>>> is0.start()
>>> while len(result) < 2:
...   sleep(1/64) # polling
>>> result
[0, 10]
>>> sleep(1/32) # result isn't going to grow
>>> result
[0, 10]

Finally turn is2 on, whereupon things run to completion.

>>> is2.start()

We block until all three of the iterators have been exhausted and stopped; then we stop the IteratorSources, leaving each queue to be emptied by its IteratorSource; then we wait for the queues to be emptied. At that point, we can be sure the result has everything in it.

>>> is0.wait_iter_stopped() and is1.wait_iter_stopped() and is2.wait_iter_stopped()
True
>>> is0.stop(flush=True)
>>> is1.stop(flush=True)
>>> is2.stop(flush=True)
>>> is0.wait()
>>> is1.wait()
>>> is2.wait()
>>> result
[0, 10, 100, 1, 11, 101, 2, 12, 102, 3, 13, 103, 4, 14, 104]
create_process_function()
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information; see onyx.util.debugprint.DebugPrint

graph

Return a graph for this processor. By default this is just a single node whose label is the label of the processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

num_registered
process(value)

Subclasses must override the process(value) function.

The function accepts a single value. If, after processing the single value, it has a new result or results, the function must call send(result) for each result that is calculated. For a given call to process(), the function can call send() zero, one, or multiple times depending on the semantics that the processor type implements.

release_process_function(fn)
send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

start()
static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().

class onyx.dataflow.join.SerializingJoin(sendee=None, sending=True)

Bases: onyx.dataflow.streamprocess.ProcessorBase

A very simple join processor which will take things sent into it by multiple callers of the process function and send them along to the sendee.

Suitable for use when more than one thread may be calling the process function and all you need is to get the events into a single stream. Note that this element does not use its own thread, so each event pushed through will be using the thread which called process() to send the data on. Nor is there any buffering done here, so threads calling process() will block until any currently ongoing process()/send() calls complete.

>>> result = list()
>>> join0 = SerializingJoin(sendee=result.append)
>>> for i in xrange(10):
...    join0.process(i)
>>> result
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> del result[:]
>>> range_size = 32
>>> small_range = xrange(100, 100+range_size)
>>> large_range = xrange(1000, 1000+range_size)
>>> small = set(small_range)
>>> large = set(large_range)
>>> is0 = IteratorSource(small_range, sendee=join0.process)
>>> is1 = IteratorSource(large_range, sendee=join0.process)

Use real-time simulation to get nearly deterministic interleaving of data

>>> real_time_base = 1 / 16
>>> is0.simulate_realtime(real_time_base)
>>> is1.simulate_realtime(real_time_base)
>>> def go():
...   # use a function to try to avoid per-statement threading delays from doctest
...   is0.start()
...   sleep(real_time_base / 2) # delay by one half the base period
...   is1.start()
>>> go()
>>> is0.wait_iter_stopped() and is1.wait_iter_stopped()
True
>>> is0.stop(flush=True)
>>> is1.stop(flush=True)
>>> is0.wait()
>>> is1.wait()
>>> len(result)
64
>>> set(result) == small | large
True

Verify that exactly half the items in the first half of result are from the small range. Note: the result being verified is not deterministic; the test will fail from time to time....

>>> small_in_first_half = set(result[:len(result)//2]) & small
>>> len(small_in_first_half) == range_size // 2
True
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information; see onyx.util.debugprint.DebugPrint

graph

Return a graph for this processor. By default this is just a single node whose label is the label of the processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(item)
send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().

class onyx.dataflow.join.SynchronizingSequenceJoin(sendee=None, sending=True)

Bases: onyx.dataflow.streamprocess.ProcessorBase

A synchronizing join processor. Process functions are obtained by calls to create_process_function(). The order of calls to create_process_function() establishes the order of the slots in the event sequences that get sent on. Events arriving at each slot are queued up. An event is built and sent on when every slot has at least one item in its queue, which happens exactly when an arrival fills the last remaining empty queue (so that queue holds only that single item). The sent event is a tuple containing one item popped off each queue. Thus, events are synchronized and are emitted at the rate (and latency) of the slowest source. Both create_process_function() and the returned functions are thread-safe and single-thread safe.

>>> ssj = SynchronizingSequenceJoin()
>>> funcs = list(ssj.create_process_function() for i in xrange(5))

We don't need a sendee yet, since we don't fill the first slot

>>> for i, f in enumerate(funcs[1:]): f(i)
>>> for i, f in enumerate(funcs[1:]): f(10+i)

Now create a sendee, and it gets the synchronized result

>>> result = list()
>>> ssj.set_sendee(result.append)
>>> funcs[0](-1)
>>> result
[(-1, 0, 1, 2, 3)]

No new result

>>> for i, f in enumerate(funcs[1:]): f(20+i)
>>> result
[(-1, 0, 1, 2, 3)]

Now get out the next two synchronized results

>>> funcs[0](-10)
>>> result
[(-1, 0, 1, 2, 3), (-10, 10, 11, 12, 13)]
>>> funcs[0](-20)
>>> result
[(-1, 0, 1, 2, 3), (-10, 10, 11, 12, 13), (-20, 20, 21, 22, 23)]

Now the other slots are empty, so no new result

>>> funcs[0](-30)
>>> result
[(-1, 0, 1, 2, 3), (-10, 10, 11, 12, 13), (-20, 20, 21, 22, 23)]

Add a new inputter (this is thread safe too)

>>> funcs.append(ssj.create_process_function())
>>> for f in funcs: f(None)
>>> result
[(-1, 0, 1, 2, 3), (-10, 10, 11, 12, 13), (-20, 20, 21, 22, 23), (-30, None, None, None, None, None)]
create_process_function()
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information; see onyx.util.debugprint.DebugPrint

graph

Return a graph for this processor. By default this is just a single node whose label is the label of the processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(value)

Subclasses must override the process(value) function.

The function accepts a single value. If, after processing the single value, it has a new result or results, the function must call send(result) for each result that is calculated. For a given call to process(), the function can call send() zero, one, or multiple times depending on the semantics that the processor type implements.

send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().