onyx.dataflow.combine_processor – Creating networks of processors and useful processor wrappers.

onyx.dataflow.combine_processor.ChainProcessor(processors, sendee=None, sending=True, bypass_types=())
class onyx.dataflow.combine_processor.GatingJoinConnectionSpec

Bases: onyx.dataflow.combine_processor.SlotFillingJoinConnectionSpec

count

T.count(value) -> integer – return number of occurrences of value

index

T.index(value, [start, [stop]]) -> integer – return first index of value. Raises ValueError if the value is not present.

is_gating

Alias for field number 0
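
The count, index, and "Alias for field number 0" entries suggest that this class behaves like a one-field named tuple. Assuming so, the following sketch uses the standard library's collections.namedtuple to show how such a spec can be read by name or by position. GatingSpec is a hypothetical stand-in and not the Onyx class itself.

```python
from collections import namedtuple

# Hypothetical stand-in: a one-field named tuple shaped like the documented spec.
GatingSpec = namedtuple('GatingSpec', ['is_gating'])

spec = GatingSpec(is_gating=True)
first_field = spec[0]            # positional access to field number 0
occurrences = spec.count(True)   # tuple.count, inherited from tuple
position = spec.index(True)      # tuple.index; raises ValueError if absent
```

Because the spec is a tuple, it is immutable and can be compared or hashed like any other tuple.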

class onyx.dataflow.combine_processor.LatticeProcessor(processors_dict, arc_specs, sendee=None, sending=True, bypass_types=())

Bases: onyx.dataflow.streamprocess.ProcessorBase

Processor that connects together a lattice of processors.

>>> e = LatticeProcessor.MakeChainProcessor(tuple(streamprocess.EpsilonProcessor() for i in xrange(5)), sendee=_printer)
>>> for x in 'abc': e.process(x)
'a'
'b'
'c'
>>> e.graph
FrozenGraph(GraphTables((('EpsilonProcessor', 'EpsilonProcessor', 'EpsilonProcessor', 'EpsilonProcessor', 'EpsilonProcessor'), (0, 1, 2, 3), (1, 2, 3, 4), (None, None, None, None))))
>>> pn = dict()
>>> pn['splitter'] = streamprocess.SplitProcessor(process_types=int)
>>> pn['eps1'] = streamprocess.EpsilonProcessor()
>>> pn['eps2'] = streamprocess.EpsilonProcessor()
>>> pn['bsfj'] = join.BlockingSlotFillingJoin()
>>> arc_specs = (('spl2eps1', ('splitter', 'eps1', None)),
...              ('spl2eps2', ('splitter', 'eps2', None)),
...              ('eps12bsfj', ('eps1', 'bsfj', SlotFillingJoinConnectionSpec(tag='e1'))),
...              ('eps22bsfj', ('eps2', 'bsfj', SlotFillingJoinConnectionSpec(tag='e2'))))
>>> latproc = LatticeProcessor(pn, arc_specs)
>>> res = list()
>>> latproc.set_sendee(res.append)
>>> latproc.process(1)
>>> latproc.process(2)
>>> print res
[(1, 1), (2, 2)]
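
The split-and-join lattice in the doctest above can be approximated without Onyx. The sketch below is only an illustration of the idea: BlockingJoin and make_splitter are hypothetical names, the pass-through epsilon processors are omitted, and the real BlockingSlotFillingJoin may behave differently in details not shown in this page.

```python
class BlockingJoin(object):
    """Hypothetical join: waits until every tagged slot holds a value."""

    def __init__(self, tags, sendee):
        self.tags = tuple(tags)
        self.sendee = sendee
        self.slots = {}

    def make_input(self, tag):
        # Return a callable that fills the slot for `tag`.
        def fill(value):
            self.slots[tag] = value
            if len(self.slots) == len(self.tags):
                # All slots are full: emit in tag order, then reset.
                result = tuple(self.slots[t] for t in self.tags)
                self.slots = {}
                self.sendee(result)
        return fill


def make_splitter(*sendees):
    # Send each event to every downstream callable.
    def process(event):
        for sendee in sendees:
            sendee(event)
    return process


res = []
join = BlockingJoin(('e1', 'e2'), res.append)
process = make_splitter(join.make_input('e1'), join.make_input('e2'))
process(1)
process(2)
```

As in the doctest, each input reaches the join along both arcs, so every event produces one tuple with both slots filled.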
static MakeChainProcessor(processors, sendee=None, sending=True, bypass_types=())
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag.

debug_tag

Activate this tag to get debugging information; see onyx.util.debugprint.DebugPrint.

graph

Return a graph for this processor. By default this is just a single node whose label is the label of the processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

num_processors()
process(event)
send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send is called.
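
The push model described for send() can be sketched in a few lines. TinyProcessor is a hypothetical class, not the Onyx base class, and raising ValueError when no sendee has been set is an assumption made for the sketch; the text above only states that a sendee of False must not cause an error.

```python
class TinyProcessor(object):
    """Hypothetical push-style processor; not the Onyx base class."""

    def __init__(self, sendee=None):
        self._sendee = sendee

    def set_sendee(self, sendee):
        self._sendee = sendee

    def send(self, result):
        if self._sendee is False:
            return  # a sendee of False silently drops results
        if self._sendee is None:
            # Assumption: an unset sendee is treated as a usage error.
            raise ValueError('no sendee has been set')
        self._sendee(result)

    def process(self, event):
        # Trivial processing step: double the event, then push it on.
        self.send(event * 2)


out = []
p = TinyProcessor()
p.set_sendee(out.append)
p.process(3)

silent = TinyProcessor(sendee=False)
silent.process(3)  # no error, and nothing is sent
```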

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.
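
A minimal sketch of the sending switch follows. Switchable is a hypothetical class, and the choice to drop results while sending is off is an assumption; this page does not say what the real processors do with results produced in that state.

```python
class Switchable(object):
    """Hypothetical processor with an on/off sending flag."""

    def __init__(self, sendee, sending=True):
        self.sendee = sendee
        self.sending = sending

    def set_sending(self, sending):
        self.sending = bool(sending)

    def send(self, result):
        # Assumption: results are dropped while sending is off.
        if self.sending:
            self.sendee(result)


seen = []
s = Switchable(seen.append)
s.send('a')
s.set_sending(False)
s.send('b')  # not delivered
s.set_sending(True)
s.send('c')
```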

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context available as dc.
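
To illustrate what a bypass prologue could look like, here is a sketch of a decorator that forwards events of the bypass types untouched and hands everything else to the decorated process function. The names bypass_prologue and Upper are hypothetical, debug-context setup is left out, and the real std_process_prologue may do more than this.

```python
import functools


def bypass_prologue(process_function):
    """Hypothetical decorator: forward bypass-type events untouched."""
    @functools.wraps(process_function)
    def wrapper(self, event):
        if isinstance(event, self.bypass_types):
            self.send(event)
            return
        process_function(self, event)
    return wrapper


class Upper(object):
    """Hypothetical processor that upper-cases strings."""

    def __init__(self, sendee, bypass_types=()):
        self.sendee = sendee
        self.bypass_types = bypass_types

    def send(self, result):
        self.sendee(result)

    @bypass_prologue
    def process(self, event):
        self.send(event.upper())


out = []
u = Upper(out.append, bypass_types=(int,))
u.process('abc')  # processed
u.process(7)      # bypassed: sent on unchanged
```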

class onyx.dataflow.combine_processor.MappingWrapperSisoProcessor(in_keys, out_keys, processor, pass_thru_types=(), sendee=None, sending=True, bypass_types=())

Bases: onyx.dataflow.streamprocess.ProcessorBase

A processor wrapper that extracts the data specified by in_keys from an input mapping object (such as a dict, attrdict, or wormattrdict) and calls the wrapped processor with a tuple of the values that correspond to in_keys. The output of the processor is then added to the mapping object, with out_keys supplying the key or keys and the returned result supplying the value or values.

If in_keys has exactly one entry, the value for that key is passed to the processor directly rather than wrapped in a tuple.

The underlying processor must return exactly one result each time its process method is called.
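
The key-extraction behaviour described above can be sketched with plain dicts and functions. wrap_mapping is a hypothetical helper written for this illustration; it ignores sendees, pass_thru_types, and bypass handling, and it updates the mapping in place, which may not match how the real wrapper treats every mapping type.

```python
def wrap_mapping(in_keys, out_keys, func):
    """Hypothetical helper mirroring the wrapper's described behaviour."""
    def process(mapping):
        values = tuple(mapping[k] for k in in_keys)
        # A single in_key passes the bare value, not a one-element tuple.
        arg = values[0] if len(values) == 1 else values
        result = func(arg)
        if len(out_keys) == 1:
            mapping[out_keys[0]] = result
        else:
            # Several out_keys: pair them with the elements of the result.
            for key, value in zip(out_keys, result):
                mapping[key] = value
        return mapping
    return process


increment = wrap_mapping(('x',), ('y',), lambda a: a + 1)
one_in = increment({'x': 1})

divide = wrap_mapping(('x', 'y'), ('q', 'r'),
                      lambda args: (args[0] // args[1], args[0] % args[1]))
two_in = divide({'x': 7, 'y': 2})
```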

>>> def adder(a):
...     if isinstance(a, int):
...         return a + 1
...     else:
...         return a + 2
>>> res = list()
>>> fp = streamprocess.FunctionProcessor(adder, process_types=(int, complex), bypass_types=float)
>>> wfp = MappingWrapperSisoProcessor(('x',), ('y',), fp, pass_thru_types=complex, sendee=res.append)
>>> data1 = builtin.wormattrdict(x=1)
>>> data2 = dict(x=5.0)
>>> data3 = complex(1, 2)
>>> wfp.process(data1)
>>> wfp.process(data2)
>>> wfp.process(data3)
>>> print res
[wormattrdict({'x': 1, 'y': 2}), {'y': 5.0, 'x': 5.0}, (3+2j)]

Another example, with two input keys and a single tuple result that is stored under two output keys. Division is used because it is not commutative, so the order of the keys in in_keys matters.

>>> def div_two_numbers(args):
...     num, den = args
...     return (num // den, num % den)
>>> res = list()
>>> fp = streamprocess.FunctionProcessor(div_two_numbers, process_types=tuple)
>>> wfp = MappingWrapperSisoProcessor(('x', 'y'), ('q', 'r'), fp, sendee=res.append)
>>> data = builtin.attrdict(x=1.0, y=2.0)
>>> wfp.process(data)
>>> print res
[attrdict({'y': 2.0, 'x': 1.0, 'r': 1.0, 'q': 0.0})]
dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag.

debug_tag

Activate this tag to get debugging information; see onyx.util.debugprint.DebugPrint.

graph

Return a graph for this processor. By default this is just a single node whose label is the label of the processor; derived classes may wish to override this property.

label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

process(event)
send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send is called.

sendee

The callable this processor will use to send events; see set_sendee()

sending

Whether this processor will currently send events at all; see set_sending()

set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

set_sending(sending)

Clients call this to turn sending from a processor on or off.

static std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context available as dc.

class onyx.dataflow.combine_processor.SessionProcessorMappingWrapper(in_keys, out_keys, const_keys, processor, return_map_type=<class 'onyx.builtin.wormattrdict'>, sendee=None, sending=True, bypass_types=())

Bases: onyx.dataflow.streamprocess.SessionProcessorBase

A processor wrapper that extracts the data specified by in_keys from an input mapping object (such as a dict, attrdict, or wormattrdict) and calls the wrapped processor with a tuple of the values that correspond to in_keys. Each output of the processor is stored in a mapping, with out_keys supplying the key or keys and the returned result supplying the value or values.

If in_keys has exactly one entry, the value for that key is passed to the processor directly rather than wrapped in a tuple.

Unlike MappingWrapperSisoProcessor, which requires the underlying processor to return exactly one value, this wrapper has no such requirement: rather than adding the result to the incoming data, it creates a new mapping of type return_map_type (wormattrdict by default) for each result.
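
The one-mapping-per-result idea can be sketched as follows. wrap_multi is a hypothetical helper, not the Onyx implementation: it models the wrapped processor as a function returning a list of results, it ignores session events, and it assumes that the values for const_keys are copied from the input into every new mapping, which is what the output of the example below appears to show.

```python
def wrap_multi(in_keys, out_keys, const_keys, func, return_map_type=dict):
    """Hypothetical helper: `func` returns zero or more results per input."""
    def process(mapping):
        values = tuple(mapping[k] for k in in_keys)
        arg = values[0] if len(values) == 1 else values
        outputs = []
        for result in func(arg):
            # A fresh mapping per result; the input is left untouched.
            new_map = return_map_type()
            for key in const_keys:
                new_map[key] = mapping[key]  # assumed: constants are copied
            if len(out_keys) == 1:
                new_map[out_keys[0]] = result
            else:
                new_map.update(zip(out_keys, result))
            outputs.append(new_map)
        return outputs
    return process


repeat = wrap_multi(('x',), ('y',), ('z',), lambda n: [str(n)] * n)
no_results = repeat({'x': 0, 'z': 20})
two_results = repeat({'x': 2, 'z': 20})
```

Because every result gets its own mapping, the wrapped function is free to produce no results, one result, or many for a single input.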

>>> class Spam(streamprocess.SessionProcessorBase):
...     def __init__(self, sendee=None, sending=True, bypass_types=()):
...         super(Spam, self).__init__(sendee=sendee, sending=sending, process_types=int, bypass_types=bypass_types)
...         self._data = []
...     def process_data(self, i):
...         self._data.append(i)
...         self.send(str(i))
...     def session_end(self, event):
...         self.send(str(self._data))
...         self.send(event)
>>> res = list()
>>> spam = Spam()
>>> spmp = SessionProcessorMappingWrapper(('x',), ('y',), ('z',), spam, return_map_type=dict, sendee=res.append)
>>> sb = streamprocess.SessionProcessorBase.SessionBegin(userdata=None)
>>> se = streamprocess.SessionProcessorBase.SessionEnd(userdata=None)
>>> spmp.process(sb)
>>> for i in range(3):
...     spmp.process(dict(x=i, z=20))
>>> spmp.process(se)
>>> print res
[SessionBegin(userdata=None), {'y': '0', 'z': 20}, {'y': '1', 'z': 20}, {'y': '2', 'z': 20}, {'y': '[0, 1, 2]', 'z': 20}, SessionEnd(userdata=None)]
class SessionProcessorMappingWrapper.SessionBegin

Bases: onyx.dataflow.streamprocess.SessionBegin

count

T.count(value) -> integer – return number of occurrences of value

index

T.index(value, [start, [stop]]) -> integer – return first index of value. Raises ValueError if the value is not present.

userdata

Alias for field number 0

class SessionProcessorMappingWrapper.SessionBreak

Bases: onyx.dataflow.streamprocess.SessionBreak

count

T.count(value) -> integer – return number of occurrences of value

index

T.index(value, [start, [stop]]) -> integer – return first index of value. Raises ValueError if the value is not present.

userdata

Alias for field number 0

class SessionProcessorMappingWrapper.SessionEnd

Bases: onyx.dataflow.streamprocess.SessionEnd

count

T.count(value) -> integer – return number of occurrences of value

index

T.index(value, [start, [stop]]) -> integer – return first index of value. Raises ValueError if the value is not present.

userdata

Alias for field number 0

SessionProcessorMappingWrapper.dc

A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag.

SessionProcessorMappingWrapper.debug_tag

Activate this tag to get debugging information; see onyx.util.debugprint.DebugPrint.

SessionProcessorMappingWrapper.graph

Return a graph for this processor. By default this is just a single node whose label is the label of the processor; derived classes may wish to override this property.

SessionProcessorMappingWrapper.label

Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().

SessionProcessorMappingWrapper.process(event)
SessionProcessorMappingWrapper.process_data(value)
SessionProcessorMappingWrapper.send(result)

Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee (the target of the push), clients of the processor must either initialize the object with a sendee or call set_sendee(). Processors created with a sendee of False will never send, but will not raise an error if send is called.

SessionProcessorMappingWrapper.sendee

The callable this processor will use to send events; see set_sendee()

SessionProcessorMappingWrapper.sending

Whether this processor will currently send events at all; see set_sending()

SessionProcessorMappingWrapper.session_begin(event)
SessionProcessorMappingWrapper.session_break(event)
SessionProcessorMappingWrapper.session_end(event)
SessionProcessorMappingWrapper.set_sendee(sendee)

Clients call this to set up the callable where the processor will send its results.

SessionProcessorMappingWrapper.set_sending(sending)

Clients call this to turn sending from a processor on or off.

static SessionProcessorMappingWrapper.std_process_prologue(process_function)

Subclasses may use this decorator on their process function to implement the usual bypass and process semantics and to set up the debug context available as dc.

class onyx.dataflow.combine_processor.SlotFillingJoinConnectionSpec

Bases: onyx.dataflow.combine_processor.SlotFillingJoinConnectionSpec

count

T.count(value) -> integer – return number of occurrences of value

index

T.index(value, [start, [stop]]) -> integer – return first index of value. Raises ValueError if the value is not present.

tag

Alias for field number 0