Bases: onyx.dataflow.combine_processor.SlotFillingJoinConnectionSpec
T.count(value) -> integer – return number of occurrences of value
T.index(value, [start, [stop]]) -> integer – return first index of value. Raises ValueError if the value is not present.
Alias for field number 0
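The count, index, and "field number 0" entries above are the members that collections.namedtuple generates automatically. A minimal sketch, assuming the connection spec is a one-field named tuple whose only field is tag (as the tag='e1' usage in the LatticeProcessor example suggests); ConnectionSpec is a hypothetical stand-in, not the onyx class:

```python
from collections import namedtuple

# Hypothetical stand-in for a one-field connection spec. The field name
# 'tag' is taken from the doctest usage; the class name is illustrative.
ConnectionSpec = namedtuple('ConnectionSpec', 'tag')

spec = ConnectionSpec(tag='e1')

# Field number 0 is reachable by name or by position.
first_by_name = spec.tag
first_by_position = spec[0]

# count() and index() are inherited from tuple.
occurrences = spec.count('e1')
position = spec.index('e1')
```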
Bases: onyx.dataflow.streamprocess.ProcessorBase
Processor that connects together a lattice of processors.
>>> e = LatticeProcessor.MakeChainProcessor(tuple(streamprocess.EpsilonProcessor() for i in xrange(5)), sendee=_printer)
>>> for x in 'abc': e.process(x)
'a'
'b'
'c'
>>> e.graph
FrozenGraph(GraphTables((('EpsilonProcessor', 'EpsilonProcessor', 'EpsilonProcessor', 'EpsilonProcessor', 'EpsilonProcessor'), (0, 1, 2, 3), (1, 2, 3, 4), (None, None, None, None))))
>>> pn = dict()
>>> pn['splitter'] = streamprocess.SplitProcessor(process_types=int)
>>> pn['eps1'] = streamprocess.EpsilonProcessor()
>>> pn['eps2'] = streamprocess.EpsilonProcessor()
>>> pn['bsfj'] = join.BlockingSlotFillingJoin()
>>> arc_specs = (('spl2eps1', ('splitter', 'eps1', None)),
...              ('spl2eps2', ('splitter', 'eps2', None)),
...              ('eps12bsfj', ('eps1', 'bsfj', SlotFillingJoinConnectionSpec(tag='e1'))),
...              ('eps22bsfj', ('eps2', 'bsfj', SlotFillingJoinConnectionSpec(tag='e2'))))
>>> latproc = LatticeProcessor(pn, arc_specs)
>>> res = list()
>>> latproc.set_sendee(res.append)
>>> latproc.process(1)
>>> latproc.process(2)
>>> print res
[(1, 1), (2, 2)]
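The join step in the example above can be pictured with a small stand-alone sketch. It assumes that a slot-filling join keeps one slot per connection tag and emits a tuple once every slot holds a value; TaggedJoin and its fill() method are hypothetical and only mirror the observed output, so the real BlockingSlotFillingJoin may behave differently (for instance when a slot is filled twice):

```python
class TaggedJoin(object):
    """Illustrative join: collects one value per tag, then emits a tuple."""

    def __init__(self, tags, sendee):
        self._tags = tuple(tags)
        self._sendee = sendee
        self._slots = {}

    def fill(self, tag, value):
        if tag not in self._tags:
            raise ValueError('unknown tag: %r' % (tag,))
        self._slots[tag] = value
        # Emit only once every slot holds a value, then start over.
        if len(self._slots) == len(self._tags):
            self._sendee(tuple(self._slots[t] for t in self._tags))
            self._slots = {}


results = []
join = TaggedJoin(('e1', 'e2'), results.append)
for item in (1, 2):
    # The splitter in the example sends each item down both branches.
    join.fill('e1', item)
    join.fill('e2', item)
```

After the loop, results holds one tuple per input item, matching the shape of the doctest output.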
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag.
Activate this tag to get debugging information; see onyx.util.debugprint.DebugPrint.
Return a graph for this processor. By default this is just a single node whose label is the label of the processor; derived classes may wish to override this property.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
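The default-label behavior can be sketched as follows. LabeledBase and Resampler are hypothetical classes written for illustration; they assume only what the description states (class name by default, overridable through __init__()), and are not the onyx implementation:

```python
class LabeledBase(object):
    """Illustrative base class with a label that defaults to the class name."""

    def __init__(self, label=None):
        self._label = label

    @property
    def label(self):
        # Fall back to the class name when no label was supplied.
        if self._label is None:
            return type(self).__name__
        return self._label


class Resampler(LabeledBase):
    pass


default_label = Resampler().label
custom_label = Resampler(label='resample-16k').label
```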
Internal function that pushes a result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send() is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
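The interplay of send(), the sendee, and the sending switch can be sketched with a small stand-alone class. SketchProcessor is hypothetical and its method signatures are assumptions; in particular, raising when no sendee has been set, and silently dropping results while sending is off, are guesses about behavior that the text above does not spell out:

```python
class SketchProcessor(object):
    """Illustrative processor showing the sendee and sending controls."""

    def __init__(self, sendee=None, sending=True):
        self._sendee = sendee
        self._sending = sending

    def set_sendee(self, sendee):
        self._sendee = sendee

    def set_sending(self, sending=True):
        self._sending = bool(sending)

    def send(self, result):
        # A sendee of False means "never send", and is not an error.
        if self._sendee is False or not self._sending:
            return
        if self._sendee is None:
            raise ValueError('no sendee has been set')
        self._sendee(result)

    def process(self, value):
        # Implementations of process() push their results through send().
        self.send(value * 2)


out = []
proc = SketchProcessor()
proc.set_sendee(out.append)
proc.process(1)            # delivered to out
proc.set_sending(False)
proc.process(2)            # dropped while sending is off
proc.set_sending(True)
proc.process(3)            # delivered to out

silent = SketchProcessor(sendee=False)
silent.process(4)          # no error, nothing is sent
```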
Bases: onyx.dataflow.streamprocess.ProcessorBase
A processor wrapper that extracts the data specified by in_keys from an input mapping object (such as a dict, attrdict, or wormattrdict) and calls the wrapped processor with a tuple of the values corresponding to in_keys. The output of the wrapped processor is added to the mapping object, with out_key as the key and the returned result as its value.
If in_keys has exactly one entry, the value corresponding to that key in the mapping object is passed to the processor directly.
The underlying processor must return exactly one result each time its process method is called.
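The wrapping behavior described above can be sketched without onyx. wrap_mapping is a hypothetical helper that wraps a plain function rather than a processor and returns the mapping instead of sending it; spreading a tuple result across several output keys is an assumption, chosen to be consistent with the two-output doctest in this section:

```python
def wrap_mapping(func, in_keys, out_keys):
    """Return a callable that applies func to selected keys of a mapping."""

    def wrapped(mapping):
        # One in_key: pass the bare value. Several: pass a tuple of values.
        if len(in_keys) == 1:
            arg = mapping[in_keys[0]]
        else:
            arg = tuple(mapping[k] for k in in_keys)
        result = func(arg)
        # One out_key: store the result as is. Several: spread the tuple.
        if len(out_keys) == 1:
            mapping[out_keys[0]] = result
        else:
            for key, value in zip(out_keys, result):
                mapping[key] = value
        return mapping

    return wrapped


increment = wrap_mapping(lambda a: a + 1, ('x',), ('y',))
one_in = increment({'x': 1})

quot_rem = wrap_mapping(lambda args: divmod(args[0], args[1]),
                        ('x', 'y'), ('q', 'r'))
two_in = quot_rem({'x': 7, 'y': 2})
```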
>>> def adder(a):
...     if isinstance(a, int):
...         return a + 1
...     else:
...         return a + 2
>>> res = list()
>>> fp = streamprocess.FunctionProcessor(adder, process_types=(int, complex), bypass_types=float)
>>> wfp = MappingWrapperSisoProcessor(('x',), ('y',), fp, pass_thru_types=complex, sendee=res.append)
>>> data1 = builtin.wormattrdict(x=1)
>>> data2 = dict(x=5.0)
>>> data3 = complex(1, 2)
>>> wfp.process(data1)
>>> wfp.process(data2)
>>> wfp.process(data3)
>>> print res
[wormattrdict({'x': 1, 'y': 2}), {'y': 5.0, 'x': 5.0}, (3+2j)]
Another example, with two input keys and a single tuple result that is spread across two output keys. Division is used because it is not commutative, so the order of in_keys matters.
>>> def div_two_numbers(args):
...     num, den = args
...     return (num // den, num % den)
>>> res = list()
>>> fp = streamprocess.FunctionProcessor(div_two_numbers, process_types=tuple)
>>> wfp = MappingWrapperSisoProcessor(('x', 'y'), ('q', 'r'), fp, sendee=res.append)
>>> data = builtin.attrdict(x=1.0, y=2.0)
>>> wfp.process(data)
>>> print res
[attrdict({'y': 2.0, 'x': 1.0, 'r': 1.0, 'q': 0.0})]
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag.
Activate this tag to get debugging information; see onyx.util.debugprint.DebugPrint.
Return a graph for this processor. By default this is just a single node whose label is the label of the processor; derived classes may wish to override this property.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Internal function that pushes a result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send() is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
Bases: onyx.dataflow.streamprocess.SessionProcessorBase
A processor wrapper that extracts the data specified by in_keys from an input mapping object (such as a dict, attrdict, or wormattrdict) and calls the wrapped processor with a tuple of the values corresponding to in_keys. The output of the wrapped processor is stored in a mapping object, with out_key as the key and the returned result as its value.
If in_keys has exactly one entry, the value corresponding to that key in the mapping object is passed to the processor directly.
Unlike MappingWrapperSisoProcessor, which requires the underlying processor to return exactly one value per call, this wrapper has no such requirement. Rather than adding the result to the incoming data, it creates a new wormattrdict for each result.
>>> class Spam(streamprocess.SessionProcessorBase):
...     def __init__(self, sendee=None, sending=True, bypass_types=()):
...         super(Spam, self).__init__(sendee=sendee, sending=sending, process_types=int, bypass_types=bypass_types)
...         self._data = []
...     def process_data(self, i):
...         self._data.append(i)
...         self.send(str(i))
...     def session_end(self, event):
...         self.send(str(self._data))
...         self.send(event)
>>> res = list()
>>> spam = Spam()
>>> spmp = SessionProcessorMappingWrapper(('x',), ('y',), ('z',), spam, return_map_type=dict, sendee=res.append)
>>> sb = streamprocess.SessionProcessorBase.SessionBegin(userdata=None)
>>> se = streamprocess.SessionProcessorBase.SessionEnd(userdata=None)
>>> spmp.process(sb)
>>> for i in range(3):
...     spmp.process(dict(x=i, z=20))
>>> spmp.process(se)
>>> print res
[SessionBegin(userdata=None), {'y': '0', 'z': 20}, {'y': '1', 'z': 20}, {'y': '2', 'z': 20}, {'y': '[0, 1, 2]', 'z': 20}, SessionEnd(userdata=None)]
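The one-new-mapping-per-result behavior can be sketched in plain Python. wrap_multi_result is a hypothetical helper, not the onyx API: it wraps a function that returns a list of results, and it assumes (judging from the doctest output above) that the third key tuple names keys copied from the input into each new mapping. Session events are not modeled:

```python
def wrap_multi_result(func, in_keys, out_key, copy_keys, map_type=dict):
    """Return a callable that builds one new mapping per result of func."""

    def wrapped(mapping):
        # One in_key: pass the bare value. Several: pass a tuple of values.
        if len(in_keys) == 1:
            arg = mapping[in_keys[0]]
        else:
            arg = tuple(mapping[k] for k in in_keys)
        outputs = []
        for result in func(arg):
            # Start from the copied keys; the input mapping is left untouched.
            new_map = map_type((k, mapping[k]) for k in copy_keys)
            new_map[out_key] = result
            outputs.append(new_map)
        return outputs

    return wrapped


def repeat_as_text(i):
    # Returns i results, so zero results is a valid outcome.
    return [str(i)] * i


wrapped = wrap_multi_result(repeat_as_text, ('x',), 'y', ('z',))
source = {'x': 2, 'z': 20}
two_results = wrapped(source)
no_results = wrapped({'x': 0, 'z': 20})
```

Because each result gets its own mapping, the wrapped callable may produce zero, one, or many outputs per input without touching the incoming data.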
Bases: onyx.dataflow.streamprocess.SessionBegin
T.count(value) -> integer – return number of occurrences of value
T.index(value, [start, [stop]]) -> integer – return first index of value. Raises ValueError if the value is not present.
Alias for field number 0
Bases: onyx.dataflow.streamprocess.SessionBreak
T.count(value) -> integer – return number of occurrences of value
T.index(value, [start, [stop]]) -> integer – return first index of value. Raises ValueError if the value is not present.
Alias for field number 0
Bases: onyx.dataflow.streamprocess.SessionEnd
T.count(value) -> integer – return number of occurrences of value
T.index(value, [start, [stop]]) -> integer – return first index of value. Raises ValueError if the value is not present.
Alias for field number 0
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag.
Activate this tag to get debugging information; see onyx.util.debugprint.DebugPrint.
Return a graph for this processor. By default this is just a single node whose label is the label of the processor; derived classes may wish to override this property.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Internal function that pushes a result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send() is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
Bases: onyx.dataflow.combine_processor.SlotFillingJoinConnectionSpec
T.count(value) -> integer – return number of occurrences of value
T.index(value, [start, [stop]]) -> integer – return first index of value. Raises ValueError if the value is not present.
Alias for field number 0