onyx.dataflow.streamprocess – A base class and some simple subclasses for dataflow processing.

class onyx.dataflow.streamprocess.ArrayReshaper(input_shape, output_shape, sendee=None, sending=True, bypass_types=())

Bases: onyx.dataflow.streamprocess.ProcessorBase

A processor for reshaping a Numpy ndarray.

input_shape and output_shape should each be a shape, as used for the shape of a Numpy ndarray. Incoming events must be numpy.ndarrays whose shape is input_shape. The output event will be a numpy.ndarray whose shape is output_shape.

>>> result = list()
>>> reshaper = ArrayReshaper((2, 5, 4), (2, 20), sendee=result.append)
>>> reshaper.process(np.arange(2*5*4).reshape((2, 5, 4)))
>>> result
[array([[ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
        17, 18, 19],
       [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36,
        37, 38, 39]])]
>>> reshaper.process(np.arange(2*4*3).reshape((2, 4, 3)))
Traceback (most recent call last):
...
ValueError: expected input data shape to be (2, 5, 4), got (2, 4, 3)
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint

graph

Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(event)
send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send() is called.
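The send rules described above can be pictured with a small stand-alone sketch. It does not use Onyx. The class name SendSketch is hypothetical, and raising ValueError when no sendee has been set is an assumption, since the text only states that a sendee of False is silently ignored.

```python
class SendSketch(object):
    """Hypothetical stand-in for the sendee/sending bookkeeping; not Onyx code."""

    def __init__(self, sendee=None, sending=True):
        self.sendee = sendee
        self.sending = sending

    def set_sendee(self, sendee):
        self.sendee = sendee

    def set_sending(self, sending):
        self.sending = bool(sending)

    def send(self, result):
        if self.sendee is False:
            # A sendee of False means "never send", and that is not an error.
            return
        if not self.sending:
            # Sending has been switched off; drop the result quietly.
            return
        if self.sendee is None:
            # Assumption: the library's behavior for an unset sendee is not
            # documented above, so this sketch simply raises.
            raise ValueError("no sendee has been set")
        self.sendee(result)


out = []
p = SendSketch()
p.set_sendee(out.append)
p.send(1)
p.set_sending(False)
p.send(2)                          # dropped while sending is off
p.set_sending(True)
p.send(3)
SendSketch(sendee=False).send(4)   # silently ignored
print(out)                         # [1, 3]
```

Keeping the on/off switch separate from the sendee itself means a processor can be muted temporarily without losing track of where its results should go.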

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().

class onyx.dataflow.streamprocess.ArraySelector(selector, axis=-1, sendee=None, sending=True, bypass_types=())

Bases: onyx.dataflow.streamprocess.ProcessorBase

A processor for selecting some elements from a Numpy array.

selector should be a Numpy array with dtype Boolean and a shape of (len,); i.e., it must be one-dimensional. Incoming events must have length len along the specified axis. The output event will have the same shape as the input, except that the length of the specified axis will be the number of True elements in the selector.

axis is the index of the axis along which incoming events will be sliced.

>>> array34 = np.arange(12).reshape(3, 4)
>>> array34
array([[ 0,  1,  2,  3],
       [ 4,  5,  6,  7],
       [ 8,  9, 10, 11]])
>>> sel = np.array([True, False, True])
>>> result = list()
>>> as0 = ArraySelector(sel, axis=0, sendee=result.append)
>>> as0.process(array34)
>>> result[0]
array([[ 0,  1,  2,  3],
       [ 8,  9, 10, 11]])
>>> sel = np.array([True, False, True, False])
>>> result = list()
>>> as1 = ArraySelector(sel, axis=1, sendee=result.append)
>>> as1.process(array34)
>>> result[0]
array([[ 0,  2],
       [ 4,  6],
       [ 8, 10]])
>>> as1.process(array34.reshape(4, 3))
Traceback (most recent call last):
...
ValueError: expected axis dimension to be 3, got 4
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint

graph

Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(event)
send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send() is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().

class onyx.dataflow.streamprocess.CollectProcessor(sendee=None, sending=True, process_types=(), bypass_types=())

Bases: onyx.dataflow.streamprocess.ProcessorBase, list

A collecting processor, a subclass of list. Collects the values as well as passing them on. Useful for collecting elements in the middle of a chain.

This example doesn’t really do the collect processor justice because it puts the collector at the end of the chain. See usage in main() for the more useful case of collecting values from somewhere in the middle of a chain.

>>> recipient = list()
>>> e = CollectProcessor(sendee=recipient.append, process_types=int)
>>> for x in xrange(4): e.process(x)
>>> recipient
[0, 1, 2, 3]
>>> e
[0, 1, 2, 3]
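The mid-chain use mentioned above can be sketched without Onyx. CollectSketch is a hypothetical list subclass written for this illustration, and the downstream doubling stage is likewise made up; the sketch only shows the idea of recording each event while passing it on unchanged.

```python
class CollectSketch(list):
    """Hypothetical list subclass that records each event and forwards it."""

    def __init__(self, sendee=None):
        super(CollectSketch, self).__init__()
        self.sendee = sendee

    def process(self, event):
        self.append(event)          # keep a copy in this list
        if self.sendee is not None:
            self.sendee(event)      # and pass the event on unchanged


doubled = []
# The collector sits between the source loop and a downstream doubling stage.
tap = CollectSketch(sendee=lambda x: doubled.append(2 * x))
for x in range(4):
    tap.process(x)
print(list(tap))   # [0, 1, 2, 3]
print(doubled)     # [0, 2, 4, 6]
```

The collector holds the intermediate values, while the end of the chain only ever sees the transformed ones.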
append

L.append(object) – append object to end

count

L.count(value) -> integer – return number of occurrences of value

dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint

extend

L.extend(iterable) – extend list by appending elements from the iterable

graph

Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.

index

L.index(value, [start, [stop]]) -> integer – return first index of value. Raises ValueError if the value is not present.

insert

L.insert(index, object) – insert object before index

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

pop

L.pop([index]) -> item – remove and return item at index (default last). Raises IndexError if list is empty or index is out of range.

process(event)
remove

L.remove(value) – remove first occurrence of value. Raises ValueError if the value is not present.

reverse

L.reverse() – reverse IN PLACE

send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send() is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

sort

L.sort(cmp=None, key=None, reverse=False) – stable sort IN PLACE; cmp(x, y) -> -1, 0, 1

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().

class onyx.dataflow.streamprocess.DecimatingProcessor(nth, sendee=None, sending=True, process_types=(), bypass_types=())

Bases: onyx.dataflow.streamprocess.ProcessorBase

A decimating processor. Pushes the value of every nth call to process(), starting with the value of the first call to process().

>>> recipient = list()
>>> d = DecimatingProcessor(2, sendee=recipient.append, process_types=int)
>>> for x in xrange(10): d.process(x)
>>> recipient
[0, 2, 4, 6, 8]
>>> d = DecimatingProcessor(3, process_types=int)
>>> d.set_sendee(recipient.append)
>>> for x in xrange(10): d.process(x)
>>> recipient
[0, 2, 4, 6, 8, 0, 3, 6, 9]
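The counting rule behind this behavior can be sketched in plain Python. make_decimator is a hypothetical helper, not part of Onyx; it forwards the first event and then every nth event after it.

```python
def make_decimator(nth, sendee):
    """Return a process(event) function that forwards every nth event.

    Hypothetical helper, not part of Onyx; the first event is always forwarded.
    """
    state = {'count': 0}

    def process(event):
        if state['count'] % nth == 0:
            sendee(event)
        state['count'] += 1

    return process


recipient = []
process = make_decimator(3, recipient.append)
for x in range(10):
    process(x)
print(recipient)   # [0, 3, 6, 9]
```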
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint

graph

Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(event)
send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send() is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().

class onyx.dataflow.streamprocess.DictSelectableProcessor(processor, selector_key, pass_dict_to_sendee=True, sendee=None, sending=True, bypass_types=())

Bases: onyx.dataflow.streamprocess.ProcessorBase

Wraps an existing processor and processes two-element tuples in which the first element is a selector dictionary and the second element is the input to the wrapped processor. The dictionary determines whether the wrapped processor is run or whether it acts like an epsilon processor that passes the input directly to the sendee (bypassing the wrapped processor). Depending on the initialization parameters, the output sent to the sendee is either a tuple containing the selector dictionary or the direct output of the wrapped processor. This class allows a processor in a processing chain to be easily skipped at runtime without dynamically building processor chains or adding a “doNothing” parameter to the init parameters of processors in a way that would decrease their reusability.

The wrapped processor will be executed only if the selector map (first item of the input tuple) contains a True value under the key specified by the init parameter selector_key, or if selector_key is None. If pass_dict_to_sendee is True, the output sent to the sendee will be a 2-item tuple whose first item is the selector dictionary; otherwise the selector dictionary will not be passed down the processing chain. Note that inputs matching bypass_types will bypass both this wrapper and the wrapped processor. This processor does not support wrapping processors that send multiple events for one input event when pass_dict_to_sendee is True, because the output cannot be sent down the processing chain as the second element of a tuple.
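The selection rule can be summarized by a small stand-alone function. This is only a sketch: select_and_run is a hypothetical helper that works on a plain callable rather than a wrapped processor, and it treats any truthy dictionary value as True, which is an assumption.

```python
def select_and_run(func, selector_key, pass_dict_to_sendee, event):
    """Hypothetical helper mirroring the documented selection rule."""
    selector, value = event
    # Run the function for a wildcard (None) key or a truthy entry;
    # a missing key counts as "skip".
    if selector_key is None or selector.get(selector_key, False):
        value = func(value)
    # Optionally keep the selector dictionary travelling with the value.
    return (selector, value) if pass_dict_to_sendee else value


def add_world(text):
    return text + ' world'


selector = {'add_key': True, 'no_add_key': False}
ran = select_and_run(add_world, 'add_key', False, (selector, 'hello'))
skipped = select_and_run(add_world, 'missing_key', False, (selector, 'hello'))
wildcard = select_and_run(add_world, None, True, (selector, 'hello'))
print(ran)          # hello world
print(skipped)      # hello
print(wildcard[1])  # hello world
```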

Set up a test for all use cases:

>>> def append_world(value):
...     return value + ' world'
>>> add_world_proc = FunctionProcessor(append_world, process_types=(str,))
>>> selector_dict = {'add_key':True, 'no_add_key':False}
>>> output_sink = []

Test non-skipped processor, returning a tuple with the selector dict:

>>> DictSelectableProcessor(processor=add_world_proc, selector_key='add_key', pass_dict_to_sendee=True, sendee=output_sink.append).process((selector_dict, 'hello'))
>>> print output_sink[-1]
({'no_add_key': False, 'add_key': True}, 'hello world')

Test non-skipped processor by a wildcard (None) key value, returning a tuple with the selector dict:

>>> DictSelectableProcessor(processor=add_world_proc, selector_key=None, pass_dict_to_sendee=True, sendee=output_sink.append).process((selector_dict, 'hello'))
>>> print output_sink[-1]
({'no_add_key': False, 'add_key': True}, 'hello world')

Test non-skipped processor, not returning a tuple with the selector dict:

>>> DictSelectableProcessor(processor=add_world_proc, selector_key='add_key', pass_dict_to_sendee=False, sendee=output_sink.append).process((selector_dict, 'hello'))
>>> print output_sink[-1]
hello world

Test non-skipped processor by a wildcard (None) key value, not returning a tuple with the selector dict:

>>> DictSelectableProcessor(processor=add_world_proc, selector_key=None, pass_dict_to_sendee=False, sendee=output_sink.append).process((selector_dict, 'hello'))
>>> print output_sink[-1]
hello world

Test skipped processor by False value in the selector dict, returning a tuple with the selector map:

>>> DictSelectableProcessor(processor=add_world_proc, selector_key='no_add_key', pass_dict_to_sendee=True, sendee=output_sink.append).process((selector_dict, 'hello'))
>>> print output_sink[-1]
({'no_add_key': False, 'add_key': True}, 'hello')

Test skipped processor by missing key value in the selector dict, returning a tuple with the selector map:

>>> DictSelectableProcessor(processor=add_world_proc, selector_key='missing_key', pass_dict_to_sendee=True, sendee=output_sink.append).process((selector_dict, 'hello'))
>>> print output_sink[-1]
({'no_add_key': False, 'add_key': True}, 'hello')

Test skipped processor by False value in the selector dict, not returning a tuple with the selector map:

>>> DictSelectableProcessor(processor=add_world_proc, selector_key='no_add_key', pass_dict_to_sendee=False, sendee=output_sink.append).process((selector_dict, 'hello'))
>>> print output_sink[-1]
hello

Test skipped processor by missing key value in the selector dict, not returning a tuple with the selector map:

>>> DictSelectableProcessor(processor=add_world_proc, selector_key='missing_key', pass_dict_to_sendee=False, sendee=output_sink.append).process((selector_dict, 'hello'))
>>> print output_sink[-1]
hello
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint

graph

Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(event)
send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send() is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().

class onyx.dataflow.streamprocess.DropByTypeFilter(sendee=None, sending=True, filter_types=())

Bases: onyx.dataflow.streamprocess.ProcessorBase

A processor for filtering events out of a dataflow by type.

filter_types is a type or tuple of types or such tuples, exactly as used in Python’s builtin isinstance function. Any event e for which isinstance(e, filter_types) would return True is filtered out, that is, the event is dropped from the dataflow. All other events are passed.

See also PassByTypeFilter

>>> collect = list()
>>> fp0 = DropByTypeFilter(sendee=collect.append, filter_types=(list, dict))
>>> fp0.process(1)
>>> fp0.process('foo')
>>> fp0.process(3.415)
>>> fp0.process([])
>>> fp0.process({})
>>> fp0.process(None)
>>> fp0.process(())
>>> collect
[1, 'foo', 3.415, None, ()]
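Because the filter relies on isinstance, nested tuples of types and subclass relationships behave as they do in plain Python. The following sketch uses a hypothetical helper, drop_by_type, in place of the processor to show those two points.

```python
def drop_by_type(events, filter_types):
    """Hypothetical helper: keep events that are NOT instances of filter_types."""
    return [e for e in events if not isinstance(e, filter_types)]


events = [1, 'foo', 3.415, [], {}, None, ()]
kept = drop_by_type(events, (list, dict))
print(kept)          # [1, 'foo', 3.415, None, ()]

# isinstance accepts nested tuples of types as well.
kept_nested = drop_by_type(events, (list, (dict, tuple)))
print(kept_nested)   # [1, 'foo', 3.415, None]

# Subclasses match too: bool is a subclass of int, so True is dropped.
kept_sub = drop_by_type([True, 1, 'a'], int)
print(kept_sub)      # ['a']
```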
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint

graph

Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(event)
send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send() is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().

class onyx.dataflow.streamprocess.EpsilonProcessor(sendee=None, sending=True)

Bases: onyx.dataflow.streamprocess.NoProcessProcessor

An epsilon processor. It passes through all values sent in.

An example where a user-defined function is the sendee; in this case, that function just prints out the value.

>>> e = EpsilonProcessor()
>>> e.set_sendee(_printer)
>>> e.process('abc')
'abc'
>>> with debugprint.DebugPrint("df_node_EpsilonProcessor_debug_send"):
...     e.process('def')
df_node_EpsilonProcessor_debug_send: Event of type <type 'str'>
'def'
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint

graph

Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(event)
send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send() is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().

class onyx.dataflow.streamprocess.FunctionProcessor(function, sendee=None, label=None, sending=True, process_types=(), bypass_types=(), call_with_dc=False, pre_hook=None, post_hook=None)

Bases: onyx.dataflow.streamprocess.ProcessorBase

Apply a callable to the argument to process(), send the result.

function must be a callable. If call_with_dc is False, it should take one argument, which is the input event for the processor. If call_with_dc is True, then function should be a callable taking two arguments, the second of which will be a debugging context like that returned by dcheck() in the debugprint module. If pre_hook is not None, it should be a callable which will be called with the incoming event before the function is called. If post_hook is not None, it should be a callable which will be called with the outgoing event before it is sent. Any return values from pre_hook or post_hook are ignored.

>>> floor0 = FunctionProcessor(int, sendee=_printer, process_types=(float,),
...                       bypass_types=str)
>>> for x in xrange(7): floor0.process(-0.825 * x)
0
0
-1
-2
-3
-4
-4
>>> floor0.process('hi')
'hi'
>>> floor0.process(None)
Traceback (most recent call last):
...
ValueError: processor type FunctionProcessor expected event type in one of these two groups: (<type 'float'>,) (<type 'str'>,), but got <type 'NoneType'>
>>> def pre_print(x): print('pre: %s' % (x,))
>>> def post_print(x): print('post: %s' % (x,))
>>> floor1 = FunctionProcessor(int, sendee=_printer, process_types=(float,),
...                            bypass_types=str,
...                            pre_hook=pre_print, post_hook=post_print)
>>> for x in xrange(3): floor1.process(-0.825 * x)
pre: -0.0
post: 0
0
pre: -0.825
post: 0
0
pre: -1.65
post: -1
-1
>>> def int_with_debug(value, dc):
...    dc and dc("converting value %s" % (value,))
...    return int(value)
>>> floor2 = FunctionProcessor(int_with_debug, sendee=_printer, process_types=(float,),
...                            call_with_dc=True, bypass_types=str, label='IWD')
>>> with debugprint.DebugPrint("df_node_IWD_debug"):
...     for x in xrange(3): floor2.process(-0.825 * x)
df_node_IWD_debug: converting value -0.0
0
df_node_IWD_debug: converting value -0.825
0
df_node_IWD_debug: converting value -1.65
-1
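The order in which the hooks, the function, and the sendee run can be sketched without Onyx. make_function_stage is a hypothetical helper written for this illustration; it leaves out type checking and debug contexts and only records the call order seen in the pre_hook/post_hook example above.

```python
def make_function_stage(function, sendee, pre_hook=None, post_hook=None):
    """Hypothetical helper showing the hook ordering; not the Onyx implementation."""
    def process(event):
        if pre_hook is not None:
            pre_hook(event)        # sees the incoming event; return value ignored
        result = function(event)
        if post_hook is not None:
            post_hook(result)      # sees the outgoing event before it is sent
        sendee(result)
    return process


trace = []
stage = make_function_stage(
    int,
    sendee=lambda r: trace.append(('sent', r)),
    pre_hook=lambda e: trace.append(('pre', e)),
    post_hook=lambda r: trace.append(('post', r)),
)
stage(-1.65)
print(trace)   # [('pre', -1.65), ('post', -1), ('sent', -1)]
```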
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint

graph

Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(event)
send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send() is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().

class onyx.dataflow.streamprocess.NoProcessProcessor(sendee=None, sending=True, bypass_types=())

Bases: onyx.dataflow.streamprocess.ProcessorBase

A specialization of ProcessorBase that takes only a bypass_types argument (and no process_types), so it never calls its process function.

dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint

graph

Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(event)
send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send() is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().

class onyx.dataflow.streamprocess.NullProcessor(sendee=None, sending=True)

Bases: onyx.dataflow.streamprocess.NoProcessProcessor

A null processor. This can be used as a Processor placeholder, but it will error if anything is sent to it.

>>> e = NullProcessor()
>>> e.set_sendee(_printer)
>>> e.process('abc')
Traceback (most recent call last):
  ...
ValueError: processor type NullProcessor expected event type in one of these two groups: () (), but got <type 'str'>
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint

graph

Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(event)
send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send() is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().

class onyx.dataflow.streamprocess.PassByTypeFilter(sendee=None, sending=True, filter_types=())

Bases: onyx.dataflow.streamprocess.ProcessorBase

A processor for filtering events out of a dataflow by type.

filter_types is a type or tuple of types or such tuples, exactly as used in Python’s builtin isinstance function. Any event e for which isinstance(e, filter_types) would return True is passed. All other events are filtered out, that is, dropped from the dataflow.

See also DropByTypeFilter

>>> collect = list()
>>> fp0 = PassByTypeFilter(sendee=collect.append, filter_types=(list, dict))
>>> fp0.process(1)
>>> fp0.process('foo')
>>> fp0.process(3.415)
>>> fp0.process([])
>>> fp0.process({})
>>> fp0.process(None)
>>> fp0.process(())
>>> collect
[[], {}]
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint

graph

Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(event)
send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send() is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().

class onyx.dataflow.streamprocess.ProcessorBase(sendee=None, sending=True, label=None, process_types=(), bypass_types=())

Bases: object

Base class for processor elements in a chain. Subclasses must implement the function process() to do work based on value, and must call send(result) when the work produces a result to push on down the chain. Subclasses may override or pass through process_types and bypass_types, which may be types or (nested) tuples of types; see the builtin function isinstance(). For subclasses using the std_process_prologue() decorator, any incoming event whose type is in process_types will be processed, any incoming event whose type is in bypass_types will be sent to the next element in the dataflow without processing, and an event of any other type is an error. Considered as sets of types, process_types and bypass_types must be disjoint, including via subclassing.

>>> a = ProcessorBase(sendee=_printer)
>>> a.process(None)
Traceback (most recent call last):
...
NotImplementedError: ProcessorBase must implement self.process(value) as a method or attribute
>>> b = ProcessorBase(sendee=_printer, process_types=dict, bypass_types=(list, defaultdict))
Traceback (most recent call last):
...
ValueError: expected process_types and bypass_types to be disjoint, got (<type 'dict'>,) and (<type 'list'>, <type 'collections.defaultdict'>)
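
The second failure above arises because defaultdict is a subclass of dict, so the two groups overlap "via subclassing". One way to express such a check is sketched below. The helper names flatten_types and types_are_disjoint are hypothetical, and the real check inside ProcessorBase may be implemented differently.

```python
from collections import defaultdict


def flatten_types(types):
    """Flatten a type or a (nested) tuple of types into a flat tuple."""
    if isinstance(types, tuple):
        flat = ()
        for item in types:
            flat += flatten_types(item)
        return flat
    return (types,)


def types_are_disjoint(process_types, bypass_types):
    """Return True if no type in one group subclasses a type in the other."""
    for p in flatten_types(process_types):
        for b in flatten_types(bypass_types):
            if issubclass(p, b) or issubclass(b, p):
                return False
    return True


print(types_are_disjoint(dict, (list, defaultdict)))  # False
print(types_are_disjoint((int,), (str,)))             # True
```
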
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint

graph

Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(value)

Subclasses must override the process(value) function.

The function accepts a single value. If, after processing the single value, it has a new result or results, the function must call send(result) for each result that is calculated. For a given call to process(), the function can call send() zero, one, or multiple times depending on the semantics that the processor type implements.

send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.
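
The interplay of send(), set_sendee(), and set_sending() can be pictured with a small stand-in class. This is a sketch under stated assumptions, not the Onyx code: MiniProcessor and Repeater are hypothetical names, and raising ValueError when no sendee has been configured is an assumption, since the text above only says that a sendee of False never sends and never errors.

```python
class MiniProcessor(object):
    """Hypothetical stand-in for a processor; not the Onyx implementation."""

    def __init__(self, sendee=None, sending=True):
        self._sendee = sendee
        self._sending = sending

    def set_sendee(self, sendee):
        self._sendee = sendee

    def set_sending(self, sending):
        self._sending = bool(sending)

    def send(self, result):
        if self._sendee is False or not self._sending:
            return  # sending is disabled: drop the result quietly
        if self._sendee is None:
            # Assumption: sending with no configured sendee is an error.
            raise ValueError("no sendee has been set")
        self._sendee(result)


class Repeater(MiniProcessor):
    """Sends a non-negative integer n exactly n times."""

    def process(self, value):
        # Zero, one, or many calls to send() per call to process().
        for _ in range(value):
            self.send(value)


out = []
r = Repeater(sendee=out.append)
for n in (0, 1, 3):
    r.process(n)
print(out)  # [1, 3, 3, 3]
```

Repeater shows that a single call to process() may lead to no send at all (n == 0), exactly one, or several.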

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().
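
A decorator with the bypass and process semantics described for ProcessorBase could look roughly like the sketch below. The names process_prologue and Doubler are hypothetical, the debug-context setup is left out, and the error message is only modeled on the one shown in the SequenceFunctionProcessor example.

```python
import functools


def process_prologue(process_function):
    """Hypothetical decorator that dispatches on process_types and bypass_types."""
    @functools.wraps(process_function)
    def wrapper(self, event):
        if isinstance(event, self.process_types):
            return process_function(self, event)
        if isinstance(event, self.bypass_types):
            # Bypassed events go downstream untouched.
            return self.send(event)
        raise ValueError(
            "processor type %s expected event type in one of %r or %r, but got %r"
            % (type(self).__name__, self.process_types, self.bypass_types,
               type(event)))
    return wrapper


class Doubler(object):
    """Toy processor: doubles ints and passes strings through."""
    process_types = (int,)
    bypass_types = (str,)

    def __init__(self, sendee):
        self.send = sendee

    @process_prologue
    def process(self, event):
        self.send(event * 2)


out = []
d = Doubler(out.append)
d.process(4)
d.process('hi')
print(out)  # [8, 'hi']
```
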

class onyx.dataflow.streamprocess.ResettableSlidingWindowProcessor(reset_type, send_reset_type, window_len, slide_len, sendee=None, sending=True, label=None, bypass_types=())

Bases: onyx.dataflow.streamprocess.SlidingWindowProcessor

A processor that behaves like SlidingWindowProcessor, except that a reset_type can be specified which resets the state to the initial windowing state. When an object of type reset_type is processed, any window that has fewer than window_len elements will not be output and processing will continue with an empty window.

>>> reset_type = type(None)
>>> bypass = float
>>> s0=ResettableSlidingWindowProcessor(reset_type, True, 5, 2, sendee=_printer, bypass_types=bypass)
>>> for i in xrange(2):
...   s0.process(np.arange(1, dtype=int))
...   s0.process(np.arange(10, 12, dtype=int))
...   s0.process(np.arange(20, 23, dtype=int))
...   s0.process(10.0)
...   s0.process(np.arange(30, 40, dtype=int))
...   s0.process(None)
array([ 0, 10, 11, 20, 21])
10.0
array([11, 20, 21, 22, 30])
array([21, 22, 30, 31, 32])
array([30, 31, 32, 33, 34])
array([32, 33, 34, 35, 36])
array([34, 35, 36, 37, 38])
None
array([ 0, 10, 11, 20, 21])
10.0
array([11, 20, 21, 22, 30])
array([21, 22, 30, 31, 32])
array([30, 31, 32, 33, 34])
array([32, 33, 34, 35, 36])
array([34, 35, 36, 37, 38])
None
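
The reset behavior in the example above can be mimicked with plain lists. This is a minimal sketch and not the Onyx implementation: ResettableListWindower is a hypothetical class, it works on one-dimensional lists rather than ndarrays, it assumes slide_len <= window_len, and it treats the second constructor argument as a flag saying whether the reset event itself is forwarded.

```python
class ResettableListWindower(object):
    """Hypothetical list-based analogue of a resettable sliding window."""

    def __init__(self, reset_type, send_reset, window_len, slide_len,
                 sendee, bypass_types=()):
        self._reset_type = reset_type
        self._send_reset = send_reset
        self._window_len = window_len
        self._slide_len = slide_len  # assumed to be <= window_len
        self._sendee = sendee
        self._bypass_types = bypass_types
        self._buffer = []

    def process(self, event):
        if isinstance(event, self._reset_type):
            self._buffer = []  # discard any partially filled window
            if self._send_reset:
                self._sendee(event)
        elif isinstance(event, self._bypass_types):
            self._sendee(event)
        else:
            self._buffer.extend(event)
            while len(self._buffer) >= self._window_len:
                self._sendee(list(self._buffer[:self._window_len]))
                del self._buffer[:self._slide_len]


out = []
w = ResettableListWindower(type(None), True, 5, 2, out.append,
                           bypass_types=float)
for chunk in ([0], [10, 11], [20, 21, 22], 10.0, list(range(30, 40)), None):
    w.process(chunk)
print(out[0], out[1], out[-1])  # [0, 10, 11, 20, 21] 10.0 None
```

Because the reset empties the buffer, the leftover elements 36 to 39 never show up in a later window, which is the difference from the plain sliding window.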

Here’s an example with multidimensional input. Our initial input is 2x2x9, so output is 2x2x5.

>>> s1 = ResettableSlidingWindowProcessor(type(None), True, 5, 2, sendee=_printer)
>>> data = np.ones((2, 2, 9), dtype=float) * np.arange(9)
>>> s1.process(data)
array([[[ 0.,  1.,  2.,  3.,  4.],
        [ 0.,  1.,  2.,  3.,  4.]],
<BLANKLINE>
       [[ 0.,  1.,  2.,  3.,  4.],
        [ 0.,  1.,  2.,  3.,  4.]]])
array([[[ 2.,  3.,  4.,  5.,  6.],
        [ 2.,  3.,  4.,  5.,  6.]],
<BLANKLINE>
       [[ 2.,  3.,  4.,  5.,  6.],
        [ 2.,  3.,  4.,  5.,  6.]]])
array([[[ 4.,  5.,  6.,  7.,  8.],
        [ 4.,  5.,  6.,  7.,  8.]],
<BLANKLINE>
       [[ 4.,  5.,  6.,  7.,  8.],
        [ 4.,  5.,  6.,  7.,  8.]]])

It’s an error to change dimensions other than the last:

>>> bad_data1 = np.ones((2, 3, 9), dtype=float) * np.arange(9)
>>> s1.process(bad_data1)
Traceback (most recent call last):
...
ValueError: expected event with shape[:-1] (2, 2), but got (2, 3)

It’s an error to change dtypes:

>>> bad_data2 = np.ones((2, 2, 9), dtype=np.int32) * np.arange(9, dtype=np.int32)
>>> s1.process(bad_data2)
Traceback (most recent call last):
...
ValueError: expected event with dtype float64, but got int32

It’s an error to use a reset_type as one of the process_types. ResettableSlidingWindowProcessor has process_types == np.ndarray from its parent class by default.

>>> ResettableSlidingWindowProcessor(np.ndarray, True, 10, 20)
Traceback (most recent call last):
...
TypeError: reset_type must not be ndarray
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint

graph

Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(event)
send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().

class onyx.dataflow.streamprocess.SequenceFunctionProcessor(function, sendee=None, label=None, sending=True, process_types=(), bypass_types=())

Bases: onyx.dataflow.streamprocess.ProcessorBase

Apply a callable to the argument to process(); the callable returns a sequence or generator; send each item in the returned sequence.

function must be a callable. An incoming event is first checked for being an instance of a type in process_types (in the Python isinstance() sense); if so, function is called on the incoming event and each item in the returned sequence is sent as an output event. Second, an incoming event which is an instance of a type in bypass_types is immediately sent as an output event with no call to function. An event which doesn’t satisfy either of these conditions causes a ValueError to be raised.

>>> c = SequenceFunctionProcessor(lambda x: [x] * abs(x), sendee=_printer, process_types=(int,),
...                       bypass_types=(str,))
>>> for x in xrange(3, -4, -1): c.process(x)
3
3
3
2
2
1
-1
-2
-2
-3
-3
-3
>>> c.process('hi')
'hi'
>>> c.process(None)
Traceback (most recent call last):
...
ValueError: processor type SequenceFunctionProcessor expected event type in one of these two groups: (<type 'int'>,) (<type 'str'>,), but got <type 'NoneType'>
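
The three cases above (process, bypass, error) fit in a few lines of plain Python. The helper make_sequence_processor below is hypothetical and only mirrors the documented dispatch; it is not the Onyx class.

```python
def make_sequence_processor(function, sendee, process_types, bypass_types=()):
    """Return a process(event) callable that sends each item function yields."""
    def process(event):
        if isinstance(event, process_types):
            # function may return a list, a tuple, or a generator.
            for item in function(event):
                sendee(item)
        elif isinstance(event, bypass_types):
            sendee(event)
        else:
            raise ValueError("unexpected event type %r" % (type(event),))
    return process


out = []
process = make_sequence_processor(lambda x: [x] * abs(x), out.append,
                                  process_types=(int,), bypass_types=(str,))
for x in range(3, -4, -1):
    process(x)
process('hi')
print(out)  # [3, 3, 3, 2, 2, 1, -1, -2, -2, -3, -3, -3, 'hi']
```

Note that an input of 0 produces an empty sequence, so nothing is sent for it.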
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint

graph

Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(event)
send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().

class onyx.dataflow.streamprocess.SessionProcessorBase(sendee=None, sending=True, label=None, process_types=(), bypass_types=())

Bases: onyx.dataflow.streamprocess.ProcessorBase

Baseclass for processor elements that expect to process data in sessions. All data events must occur between SessionProcessorBase.SessionBegin and SessionProcessorBase.SessionEnd events. A SessionProcessorBase.SessionBreak event specifies the point where adjacent data events are not contiguous. For example, in a Language Modeling context this could indicate that context information across data events adjoining the SessionProcessorBase.SessionBreak event will not be used. SessionProcessorBase.SessionBreak events must also occur between SessionProcessorBase.SessionBegin and SessionProcessorBase.SessionEnd.

Derived classes must implement the SessionProcessorBase.process_data method. They can override the default implementations of SessionProcessorBase.session_begin, SessionProcessorBase.session_break, and SessionProcessorBase.session_end.

>>> class BigramCounter(SessionProcessorBase):
...     UNKNOWN_CONTEXT = '!unk'
...     def __init__(self, sendee=None, sending=True, label=None, bypass_types=()):
...         super(BigramCounter, self).__init__(
...             sendee, sending=sending, process_types=str,
...             bypass_types=bypass_types)
...         self._prev_word = self.UNKNOWN_CONTEXT
...         self._word_counts = collections.defaultdict(int)
...     def session_break(self, event):
...         self._prev_word = self.UNKNOWN_CONTEXT
...     def session_end(self, event):
...         self._prev_word = self.UNKNOWN_CONTEXT
...         self.send(self._word_counts)
...         self._word_counts = collections.defaultdict(int)
...         super(BigramCounter, self).session_end(event)
...     def process_data(self, word):
...         self._word_counts[(self._prev_word, word)] += 1
...         self._prev_word = word
>>> res = list()
>>> bc = BigramCounter(sendee=res.append)
>>> bc.process(bc.SessionBegin(userdata=None))
>>> print res
[SessionBegin(userdata=None)]
>>> del res[:]
>>> for word in "Tortilla soup".split():
...     bc.process(word)
>>> len(res)
0
>>> bc.process(bc.SessionBreak(userdata=None))
>>> len(res)
0

Process contiguous data divided as two chunks. This should be the same as processing them as a single chunk.

>>> for word in "Eat drink".split():
...     bc.process(word)
>>> for word in "man woman".split():
...     bc.process(word)
>>> assert len(res) == 0
>>> expected_result = dict({('!unk', 'Tortilla'): 1, ('Tortilla', 'soup'): 1,
...                         ('!unk', 'Eat'): 1, ('Eat', 'drink'): 1, ('drink', 'man'): 1,
...                         ('man', 'woman'): 1})
>>> bc.process(bc.SessionEnd(userdata=None))
>>> len(res)
2
>>> res[0] == expected_result
True
>>> print res[1]
SessionEnd(userdata=None)
class SessionProcessorBase.SessionBegin

Bases: onyx.dataflow.streamprocess.SessionBegin

count

T.count(value) -> integer – return number of occurrences of value

index

T.index(value, [start, [stop]]) -> integer – return first index of value. Raises ValueError if the value is not present.

userdata

Alias for field number 0

class SessionProcessorBase.SessionBreak

Bases: onyx.dataflow.streamprocess.SessionBreak

count

T.count(value) -> integer – return number of occurrences of value

index

T.index(value, [start, [stop]]) -> integer – return first index of value. Raises ValueError if the value is not present.

userdata

Alias for field number 0

class SessionProcessorBase.SessionEnd

Bases: onyx.dataflow.streamprocess.SessionEnd

count

T.count(value) -> integer – return number of occurrences of value

index

T.index(value, [start, [stop]]) -> integer – return first index of value. Raises ValueError if the value is not present.

userdata

Alias for field number 0

SessionProcessorBase.dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

SessionProcessorBase.debug_tag

Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint

SessionProcessorBase.graph

Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.

SessionProcessorBase.label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

SessionProcessorBase.process(event)
SessionProcessorBase.process_data(value)

Process non-session events (data events). Derived classes must implement this method.

SessionProcessorBase.send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.

SessionProcessorBase.sendee

The callable this processor will use to send events; see set_sendee()

SessionProcessorBase.sending

Whether this processor will currently send events at all; see set_sending()

SessionProcessorBase.session_begin(event)

Default behavior when a SessionBegin event is received.

SessionProcessorBase.session_break(event)

Default behavior when a SessionBreak event is received.

SessionProcessorBase.session_end(event)

Default behavior when a SessionEnd event is received.
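
The session protocol can be sketched with namedtuple marker events carrying a single userdata field. Everything below is a hypothetical stand-in rather than the Onyx code: the class names are invented, raising ValueError for out-of-session events is an assumption, and forwarding each session marker downstream by default is also an assumption (the BigramCounter example only shows that SessionBegin and SessionEnd are forwarded).

```python
import collections

SessionBegin = collections.namedtuple('SessionBegin', 'userdata')
SessionBreak = collections.namedtuple('SessionBreak', 'userdata')
SessionEnd = collections.namedtuple('SessionEnd', 'userdata')


class MiniSessionProcessor(object):
    """Hypothetical dispatcher for session markers and data events."""

    def __init__(self, sendee):
        self._sendee = sendee
        self._in_session = False

    def process(self, event):
        if isinstance(event, SessionBegin):
            if self._in_session:
                raise ValueError("session already begun")
            self._in_session = True
            self.session_begin(event)
            return
        if not self._in_session:
            # Assumption: anything outside a session is an error.
            raise ValueError("event %r arrived outside a session" % (event,))
        if isinstance(event, SessionEnd):
            self.session_end(event)
            self._in_session = False
        elif isinstance(event, SessionBreak):
            self.session_break(event)
        else:
            self.process_data(event)

    # Assumed defaults: forward the marker downstream.
    def session_begin(self, event):
        self._sendee(event)

    def session_break(self, event):
        self._sendee(event)

    def session_end(self, event):
        self._sendee(event)

    def process_data(self, value):
        raise NotImplementedError("derived classes must implement process_data")


class Collector(MiniSessionProcessor):
    """Gathers data events and sends them as one list at session end."""

    def __init__(self, sendee):
        super(Collector, self).__init__(sendee)
        self._items = []

    def process_data(self, value):
        self._items.append(value)

    def session_end(self, event):
        self._sendee(self._items)
        self._items = []
        super(Collector, self).session_end(event)
```
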

SessionProcessorBase.set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

SessionProcessorBase.set_sending(sending)

Clients call this to turn sending from a processor on or off.

static SessionProcessorBase.std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().

class onyx.dataflow.streamprocess.SlidingWindow(window_len, slide_len)

Bases: object

A processor for windowing Numpy arrays

window_len and slide_len should be positive integers. Input events must be Numpy arrays. The dimensionality and dtype of each event will be checked against those of the first event seen; it’s an error if a later event doesn’t match in both dimension and dtype, except that the final dimension may vary. Note that windowing is done in the last dimension only, so multidimensional arrays sent as out-events will maintain the shape of the input events except in this last dimension, which will always be window_len.

>>> s0 = SlidingWindow(5, 2)
>>> res = list()
>>> for i in xrange(2):
...   for x in s0.window(np.arange(1, dtype=int)): res.append(x)
...   for x in s0.window(np.arange(10,12, dtype=int)): res.append(x)
...   for x in s0.window(np.arange(20,23, dtype=int)): res.append(x)
...   for x in s0.window(np.arange(30,40, dtype=int)): res.append(x)
>>> for r in res: print repr(r)
array([ 0, 10, 11, 20, 21])
array([11, 20, 21, 22, 30])
array([21, 22, 30, 31, 32])
array([30, 31, 32, 33, 34])
array([32, 33, 34, 35, 36])
array([34, 35, 36, 37, 38])
array([36, 37, 38, 39,  0])
array([38, 39,  0, 10, 11])
array([ 0, 10, 11, 20, 21])
array([11, 20, 21, 22, 30])
array([21, 22, 30, 31, 32])
array([30, 31, 32, 33, 34])
array([32, 33, 34, 35, 36])
array([34, 35, 36, 37, 38])
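
The one-dimensional behavior above, including the two windows that straddle the boundary between the first and second pass, can be reproduced with plain lists. This is a minimal sketch, not the Onyx implementation: ListSlidingWindow is a hypothetical class, it ignores shape and dtype checking, and its handling of slide_len greater than window_len (skipping the extra elements) is an assumption.

```python
class ListSlidingWindow(object):
    """Hypothetical list-based analogue of SlidingWindow."""

    def __init__(self, window_len, slide_len):
        if window_len <= 0 or slide_len <= 0:
            raise ValueError("window_len and slide_len must be positive")
        self._window_len = window_len
        self._slide_len = slide_len
        self.reset()

    def reset(self):
        self._buffer = []
        self._skip = 0  # elements still to discard when slide_len > window_len

    def window(self, values):
        """Yield every complete window made available by the new values."""
        for value in values:
            if self._skip > 0:
                self._skip -= 1
                continue
            self._buffer.append(value)
            if len(self._buffer) == self._window_len:
                yield list(self._buffer)
                drop = min(self._slide_len, self._window_len)
                del self._buffer[:drop]
                self._skip = self._slide_len - drop


s0 = ListSlidingWindow(5, 2)
res = []
for _ in range(2):
    for chunk in ([0], [10, 11], [20, 21, 22], list(range(30, 40))):
        res.extend(s0.window(chunk))
print(len(res))  # 14
print(res[6])    # [36, 37, 38, 39, 0]
```

As with the generator-style window() in the example above, the caller has to iterate over the result for the buffered state to advance.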

Here’s an example with multidimensional input. Our initial input is 2x2x9, so output is 2x2x5.

>>> s1 = SlidingWindow(5, 2)
>>> data = np.ones((2, 2, 9), dtype=float) * np.arange(9)
>>> for x in (s1.window(data)): print repr(x)
array([[[ 0.,  1.,  2.,  3.,  4.],
        [ 0.,  1.,  2.,  3.,  4.]],
<BLANKLINE>
       [[ 0.,  1.,  2.,  3.,  4.],
        [ 0.,  1.,  2.,  3.,  4.]]])
array([[[ 2.,  3.,  4.,  5.,  6.],
        [ 2.,  3.,  4.,  5.,  6.]],
<BLANKLINE>
       [[ 2.,  3.,  4.,  5.,  6.],
        [ 2.,  3.,  4.,  5.,  6.]]])
array([[[ 4.,  5.,  6.,  7.,  8.],
        [ 4.,  5.,  6.,  7.,  8.]],
<BLANKLINE>
       [[ 4.,  5.,  6.,  7.,  8.],
        [ 4.,  5.,  6.,  7.,  8.]]])

It’s an error to change dimensions other than the last:

>>> bad_data1 = np.ones((2, 3, 9), dtype=float) * np.arange(9)
>>> for x in s1.window(bad_data1): print repr(x)
Traceback (most recent call last):
...
ValueError: expected event with shape[:-1] (2, 2), but got (2, 3)

It’s an error to change dtypes:

>>> bad_data2 = np.ones((2, 2, 9), dtype=np.int32) * np.arange(9, dtype=np.int32)
>>> for x in s1.window(bad_data2): print repr(x)
Traceback (most recent call last):
...
ValueError: expected event with dtype float64, but got int32
reset()
window(value)
class onyx.dataflow.streamprocess.SlidingWindowProcessor(window_len, slide_len, sendee=None, sending=True, label=None, bypass_types=())

Bases: onyx.dataflow.streamprocess.ProcessorBase

A processor for windowing Numpy arrays

window_len and slide_len should be positive integers. Input events must be Numpy arrays. The dimensionality and dtype of each event will be checked against those of the first event seen; it’s an error if a later event doesn’t match in both dimension and dtype, except that the final dimension may vary. Note that windowing is done in the last dimension only, so multidimensional arrays sent as out-events will maintain the shape of the input events except in this last dimension, which will always be window_len.

>>> s0 = SlidingWindowProcessor(5, 2, sendee=_printer)
>>> for i in xrange(2):
...   s0.process(np.arange(1, dtype=int))
...   s0.process(np.arange(10,12, dtype=int))
...   s0.process(np.arange(20,23, dtype=int))
...   s0.process(np.arange(30,40, dtype=int))
array([ 0, 10, 11, 20, 21])
array([11, 20, 21, 22, 30])
array([21, 22, 30, 31, 32])
array([30, 31, 32, 33, 34])
array([32, 33, 34, 35, 36])
array([34, 35, 36, 37, 38])
array([36, 37, 38, 39,  0])
array([38, 39,  0, 10, 11])
array([ 0, 10, 11, 20, 21])
array([11, 20, 21, 22, 30])
array([21, 22, 30, 31, 32])
array([30, 31, 32, 33, 34])
array([32, 33, 34, 35, 36])
array([34, 35, 36, 37, 38])

Here’s an example with multidimensional input. Our initial input is 2x2x9, so output is 2x2x5.

>>> s1 = SlidingWindowProcessor(5, 2, sendee=_printer)
>>> data = np.ones((2, 2, 9), dtype=float) * np.arange(9)
>>> s1.process(data)
array([[[ 0.,  1.,  2.,  3.,  4.],
        [ 0.,  1.,  2.,  3.,  4.]],
<BLANKLINE>
       [[ 0.,  1.,  2.,  3.,  4.],
        [ 0.,  1.,  2.,  3.,  4.]]])
array([[[ 2.,  3.,  4.,  5.,  6.],
        [ 2.,  3.,  4.,  5.,  6.]],
<BLANKLINE>
       [[ 2.,  3.,  4.,  5.,  6.],
        [ 2.,  3.,  4.,  5.,  6.]]])
array([[[ 4.,  5.,  6.,  7.,  8.],
        [ 4.,  5.,  6.,  7.,  8.]],
<BLANKLINE>
       [[ 4.,  5.,  6.,  7.,  8.],
        [ 4.,  5.,  6.,  7.,  8.]]])

It’s an error to change dimensions other than the last:

>>> bad_data1 = np.ones((2, 3, 9), dtype=float) * np.arange(9)
>>> s1.process(bad_data1)
Traceback (most recent call last):
...
ValueError: expected event with shape[:-1] (2, 2), but got (2, 3)

It’s an error to change dtypes:

>>> bad_data2 = np.ones((2, 2, 9), dtype=np.int32) * np.arange(9, dtype=np.int32)
>>> s1.process(bad_data2)
Traceback (most recent call last):
...
ValueError: expected event with dtype float64, but got int32
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint

graph

Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(event)
send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().

class onyx.dataflow.streamprocess.SplitProcessor(sending=True, process_types=(), bypass_types=())

Bases: onyx.dataflow.streamprocess.ProcessorBase

A processor for fanning out data, i.e. a split. Sendees must be added with add_sendee(). The value passed to each call of process() is pushed to every sendee. Sendees form a set, so each sendee is called only once per process() even if it was added to the SplitProcessor via multiple calls to add_sendee().

>>> recipient1 = list()
>>> recipient2 = list()
>>> d = SplitProcessor(process_types=(int, list, np.ndarray))
>>> d.add_sendee(recipient1.append)
>>> r2a = recipient2.append  # note: see commentary in add_sendee()
>>> d.add_sendee(r2a)
>>> d.add_sendee(r2a)
>>> for x in xrange(10): d.process(x)
>>> recipient1
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> recipient2
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Show that non-ndarrays don’t get copied, but ndarrays do get copied:

>>> del recipient1[:], recipient2[:]
>>> d.process(range(5))
>>> d.process(np.arange(5))
>>> recipient1[0] is recipient2[0]
True
>>> recipient1[1] is recipient2[1]
False
>>> d.add_sendee(0)
Traceback (most recent call last):
   ...
ValueError: expected a callable sendee, got an instance of int
add_sendee(sendee)

Clients call this to add a callable to the set to which this processor will send its results. Has set semantics based on the id() of sendee; so, as with set objects, adding a sendee more than once does not change the runtime behavior. Sends copies of ndarrays.
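
Identity-based set semantics explain why the class example binds recipient2.append to a name before adding it twice: in Python, each attribute access such as recipient2.append creates a new bound-method object. The sketch below keeps sendees in a dict keyed by id(). MiniSplit is a hypothetical stand-in, not the Onyx class, and it leaves out the copying of ndarrays.

```python
class MiniSplit(object):
    """Hypothetical fan-out with sendees deduplicated by object identity."""

    def __init__(self):
        self._sendees = {}  # id(sendee) -> sendee

    def add_sendee(self, sendee):
        if not callable(sendee):
            raise ValueError("expected a callable sendee, got an instance of %s"
                             % type(sendee).__name__)
        # Adding the same object again just overwrites its own entry.
        self._sendees[id(sendee)] = sendee

    def process(self, event):
        for sendee in self._sendees.values():
            sendee(event)


recipient1, recipient2 = [], []
d = MiniSplit()
d.add_sendee(recipient1.append)
r2a = recipient2.append  # keep one bound-method object and reuse it
d.add_sendee(r2a)
d.add_sendee(r2a)
for x in range(3):
    d.process(x)
print(recipient1, recipient2)  # [0, 1, 2] [0, 1, 2]
```
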

dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint

graph

Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(event)
send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Not implemented for this processor; use add_sendee()

set_sending(sending)

Clients call this to turn sending from a processor on or off.

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().

class onyx.dataflow.streamprocess.WatchProcessor(watch_label, out_stream=None, sendee=None, sending=True, process_types=(), bypass_types=())

Bases: onyx.dataflow.streamprocess.ProcessorBase

A debugging processor. Prints a label and the value, then sends the value on down the stream.

>>> recipient = list()
>>> e = WatchProcessor('hoo hoo:', sendee=recipient.append, process_types=int)
>>> for x in xrange(4): e.process(x)
hoo hoo: 0
hoo hoo: 1
hoo hoo: 2
hoo hoo: 3
>>> out = cStringIO.StringIO()
>>> e = WatchProcessor('hoo hoo:', out_stream=out, sendee=recipient.append, process_types=int)
>>> for x in xrange(4): e.process(x)
>>> print out.getvalue()
hoo hoo: 0
hoo hoo: 1
hoo hoo: 2
hoo hoo: 3
<BLANKLINE>
>>> recipient
[0, 1, 2, 3, 0, 1, 2, 3]
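
A watch stage is essentially a tee: write the label and value to a stream, then forward the value unchanged. The sketch below illustrates that idea with a hypothetical make_watcher helper; the exact output formatting of the real WatchProcessor is only assumed to match the example above.

```python
import io
import sys


def make_watcher(watch_label, sendee, out_stream=None):
    """Return a process(event) callable that logs each event and forwards it."""
    def process(event):
        # Look up sys.stdout at call time so later redirection is respected.
        stream = sys.stdout if out_stream is None else out_stream
        stream.write(u"%s %s\n" % (watch_label, event))
        sendee(event)
    return process


recipient = []
out = io.StringIO()
watch = make_watcher('hoo hoo:', recipient.append, out_stream=out)
for x in range(2):
    watch(x)
print(repr(out.getvalue()))  # 'hoo hoo: 0\nhoo hoo: 1\n'
```
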
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag

debug_tag

Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint

graph

Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(event)
send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().