Bases: onyx.dataflow.streamprocess.ProcessorBase
A processor for reshaping a Numpy ndarray.
input_shape and output_shape should each be a shape tuple, as used by Numpy ndarrays. Incoming events must be numpy.ndarrays whose shape is input_shape. Each output event will be a numpy.ndarray whose shape is output_shape.
>>> result = list()
>>> reshaper = ArrayReshaper((2, 5, 4), (2, 20), sendee=result.append)
>>> reshaper.process(np.arange(2*5*4).reshape((2, 5, 4)))
>>> result
[array([[ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
17, 18, 19],
[20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36,
37, 38, 39]])]
>>> reshaper.process(np.arange(2*4*3).reshape((2, 4, 3)))
Traceback (most recent call last):
...
ValueError: expected input data shape to be (2, 5, 4), got (2, 4, 3)
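For readers without onyx at hand, the check-then-reshape behaviour can be approximated in plain NumPy. This is only a sketch: reshape_event is a hypothetical helper, not part of onyx, and its error message is merely modeled on the doctest above.

```python
import numpy as np

def reshape_event(event, input_shape, output_shape):
    """Hypothetical stand-in for ArrayReshaper.process(); not the onyx code."""
    # Reject events whose shape differs from the declared input shape.
    if event.shape != tuple(input_shape):
        raise ValueError("expected input data shape to be %s, got %s"
                         % (tuple(input_shape), event.shape))
    # Same data, new shape.
    return event.reshape(output_shape)

out = reshape_event(np.arange(2 * 5 * 4).reshape((2, 5, 4)), (2, 5, 4), (2, 20))
```

Here out has shape (2, 20), and an event of shape (2, 4, 3) would raise ValueError.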
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag
Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint
Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
Bases: onyx.dataflow.streamprocess.ProcessorBase
A processor for selecting some elements from a Numpy array.
selector should be a Numpy array with Boolean dtype and shape (len,); i.e., it must be one-dimensional. Incoming events must have length len along the specified axis. The output event has the same shape as the input, except that the length of the specified axis is the number of True elements in the selector.
axis is the index of the axis along which incoming events will be sliced.
>>> array34 = np.arange(12).reshape(3, 4)
>>> array34
array([[ 0, 1, 2, 3],
[ 4, 5, 6, 7],
[ 8, 9, 10, 11]])
>>> sel = np.array([True, False, True])
>>> result = list()
>>> as0 = ArraySelector(sel, axis=0, sendee=result.append)
>>> as0.process(array34)
>>> result[0]
array([[ 0, 1, 2, 3],
[ 8, 9, 10, 11]])
>>> sel = np.array([True, False, True, False])
>>> result = list()
>>> as1 = ArraySelector(sel, axis=1, sendee=result.append)
>>> as1.process(array34)
>>> result[0]
array([[ 0, 2],
[ 4, 6],
[ 8, 10]])
>>> as1.process(array34.reshape(4, 3))
Traceback (most recent call last):
...
ValueError: expected axis dimension to be 3, got 4
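The selection itself corresponds to what numpy.compress does. The following is a sketch under that assumption; select_along_axis is a hypothetical helper with its own error wording, not the onyx implementation.

```python
import numpy as np

def select_along_axis(event, selector, axis):
    """Hypothetical equivalent of ArraySelector.process(); not the onyx code."""
    selector = np.asarray(selector, dtype=bool)
    if selector.ndim != 1:
        raise ValueError("selector must be one-dimensional")
    if event.shape[axis] != len(selector):
        raise ValueError("selector length %d does not match axis length %d"
                         % (len(selector), event.shape[axis]))
    # np.compress keeps the slices along `axis` where selector is True.
    return np.compress(selector, event, axis=axis)

array34 = np.arange(12).reshape(3, 4)
rows = select_along_axis(array34, [True, False, True], axis=0)
cols = select_along_axis(array34, [True, False, True, False], axis=1)
```

rows keeps the first and third rows (shape (2, 4)); cols keeps the first and third columns (shape (3, 2)).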
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag
Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint
Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
Bases: onyx.dataflow.streamprocess.ProcessorBase, list
A collecting processor, a subclass of list. Collects the values as well as passing them on. Useful for collecting elements in the middle of a chain.
This example doesn’t really do the collect processor justice because it puts the collector at the end of the chain. See usage in main() for the more useful case of collecting values from somewhere in the middle of a chain.
>>> recipient = list()
>>> e = CollectProcessor(sendee=recipient.append, process_types=int)
>>> for x in xrange(4): e.process(x)
>>> recipient
[0, 1, 2, 3]
>>> e
[0, 1, 2, 3]
L.append(object) – append object to end
L.count(value) -> integer – return number of occurrences of value
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag
Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint
L.extend(iterable) – extend list by appending elements from the iterable
Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.
L.index(value, [start, [stop]]) -> integer – return first index of value. Raises ValueError if the value is not present.
L.insert(index, object) – insert object before index
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
L.pop([index]) -> item – remove and return item at index (default last). Raises IndexError if list is empty or index is out of range.
L.remove(value) – remove first occurrence of value. Raises ValueError if the value is not present.
L.reverse() – reverse IN PLACE
Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
L.sort(cmp=None, key=None, reverse=False) – stable sort IN PLACE; cmp(x, y) -> -1, 0, 1
Bases: onyx.dataflow.streamprocess.ProcessorBase
A decimating processor. Pushes value of every nth call to process(), starting with the value of the first call to process().
>>> recipient = list()
>>> d = DecimatingProcessor(2, sendee=recipient.append, process_types=int)
>>> for x in xrange(10): d.process(x)
>>> recipient
[0, 2, 4, 6, 8]
>>> d = DecimatingProcessor(3, process_types=int)
>>> d.set_sendee(recipient.append)
>>> for x in xrange(10): d.process(x)
>>> recipient
[0, 2, 4, 6, 8, 0, 3, 6, 9]
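The decimation rule can be pictured as a call counter. The sketch below assumes nothing about onyx internals; make_decimator is a hypothetical name used only for illustration.

```python
def make_decimator(n, sendee):
    """Hypothetical closure mimicking DecimatingProcessor; not the onyx code."""
    count = [0]  # mutable cell, so the closure can update it

    def process(value):
        # Forward calls number 0, n, 2n, ... and drop the rest.
        if count[0] % n == 0:
            sendee(value)
        count[0] += 1

    return process

recipient = []
process = make_decimator(3, recipient.append)
for x in range(10):
    process(x)
```

After the loop, recipient holds [0, 3, 6, 9], matching the second half of the doctest output.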
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag
Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint
Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
Bases: onyx.dataflow.streamprocess.ProcessorBase
Wraps an existing processor and processes two-element tuples in which the first element is a selector dictionary and the second element is the input to the wrapped processor. The dictionary determines whether the wrapped processor is run or whether this processor acts like an epsilon processor that passes the input directly to the sendee, bypassing the wrapped processor. Depending on the initialization parameters, the output sent to the sendee is either a tuple that includes the selector dictionary or the direct output of the wrapped processor. This class allows a processor in a processing chain to be easily skipped at runtime without dynamically building processor chains or adding a “doNothing” parameter to the init parameters of processors in a way that would decrease their reusability.
The wrapped processor will be executed only if the selector dictionary (first item of the input tuple) contains a True value under the key specified by the init parameter selector_key, or if selector_key is None. If pass_dict_to_sendee is True, then the output sent to the sendee will be a 2-item tuple whose first item is the selector dictionary; otherwise the selector dictionary will not be passed down the processing chain. Note that inputs matching bypass_types will bypass both this wrapper and the wrapped processor. This processor does not support wrapping processors that send multiple events for one input event when pass_dict_to_sendee is True, because the output cannot then be sent down the processing chain as the second element of a tuple.
Set up a test for all use cases:
>>> def append_world(value):
... return value + ' world'
>>> add_world_proc = FunctionProcessor(append_world, process_types=(str,))
>>> selector_dict = {'add_key':True, 'no_add_key':False}
>>> output_sink = []
Test non-skipped processor, returning a tuple with the selector dict:
>>> DictSelectableProcessor(processor=add_world_proc, selector_key='add_key', pass_dict_to_sendee=True, sendee=output_sink.append).process((selector_dict, 'hello'))
>>> print output_sink[-1]
({'no_add_key': False, 'add_key': True}, 'hello world')
Test non-skipped processor by a wildcard (None) key value, returning a tuple with the selector dict:
>>> DictSelectableProcessor(processor=add_world_proc, selector_key=None, pass_dict_to_sendee=True, sendee=output_sink.append).process((selector_dict, 'hello'))
>>> print output_sink[-1]
({'no_add_key': False, 'add_key': True}, 'hello world')
Test non-skipped processor, not returning a tuple with the selector dict:
>>> DictSelectableProcessor(processor=add_world_proc, selector_key='add_key', pass_dict_to_sendee=False, sendee=output_sink.append).process((selector_dict, 'hello'))
>>> print output_sink[-1]
hello world
Test non-skipped processor by a wildcard (None) key value, not returning a tuple with the selector dict:
>>> DictSelectableProcessor(processor=add_world_proc, selector_key=None, pass_dict_to_sendee=False, sendee=output_sink.append).process((selector_dict, 'hello'))
>>> print output_sink[-1]
hello world
Test skipped processor by False value in the selector dict, returning a tuple with the selector map:
>>> DictSelectableProcessor(processor=add_world_proc, selector_key='no_add_key', pass_dict_to_sendee=True, sendee=output_sink.append).process((selector_dict, 'hello'))
>>> print output_sink[-1]
({'no_add_key': False, 'add_key': True}, 'hello')
Test skipped processor by missing key value in the selector dict, returning a tuple with the selector map:
>>> DictSelectableProcessor(processor=add_world_proc, selector_key='missing_key', pass_dict_to_sendee=True, sendee=output_sink.append).process((selector_dict, 'hello'))
>>> print output_sink[-1]
({'no_add_key': False, 'add_key': True}, 'hello')
Test skipped processor by False value in the selector dict, not returning a tuple with the selector map:
>>> DictSelectableProcessor(processor=add_world_proc, selector_key='no_add_key', pass_dict_to_sendee=False, sendee=output_sink.append).process((selector_dict, 'hello'))
>>> print output_sink[-1]
hello
Test skipped processor by missing key value in the selector dict, not returning a tuple with the selector map:
>>> DictSelectableProcessor(processor=add_world_proc, selector_key='missing_key', pass_dict_to_sendee=False, sendee=output_sink.append).process((selector_dict, 'hello'))
>>> print output_sink[-1]
hello
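The eight cases above reduce to one decision and one packaging step. The sketch below models them with a plain function; route is a hypothetical name, it returns the result instead of sending it, and it treats any true value under the key as "run", which is an assumption.

```python
def route(event, func, selector_key=None, pass_dict_to_sendee=True):
    """Hypothetical model of the DictSelectableProcessor decision; not the onyx code."""
    selector_dict, value = event
    # Run func when the key is the wildcard None or maps to a true value;
    # a missing key counts as "skip".
    if selector_key is None or selector_dict.get(selector_key, False):
        value = func(value)
    # Optionally carry the selector dictionary along with the result.
    return (selector_dict, value) if pass_dict_to_sendee else value

flags = {'add_key': True, 'no_add_key': False}
add_world = lambda s: s + ' world'
ran = route((flags, 'hello'), add_world, selector_key='add_key')
skipped = route((flags, 'hello'), add_world, selector_key='missing_key',
                pass_dict_to_sendee=False)
```

ran is the tuple (flags, 'hello world'), while skipped is the untouched string 'hello'.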
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag
Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint
Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
Bases: onyx.dataflow.streamprocess.ProcessorBase
A processor for filtering events out of a dataflow by type.
filter_types is a type or tuple of types or such tuples, exactly as used in Python’s builtin isinstance function. Any event e for which isinstance(e, filter_types) would return True is filtered out, that is, the event is dropped from the dataflow. All other events are passed.
See also PassByTypeFilter
>>> collect = list()
>>> fp0 = DropByTypeFilter(sendee=collect.append, filter_types=(list, dict))
>>> fp0.process(1)
>>> fp0.process('foo')
>>> fp0.process(3.415)
>>> fp0.process([])
>>> fp0.process({})
>>> fp0.process(None)
>>> fp0.process(())
>>> collect
[1, 'foo', 3.415, None, ()]
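Both type filters rest on the builtin isinstance(), which accepts nested tuples of types. The list-based helpers below are hypothetical and only illustrate the predicate; they are not the onyx classes.

```python
def drop_by_type(events, filter_types):
    """Hypothetical list-based model of DropByTypeFilter; not the onyx code."""
    return [e for e in events if not isinstance(e, filter_types)]

def pass_by_type(events, filter_types):
    """Hypothetical list-based model of PassByTypeFilter; not the onyx code."""
    return [e for e in events if isinstance(e, filter_types)]

events = [1, 'foo', 3.415, [], {}, None, ()]
kept = drop_by_type(events, (list, dict))
passed = pass_by_type(events, (list, (dict,)))  # nested tuples are accepted
```

Because the test is isinstance(), subclasses match too: with filter_types=int, a bool event would also be caught.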
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag
Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint
Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
Bases: onyx.dataflow.streamprocess.NoProcessProcessor
An epsilon processor. It passes through all values sent in.
An example where a user-defined function is the sendee; in this case the function just prints the value.
>>> e = EpsilonProcessor()
>>> e.set_sendee(_printer)
>>> e.process('abc')
'abc'
>>> with debugprint.DebugPrint("df_node_EpsilonProcessor_debug_send"):
... e.process('def')
df_node_EpsilonProcessor_debug_send: Event of type <type 'str'>
'def'
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag
Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint
Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
Bases: onyx.dataflow.streamprocess.ProcessorBase
Apply a callable to the argument to process(), send the result.
function must be a callable. If call_with_dc is False, it should take one argument, which is the input event for the processor. If call_with_dc is True, then function should be a callable taking two arguments, the second of which will be a debugging context like that returned by dcheck() in the debugprint module. If pre_hook is not None, it should be a callable which will be called with the incoming event before function is called. If post_hook is not None, it should be a callable which will be called with the outgoing event before it is sent. Any return values from pre_hook or post_hook are ignored.
>>> floor0 = FunctionProcessor(int, sendee=_printer, process_types=(float,),
... bypass_types=str)
>>> for x in xrange(7): floor0.process(-0.825 * x)
0
0
-1
-2
-3
-4
-4
>>> floor0.process('hi')
'hi'
>>> floor0.process(None)
Traceback (most recent call last):
...
ValueError: processor type FunctionProcessor expected event type in one of these two groups: (<type 'float'>,) (<type 'str'>,), but got <type 'NoneType'>
>>> def pre_print(x): print('pre: %s' % (x,))
>>> def post_print(x): print('post: %s' % (x,))
>>> floor1 = FunctionProcessor(int, sendee=_printer, process_types=(float,),
... bypass_types=str,
... pre_hook=pre_print, post_hook=post_print)
>>> for x in xrange(3): floor1.process(-0.825 * x)
pre: -0.0
post: 0
0
pre: -0.825
post: 0
0
pre: -1.65
post: -1
-1
>>> def int_with_debug(value, dc):
... dc and dc("converting value %s" % (value,))
... return int(value)
>>> floor2 = FunctionProcessor(int_with_debug, sendee=_printer, process_types=(float,),
... call_with_dc=True, bypass_types=str, label='IWD')
>>> with debugprint.DebugPrint("df_node_IWD_debug"):
... for x in xrange(3): floor2.process(-0.825 * x)
df_node_IWD_debug: converting value -0.0
0
df_node_IWD_debug: converting value -0.825
0
df_node_IWD_debug: converting value -1.65
-1
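The order of calls seen in the hook example can be summarized as pre_hook, function, post_hook, send. The sketch below spells that order out with a hypothetical helper, apply_with_hooks; it omits type checking and the debugging context.

```python
def apply_with_hooks(value, function, sendee, pre_hook=None, post_hook=None):
    """Hypothetical model of the hook ordering; not the onyx code."""
    if pre_hook is not None:
        pre_hook(value)       # sees the incoming event; return value ignored
    result = function(value)
    if post_hook is not None:
        post_hook(result)     # sees the outgoing event; return value ignored
    sendee(result)

log = []
apply_with_hooks(-1.65, int, lambda r: log.append(('send', r)),
                 pre_hook=lambda v: log.append(('pre', v)),
                 post_hook=lambda r: log.append(('post', r)))
```

log then records the three steps in order: ('pre', -1.65), ('post', -1), ('send', -1).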
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag
Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint
Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
Bases: onyx.dataflow.streamprocess.ProcessorBase
A specialization of ProcessorBase that only takes a bypass_types argument so it never calls its process function.
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag
Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint
Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
Bases: onyx.dataflow.streamprocess.NoProcessProcessor
A null processor. This can be used as a Processor placeholder, but it will error if anything is sent to it.
>>> e = NullProcessor()
>>> e.set_sendee(_printer)
>>> e.process('abc')
Traceback (most recent call last):
...
ValueError: processor type NullProcessor expected event type in one of these two groups: () (), but got <type 'str'>
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag
Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint
Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
Bases: onyx.dataflow.streamprocess.ProcessorBase
A processor for filtering events out of a dataflow by type.
filter_types is a type or tuple of types or such tuples, exactly as used in Python’s builtin isinstance function. Any event e for which isinstance(e, filter_types) would return True is passed. All other events are filtered out, that is, dropped from the dataflow.
See also DropByTypeFilter
>>> collect = list()
>>> fp0 = PassByTypeFilter(sendee=collect.append, filter_types=(list, dict))
>>> fp0.process(1)
>>> fp0.process('foo')
>>> fp0.process(3.415)
>>> fp0.process([])
>>> fp0.process({})
>>> fp0.process(None)
>>> fp0.process(())
>>> collect
[[], {}]
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag
Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint
Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
Bases: object
Base class for processor elements in a chain. Subclasses must implement the function process() to do work based on value, and to call send(result) when the work produces a result to push on down the chain. Subclasses may override or pass through process_types and bypass_types, which may be types or (nested) tuples of types; see the builtin function isinstance(). For subclasses using the standard_process_prologue() decorator, any incoming event in process_types will be processed, any incoming event in bypass_types will be sent to the next element in the dataflow without processing, and any other type will be an error. Considered as sets of types, process_types and bypass_types must be disjoint, including via subclassing.
>>> a = ProcessorBase(sendee=_printer)
>>> a.process(None)
Traceback (most recent call last):
...
NotImplementedError: ProcessorBase must implement self.process(value) as a method or attribute
>>> b = ProcessorBase(sendee=_printer, process_types=dict, bypass_types=(list, defaultdict))
Traceback (most recent call last):
...
ValueError: expected process_types and bypass_types to be disjoint, got (<type 'dict'>,) and (<type 'list'>, <type 'collections.defaultdict'>)
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag
Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint
Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Subclasses must override the process(value) function.
The function accepts a single value. If, after processing the single value, it has a new result or results, the function must call send(result) for each result that is calculated. For a given call to process(), the function can call send() zero, one, or multiple times depending on the semantics that the processor type implements.
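The process/bypass/error rule and the process()-calls-send() contract can be modeled in a few lines. MiniProcessor below is a hypothetical, much simplified class written for illustration; it does not use standard_process_prologue() and is not the onyx implementation.

```python
class MiniProcessor(object):
    """Hypothetical, simplified model of the dispatch rules; not the onyx code."""

    def __init__(self, function, sendee, process_types=(), bypass_types=()):
        self.function = function
        self.sendee = sendee
        self.process_types = process_types
        self.bypass_types = bypass_types

    def send(self, result):
        # Push a result to the next element in the chain.
        self.sendee(result)

    def process(self, value):
        if isinstance(value, self.process_types):
            self.send(self.function(value))   # do the work, then push
        elif isinstance(value, self.bypass_types):
            self.send(value)                  # pass through untouched
        else:
            raise ValueError("unexpected event type %r" % (type(value),))

out = []
p = MiniProcessor(int, out.append, process_types=(float,), bypass_types=(str,))
p.process(2.7)
p.process('hi')
```

After these calls out is [2, 'hi']; an event such as None, which is in neither group, raises ValueError.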
Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
Bases: onyx.dataflow.streamprocess.SlidingWindowProcessor
A processor that behaves like SlidingWindowProcessor, except that a reset_type can be specified which resets the state to the initial windowing state. When an object of type reset_type is processed, any window that has fewer than win_len elements will not be output and processing will continue with an empty window.
>>> reset_type = type(None)
>>> bypass = float
>>> s0=ResettableSlidingWindowProcessor(reset_type, True, 5, 2, sendee=_printer, bypass_types=bypass)
>>> for i in xrange(2):
... s0.process(np.arange(1, dtype=int))
... s0.process(np.arange(10, 12, dtype=int))
... s0.process(np.arange(20, 23, dtype=int))
... s0.process(10.0)
... s0.process(np.arange(30, 40, dtype=int))
... s0.process(None)
array([ 0, 10, 11, 20, 21])
10.0
array([11, 20, 21, 22, 30])
array([21, 22, 30, 31, 32])
array([30, 31, 32, 33, 34])
array([32, 33, 34, 35, 36])
array([34, 35, 36, 37, 38])
None
array([ 0, 10, 11, 20, 21])
10.0
array([11, 20, 21, 22, 30])
array([21, 22, 30, 31, 32])
array([30, 31, 32, 33, 34])
array([32, 33, 34, 35, 36])
array([34, 35, 36, 37, 38])
None
Here’s an example with multidimensional input. Our initial input is 2x2x9, so output is 2x2x5.
>>> s1 = ResettableSlidingWindowProcessor(type(None), True, 5, 2, sendee=_printer)
>>> data = np.ones((2, 2, 9), dtype=float) * np.arange(9)
>>> s1.process(data)
array([[[ 0., 1., 2., 3., 4.],
[ 0., 1., 2., 3., 4.]],
<BLANKLINE>
[[ 0., 1., 2., 3., 4.],
[ 0., 1., 2., 3., 4.]]])
array([[[ 2., 3., 4., 5., 6.],
[ 2., 3., 4., 5., 6.]],
<BLANKLINE>
[[ 2., 3., 4., 5., 6.],
[ 2., 3., 4., 5., 6.]]])
array([[[ 4., 5., 6., 7., 8.],
[ 4., 5., 6., 7., 8.]],
<BLANKLINE>
[[ 4., 5., 6., 7., 8.],
[ 4., 5., 6., 7., 8.]]])
It’s an error to change dimensions other than the last:
>>> bad_data1 = np.ones((2, 3, 9), dtype=float) * np.arange(9)
>>> s1.process(bad_data1)
Traceback (most recent call last):
...
ValueError: expected event with shape[:-1] (2, 2), but got (2, 3)
It’s an error to change dtypes:
>>> bad_data2 = np.ones((2, 2, 9), dtype=np.int32) * np.arange(9, dtype=np.int32)
>>> s1.process(bad_data2)
Traceback (most recent call last):
...
ValueError: expected event with dtype float64, but got int32
It’s an error to use a reset_type as one of the process_types. ResettableSlidingWindowProcessor has process_types == np.ndarray from its parent class by default.
>>> ResettableSlidingWindowProcessor(np.ndarray, True, 10, 20)
Traceback (most recent call last):
...
TypeError: reset_type must not be ndarray
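The reset behavior shown in the doctests above can be approximated with a small standalone class. This is a minimal Python 3 sketch, not the onyx implementation: the name ResettableWindowSketch and its constructor signature are hypothetical, the second positional argument used in the doctests (True) is not modelled, and forwarding the reset event to the sendee is an assumption based on the printed None.

```python
import numpy as np


class ResettableWindowSketch:
    """Hypothetical stand-in, not the onyx class: windows the last axis and
    forgets buffered samples when a reset event arrives."""

    def __init__(self, reset_type, length, hop, sendee, bypass_types=()):
        if reset_type is np.ndarray:
            # Arrays are the data events, so they cannot double as resets.
            raise TypeError("reset_type must not be ndarray")
        self._reset_type = reset_type
        self._length = length
        self._hop = hop
        self._sendee = sendee
        self._bypass_types = bypass_types
        self._buffer = None

    def process(self, event):
        if isinstance(event, self._reset_type):
            self._buffer = None    # drop samples left over from before the reset
            self._sendee(event)    # assumption: the reset event is forwarded
        elif isinstance(event, self._bypass_types):
            self._sendee(event)    # bypass events skip the window entirely
        else:
            if self._buffer is None:
                self._buffer = np.asarray(event)
            else:
                self._buffer = np.concatenate((self._buffer, event), axis=-1)
            # Emit every full window, advancing by the hop each time.
            while self._buffer.shape[-1] >= self._length:
                self._sendee(self._buffer[..., :self._length].copy())
                self._buffer = self._buffer[..., self._hop:]


events = []
proc = ResettableWindowSketch(type(None), 5, 2, events.append, bypass_types=float)
for chunk in (np.arange(1), np.arange(10, 12), np.arange(20, 23),
              10.0, np.arange(30, 40), None):
    proc.process(chunk)
```

After the None event the buffer is empty, so the next array starts a fresh sequence of windows, which is why both passes of the doctest loop print identical output.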
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag.
Activate this tag to get debugging information; see onyx.util.debugprint.DebugPrint.
Return a graph for this processor. By default this is just a single node whose label is the label of the processor; derived classes may wish to override this property.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
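The interaction between send(), set_sendee() and set_sending() described above can be illustrated with a standalone Python 3 sketch. The class name SendSketch is hypothetical, and raising ValueError when no sendee has been configured is an assumption; the text above only states that a sendee of False makes send() a silent no-op.

```python
class SendSketch:
    """Hypothetical model of the sendee plumbing, not the onyx ProcessorBase."""

    def __init__(self, sendee=None, sending=True):
        self._sendee = sendee
        self._sending = sending

    def set_sendee(self, sendee):
        self._sendee = sendee

    def set_sending(self, sending):
        self._sending = bool(sending)

    def send(self, result):
        if self._sendee is False or not self._sending:
            return  # a False sendee, or sending turned off: drop silently
        if self._sendee is None:
            # Assumption: sending without any configured sendee is an error.
            raise ValueError("no sendee has been set")
        self._sendee(result)


got = []
p = SendSketch(sendee=got.append)
p.send(1)
p.set_sending(False)
p.send(2)            # dropped while sending is off
p.set_sending(True)
p.send(3)
```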
Bases: onyx.dataflow.streamprocess.ProcessorBase
Apply a callable to the argument to process(); the callable returns a sequence or generator; send each item in the returned sequence.
function must be a callable. An incoming event is first checked for being an instance of a type in process_types (in the Python isinstance() sense); if so, function is called on the incoming event and each item of the returned sequence is sent as an output event. Otherwise, an incoming event which is an instance of a type in bypass_types is immediately sent as an output event with no call to function. An event which satisfies neither condition causes a ValueError to be raised.
>>> c = SequenceFunctionProcessor(lambda x: [x] * abs(x), sendee=_printer, process_types=(int,),
...                               bypass_types=(str,))
>>> for x in xrange(3, -4, -1): c.process(x)
3
3
3
2
2
1
-1
-2
-2
-3
-3
-3
>>> c.process('hi')
'hi'
>>> c.process(None)
Traceback (most recent call last):
...
ValueError: processor type SequenceFunctionProcessor expected event type in one of these two groups: (<type 'int'>,) (<type 'str'>,), but got <type 'NoneType'>
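The dispatch rule above (call the function and send each returned item, pass bypass types straight through, reject everything else) can be sketched in a few lines of standalone Python 3. The helper name make_sequence_processor is hypothetical and the error message is simplified; this is an illustration of the rule, not the onyx code.

```python
def make_sequence_processor(function, sendee, process_types=(), bypass_types=()):
    """Return a process(event) callable; a hypothetical helper, not the onyx API."""
    def process(event):
        if isinstance(event, process_types):
            # Fan the returned sequence (or generator) out, one send per item.
            for item in function(event):
                sendee(item)
        elif isinstance(event, bypass_types):
            sendee(event)  # forwarded untouched, function is never called
        else:
            raise ValueError("unexpected event type %s" % type(event).__name__)
    return process


sent = []
process = make_sequence_processor(lambda x: [x] * abs(x), sent.append,
                                  process_types=(int,), bypass_types=(str,))
for x in range(3, -4, -1):
    process(x)
process("hi")
```

Note that the event 0 produces an empty list, so nothing is sent for it, which matches the gap between 1 and -1 in the doctest output.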
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag.
Activate this tag to get debugging information; see onyx.util.debugprint.DebugPrint.
Return a graph for this processor. By default this is just a single node whose label is the label of the processor; derived classes may wish to override this property.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
Bases: onyx.dataflow.streamprocess.ProcessorBase
Base class for processor elements that expect to process data in sessions. All data events must occur between SessionProcessorBase.SessionBegin and SessionProcessorBase.SessionEnd events. A SessionProcessorBase.SessionBreak event marks a point where adjacent data events are not contiguous. For example, in a language-modeling context this could indicate that context information will not be carried across the data events adjoining the SessionProcessorBase.SessionBreak event. SessionProcessorBase.SessionBreak events must also occur between SessionProcessorBase.SessionBegin and SessionProcessorBase.SessionEnd.
Derived classes must implement the SessionProcessorBase.process_data method. They can override the default implementations of SessionProcessorBase.session_begin, SessionProcessorBase.session_break, and SessionProcessorBase.session_end.
>>> class BigramCounter(SessionProcessorBase):
...     UNKNOWN_CONTEXT = '!unk'
...     def __init__(self, sendee=None, sending=True, label=None, bypass_types=()):
...         super(BigramCounter, self).__init__(
...             sendee, sending=sending, process_types=str,
...             bypass_types=bypass_types)
...         self._prev_word = self.UNKNOWN_CONTEXT
...         self._word_counts = collections.defaultdict(int)
...     def session_break(self, event):
...         self._prev_word = self.UNKNOWN_CONTEXT
...     def session_end(self, event):
...         self._prev_word = self.UNKNOWN_CONTEXT
...         self.send(self._word_counts)
...         self._word_counts = collections.defaultdict(int)
...         super(BigramCounter, self).session_end(event)
...     def process_data(self, word):
...         self._word_counts[(self._prev_word, word)] += 1
...         self._prev_word = word
>>> res = list()
>>> bc = BigramCounter(sendee=res.append)
>>> bc.process(bc.SessionBegin(userdata=None))
>>> print res
[SessionBegin(userdata=None)]
>>> del res[:]
>>> for word in "Tortilla soup".split():
...     bc.process(word)
>>> len(res)
0
>>> bc.process(bc.SessionBreak(userdata=None))
>>> len(res)
0
Process contiguous data divided into two chunks. This should give the same result as processing them as a single chunk.
>>> for word in "Eat drink".split():
...     bc.process(word)
>>> for word in "man woman".split():
...     bc.process(word)
>>> assert len(res) == 0
>>> expected_result = dict({('!unk', 'Tortilla'): 1, ('Tortilla', 'soup'): 1,
...                         ('!unk', 'Eat'): 1, ('Eat', 'drink'): 1, ('drink', 'man'): 1,
...                         ('man', 'woman'): 1})
>>> bc.process(bc.SessionEnd(userdata=None))
>>> len(res)
2
>>> res[0] == expected_result
True
>>> print res[1]
SessionEnd(userdata=None)
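The division of labor between process() and the session hooks can be sketched independently of onyx. The following Python 3 sketch is hypothetical throughout: SessionSketch, WordCounter and the three namedtuples are illustrative names, type checking of data events is omitted, and the default of forwarding each session marker downstream is an assumption (the doctest above shows it for SessionBegin and SessionEnd only).

```python
import collections

# Single-field markers, mirroring the userdata field seen in the doctest.
SessionBegin = collections.namedtuple("SessionBegin", "userdata")
SessionBreak = collections.namedtuple("SessionBreak", "userdata")
SessionEnd = collections.namedtuple("SessionEnd", "userdata")


class SessionSketch:
    """Hypothetical stand-in for the session dispatch, not the onyx base class."""

    def __init__(self, sendee):
        self.send = sendee

    def process(self, event):
        # Session markers go to their handlers; anything else is data.
        if isinstance(event, SessionBegin):
            self.session_begin(event)
        elif isinstance(event, SessionBreak):
            self.session_break(event)
        elif isinstance(event, SessionEnd):
            self.session_end(event)
        else:
            self.process_data(event)

    # Assumed defaults: forward the marker downstream unchanged.
    def session_begin(self, event):
        self.send(event)

    def session_break(self, event):
        self.send(event)

    def session_end(self, event):
        self.send(event)

    def process_data(self, event):
        raise NotImplementedError("derived classes must implement process_data")


class WordCounter(SessionSketch):
    """Counts data events per session and emits the total at SessionEnd."""

    def __init__(self, sendee):
        super().__init__(sendee)
        self._count = 0

    def process_data(self, word):
        self._count += 1

    def session_end(self, event):
        self.send(self._count)
        self._count = 0
        super().session_end(event)  # still forward the SessionEnd marker


out = []
wc = WordCounter(out.append)
for event in (SessionBegin(None), "Eat", "drink", SessionBreak(None),
              "man", SessionEnd(None)):
    wc.process(event)
```

As in BigramCounter, the derived class sends its accumulated result before delegating to the base session_end, so the result arrives ahead of the SessionEnd marker.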
Bases: onyx.dataflow.streamprocess.SessionBegin
T.count(value) -> integer – return number of occurrences of value
T.index(value, [start, [stop]]) -> integer – return first index of value. Raises ValueError if the value is not present.
Alias for field number 0
Bases: onyx.dataflow.streamprocess.SessionBreak
T.count(value) -> integer – return number of occurrences of value
T.index(value, [start, [stop]]) -> integer – return first index of value. Raises ValueError if the value is not present.
Alias for field number 0
Bases: onyx.dataflow.streamprocess.SessionEnd
T.count(value) -> integer – return number of occurrences of value
T.index(value, [start, [stop]]) -> integer – return first index of value. Raises ValueError if the value is not present.
Alias for field number 0
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag.
Activate this tag to get debugging information; see onyx.util.debugprint.DebugPrint.
Return a graph for this processor. By default this is just a single node whose label is the label of the processor; derived classes may wish to override this property.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Process non-session events (data events). Derived classes must implement this method.
Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Default behavior when a SessionBegin event is received.
Default behavior when a SessionBreak event is received.
Default behavior when a SessionEnd event is received.
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
Bases: object
A processor for windowing NumPy arrays.
>>> s0 = SlidingWindow(5, 2)
>>> res = list()
>>> for i in xrange(2):
...     for x in s0.window(np.arange(1, dtype=int)): res.append(x)
...     for x in s0.window(np.arange(10, 12, dtype=int)): res.append(x)
...     for x in s0.window(np.arange(20, 23, dtype=int)): res.append(x)
...     for x in s0.window(np.arange(30, 40, dtype=int)): res.append(x)
>>> for r in res: print repr(r)
array([ 0, 10, 11, 20, 21])
array([11, 20, 21, 22, 30])
array([21, 22, 30, 31, 32])
array([30, 31, 32, 33, 34])
array([32, 33, 34, 35, 36])
array([34, 35, 36, 37, 38])
array([36, 37, 38, 39, 0])
array([38, 39, 0, 10, 11])
array([ 0, 10, 11, 20, 21])
array([11, 20, 21, 22, 30])
array([21, 22, 30, 31, 32])
array([30, 31, 32, 33, 34])
array([32, 33, 34, 35, 36])
array([34, 35, 36, 37, 38])
Here’s an example with multidimensional input. Our initial input is 2x2x9, so output is 2x2x5.
>>> s1 = SlidingWindow(5, 2)
>>> data = np.ones((2, 2, 9), dtype=float) * np.arange(9)
>>> for x in (s1.window(data)): print repr(x)
array([[[ 0.,  1.,  2.,  3.,  4.],
        [ 0.,  1.,  2.,  3.,  4.]],
<BLANKLINE>
       [[ 0.,  1.,  2.,  3.,  4.],
        [ 0.,  1.,  2.,  3.,  4.]]])
array([[[ 2.,  3.,  4.,  5.,  6.],
        [ 2.,  3.,  4.,  5.,  6.]],
<BLANKLINE>
       [[ 2.,  3.,  4.,  5.,  6.],
        [ 2.,  3.,  4.,  5.,  6.]]])
array([[[ 4.,  5.,  6.,  7.,  8.],
        [ 4.,  5.,  6.,  7.,  8.]],
<BLANKLINE>
       [[ 4.,  5.,  6.,  7.,  8.],
        [ 4.,  5.,  6.,  7.,  8.]]])
It’s an error to change dimensions other than the last:
>>> bad_data1 = np.ones((2, 3, 9), dtype=float) * np.arange(9)
>>> for x in s1.window(bad_data1): print repr(x)
Traceback (most recent call last):
...
ValueError: expected event with shape[:-1] (2, 2), but got (2, 3)
It’s an error to change dtypes:
>>> bad_data2 = np.ones((2, 2, 9), dtype=np.int32) * np.arange(9, dtype=np.int32)
>>> for x in s1.window(bad_data2): print repr(x)
Traceback (most recent call last):
...
ValueError: expected event with dtype float64, but got int32
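The outputs above are consistent with a window of length 5 that advances by a hop of 2 along the last axis, carrying leftover samples from one call to the next. A minimal standalone Python 3 sketch of that buffering follows; SlidingWindowSketch is a hypothetical name, the interpretation of the two constructor arguments as (length, hop) is inferred from the doctest output, and the real class may differ in details.

```python
import numpy as np


class SlidingWindowSketch:
    """Hypothetical re-creation of the windowing seen above, not the onyx class."""

    def __init__(self, length, hop):
        self.length = length
        self.hop = hop
        self._buffer = None  # samples not yet consumed by a hop

    def window(self, data):
        """Yield every full window that becomes available after adding data."""
        data = np.asarray(data)
        if self._buffer is None:
            self._buffer = data
        else:
            if data.shape[:-1] != self._buffer.shape[:-1]:
                raise ValueError("expected event with shape[:-1] %s, but got %s"
                                 % (self._buffer.shape[:-1], data.shape[:-1]))
            if data.dtype != self._buffer.dtype:
                raise ValueError("expected event with dtype %s, but got %s"
                                 % (self._buffer.dtype, data.dtype))
            # Only the last axis grows; leading axes stay fixed.
            self._buffer = np.concatenate((self._buffer, data), axis=-1)
        while self._buffer.shape[-1] >= self.length:
            yield self._buffer[..., :self.length].copy()
            self._buffer = self._buffer[..., self.hop:]


sw = SlidingWindowSketch(5, 2)
windows = []
for chunk in (np.arange(1), np.arange(10, 12), np.arange(20, 23), np.arange(30, 40)):
    windows.extend(w.tolist() for w in sw.window(chunk))
```

Because window() is a generator, the shape and dtype checks in this sketch only run once the caller starts iterating, which is why the doctests above iterate even in the error cases.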
Bases: onyx.dataflow.streamprocess.ProcessorBase
A processor for windowing NumPy arrays.
>>> s0 = SlidingWindowProcessor(5, 2, sendee=_printer)
>>> for i in xrange(2):
...     s0.process(np.arange(1, dtype=int))
...     s0.process(np.arange(10, 12, dtype=int))
...     s0.process(np.arange(20, 23, dtype=int))
...     s0.process(np.arange(30, 40, dtype=int))
array([ 0, 10, 11, 20, 21])
array([11, 20, 21, 22, 30])
array([21, 22, 30, 31, 32])
array([30, 31, 32, 33, 34])
array([32, 33, 34, 35, 36])
array([34, 35, 36, 37, 38])
array([36, 37, 38, 39, 0])
array([38, 39, 0, 10, 11])
array([ 0, 10, 11, 20, 21])
array([11, 20, 21, 22, 30])
array([21, 22, 30, 31, 32])
array([30, 31, 32, 33, 34])
array([32, 33, 34, 35, 36])
array([34, 35, 36, 37, 38])
Here’s an example with multidimensional input. Our initial input is 2x2x9, so output is 2x2x5.
>>> s1 = SlidingWindowProcessor(5, 2, sendee=_printer)
>>> data = np.ones((2, 2, 9), dtype=float) * np.arange(9)
>>> s1.process(data)
array([[[ 0.,  1.,  2.,  3.,  4.],
        [ 0.,  1.,  2.,  3.,  4.]],
<BLANKLINE>
       [[ 0.,  1.,  2.,  3.,  4.],
        [ 0.,  1.,  2.,  3.,  4.]]])
array([[[ 2.,  3.,  4.,  5.,  6.],
        [ 2.,  3.,  4.,  5.,  6.]],
<BLANKLINE>
       [[ 2.,  3.,  4.,  5.,  6.],
        [ 2.,  3.,  4.,  5.,  6.]]])
array([[[ 4.,  5.,  6.,  7.,  8.],
        [ 4.,  5.,  6.,  7.,  8.]],
<BLANKLINE>
       [[ 4.,  5.,  6.,  7.,  8.],
        [ 4.,  5.,  6.,  7.,  8.]]])
It’s an error to change dimensions other than the last:
>>> bad_data1 = np.ones((2, 3, 9), dtype=float) * np.arange(9)
>>> s1.process(bad_data1)
Traceback (most recent call last):
...
ValueError: expected event with shape[:-1] (2, 2), but got (2, 3)
It’s an error to change dtypes:
>>> bad_data2 = np.ones((2, 2, 9), dtype=np.int32) * np.arange(9, dtype=np.int32)
>>> s1.process(bad_data2)
Traceback (most recent call last):
...
ValueError: expected event with dtype float64, but got int32
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag.
Activate this tag to get debugging information; see onyx.util.debugprint.DebugPrint.
Return a graph for this processor. By default this is just a single node whose label is the label of the processor; derived classes may wish to override this property.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
Bases: onyx.dataflow.streamprocess.ProcessorBase
A processor for fanning out data, i.e. a split. Sendees must be added with add_sendee(). Each call to process() pushes its value to every sendee. Sendees form a set, so each sendee is called only once per process() call even if it was added to the SplitProcessor via multiple calls to add_sendee().
>>> recipient1 = list()
>>> recipient2 = list()
>>> d = SplitProcessor(process_types=(int, list, np.ndarray))
>>> d.add_sendee(recipient1.append)
>>> r2a = recipient2.append # note: see commentary in add_sendee()
>>> d.add_sendee(r2a)
>>> d.add_sendee(r2a)
>>> for x in xrange(10): d.process(x)
>>> recipient1
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> recipient2
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Show that non-ndarrays don’t get copied, but ndarrays do:
>>> del recipient1[:], recipient2[:]
>>> d.process(range(5))
>>> d.process(np.arange(5))
>>> recipient1[0] is recipient2[0]
True
>>> recipient1[1] is recipient2[1]
False
>>> d.add_sendee(0)
Traceback (most recent call last):
...
ValueError: expected a callable sendee, got an instance of int
Clients call this to add a callable to the set to which this processor will send its results. Has set semantics based on the id() of sendee; so, as with set objects, adding a sendee more than once does not change the runtime behavior. Sends copies of ndarrays.
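The id()-based set semantics and the ndarray copying can be sketched as follows in standalone Python 3. SplitSketch is a hypothetical name and this is not the onyx implementation. The sketch also shows why the doctest above binds recipient2.append to a name first: each attribute access creates a new bound-method object, so two separate recipient2.append expressions would not be deduplicated by id().

```python
import numpy as np


class SplitSketch:
    """Hypothetical fan-out with id()-keyed sendees; not the onyx SplitProcessor."""

    def __init__(self):
        # Keyed by id(); holding the callable keeps it alive so ids stay unique.
        self._sendees = {}

    def add_sendee(self, sendee):
        if not callable(sendee):
            raise ValueError("expected a callable sendee, got an instance of %s"
                             % type(sendee).__name__)
        self._sendees[id(sendee)] = sendee

    def process(self, event):
        for sendee in self._sendees.values():
            # Each sendee gets its own copy of an ndarray; other objects are shared.
            sendee(event.copy() if isinstance(event, np.ndarray) else event)


recipient1, recipient2 = [], []
split = SplitSketch()
split.add_sendee(recipient1.append)
r2a = recipient2.append  # bind once so both add_sendee calls see the same object
split.add_sendee(r2a)
split.add_sendee(r2a)    # same id, so no second delivery
split.process(list(range(5)))
split.process(np.arange(5))
```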
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag.
Activate this tag to get debugging information; see onyx.util.debugprint.DebugPrint.
Return a graph for this processor. By default this is just a single node whose label is the label of the processor; derived classes may wish to override this property.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Not implemented for this processor; use add_sendee()
Clients call this to turn sending from a processor on or off.
Bases: onyx.dataflow.streamprocess.ProcessorBase
A debugging processor. Prints a label and the value, then sends the value on down the stream.
>>> recipient = list()
>>> e = WatchProcessor('hoo hoo:', sendee=recipient.append, process_types=int)
>>> for x in xrange(4): e.process(x)
hoo hoo: 0
hoo hoo: 1
hoo hoo: 2
hoo hoo: 3
>>> out = cStringIO.StringIO()
>>> e = WatchProcessor('hoo hoo:', out_stream=out, sendee=recipient.append, process_types=int)
>>> for x in xrange(4): e.process(x)
>>> print out.getvalue()
hoo hoo: 0
hoo hoo: 1
hoo hoo: 2
hoo hoo: 3
<BLANKLINE>
>>> recipient
[0, 1, 2, 3, 0, 1, 2, 3]
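A tap of this kind is easy to approximate outside onyx. The Python 3 sketch below uses a hypothetical helper named make_watcher; it writes the label and value to a stream (standard output when none is given) and then forwards the value unchanged. It illustrates the pattern only and does no type checking.

```python
import io
import sys


def make_watcher(label, sendee, out_stream=None):
    """Return a process(event) callable that logs, then forwards.

    Hypothetical helper, not the onyx WatchProcessor.
    """
    def process(event):
        # Resolve stdout at call time so later redirection is respected.
        stream = sys.stdout if out_stream is None else out_stream
        print(label, event, file=stream)
        sendee(event)  # the value continues down the stream untouched
    return process


recipient = []
out = io.StringIO()
watch = make_watcher("hoo hoo:", recipient.append, out_stream=out)
for x in range(4):
    watch(x)
```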
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag.
Activate this tag to get debugging information; see onyx.util.debugprint.DebugPrint.
Return a graph for this processor. By default this is just a single node whose label is the label of the processor; derived classes may wish to override this property.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.