Onyx logo

Previous topic

onyx.dataflow.composecfg – Proof of concept of composing CFGs.

Next topic

onyx.dataflow.simplecfg – Tools for working with CFG grammars.

This Page

onyx.dataflow.graph – Nestable graph-based dataflow support.

class onyx.dataflow.graph.Composite(**kwargs)

Bases: onyx.dataflow.graph.Primitive

Mix-in baseclass exposing fixed properties for concept of composite versus primitive.

>>> Composite().is_composite
True
>>> Composite().is_primitive
False
is_composite
is_primitive
class onyx.dataflow.graph.DataflowArc(source_node, target_node, arc_spec)

Bases: object

An arc in a dataflow network.

A DataflowArc is a pair of port specifiers, the from_port and the to_port. These are keys that identify the particular out_port of the source node and the in_port of the target node.

arc_spec
pass_through(item)
class onyx.dataflow.graph.Join(_)

Bases: object

Obsolete, empty class, used by objectset for its testing.

>>> Join(None) 
<__main__.Join object at 0x...>
class onyx.dataflow.graph.Joiner(label=None)

Bases: onyx.dataflow.graph.ProcessorNode, onyx.dataflow.graph.MultiIn, onyx.dataflow.graph.SingleOut

Undifferentiated serializing joiner. Each input item is sent to the single target in the order it is received.

check_managed(status)
get_inport(inport_label)
get_outtarget(outport_label)
is_composite
is_managed
is_primitive
iter_targets
label
manager
num_inports
num_outtargets
only_outtarget
set_inport(inport_label, intarget)
set_managed(manager)
set_only_outtarget(target)
set_outtarget(outport_label, target)
typed_label
class onyx.dataflow.graph.Labeled(**kwargs)

Bases: onyx.dataflow.graph.object_base

Mix-in baseclass for objects that get constructed with a read-only label.

label
typed_label
class onyx.dataflow.graph.Managed(**kwargs)

Bases: onyx.dataflow.graph.object_base

Mix-in baseclass exposing a simple API for the concept of the manager of an object.

check_managed(status)
is_managed
manager
set_managed(manager)
class onyx.dataflow.graph.MultiIn(**kwargs)

Bases: onyx.dataflow.graph.object_base

Mix-in baseclass for handling the set of inports to a processor node.

get_inport(inport_label)
num_inports
set_inport(inport_label, intarget)
class onyx.dataflow.graph.MultiOut(**kwargs)

Bases: onyx.dataflow.graph.object_base

Mix-in baseclass for handling the set of output targets of a processor node.

get_outtarget(outport_label)
iter_targets
num_outtargets
set_outtarget(outport_label, target)
class onyx.dataflow.graph.Primitive(**kwargs)

Bases: onyx.dataflow.graph.object_base

Mix-in baseclass exposing properties for concept of primitive versus composite.

>>> Primitive().is_primitive
True
>>> Primitive().is_composite
False
is_composite
is_primitive
class onyx.dataflow.graph.ProcessorGraphBuilder(*args)

Bases: object

Builder for processor graphs.

A processor graph is a directed acyclic graph of primitive SisoProcessorNodes. Each node implements processing logic, the graph implements the connections between the nodes.

>>> pgb = ProcessorGraphBuilder()
>>> node1 = SisoMethodProcessorNode('node1')
>>> n1 = pgb.new_node(node1)
>>> n2 = pgb.new_node(SisoMethodProcessorNode('node2'))
>>> pgb.connect(n1, n2)
>>> p1 = pgb.processor_nodes[n1].get_only_inport()
>>> res = list()
>>> pgb.processor_nodes[n2].set_only_outtarget(res.append)
>>> p1(True); p1(False); p1(None)
>>> res
[True, False, None]
>>> n3 = pgb.new_node(node1)
Traceback (most recent call last):
  ...
ValueError: expected SisoMethodProcessorNode labeled 'node1' to be unmanaged, but it is managed by a ProcessorGraphBuilder
>>> pgb.connect(n1, 1000)
Traceback (most recent call last):
  ...
ValueError: expected 0 <= start_node, end_node < 2, but got start_node = 0 and end_node = 1000
>>> pgb.connect(pgb.new_node(SisoMethodProcessorNode('node3')), n2)
>>> p1(1)
Traceback (most recent call last):
  ...
ValueError: node labeled 'node2' expected to be called with inport_id 1, but got 0, suggesting that the node was used as the target in more than one call to ProcessorGraphBuilder.connect() and the source from an earlier call to connect() is still being used
connect(source_id, target_id, arc_ports_spec=((None, ), (None, )))

Connect the output of the node identified by source_id to the input of the node identified by target_id. Optional arc_ports_spec is a pair specifying the output and input ports of the source and target nodes respectively. Update the graph to reflect this connection.

freeze(label=None)

Freezes the builder and returns a pair, (nodes, graph), where nodes is a tuple of the ProcessorNodes that make up the processor network, and graph is a FrozenGraph representing the structure of the network, where the label on each node in the graph is the index of the corresponding ProcessorNode in the nodes tuple.

The ProcessorNodes in the nodes tuple are (already) internally linked so as to implement the network’s functionality. Since these nodes are in general mutable objects that cannot be deepcopied, the instance relinquishes ownership of the nodes and marks itself as frozen. As such, it cannot be used for any futher work. See SisoProcessorGraphNode, which

Raises ValueError if the graph does not satisfy the requirements that it be a lattice with no self loops.

frozen

A bool indicating whether the builder has been frozen and is thus useless. If False the builder is still usable. If True the builder cannot be used and attempts to do so will raise ValueError.

new_node(node)

Add node, a ProcessorNode, to the graph. If node.is_primitive is True the node will be added to graph. Otherwise, node.is_composite must be True, and in this case the network structure and contained primitive nodes in node will be inserted into the graph as a subgraph. The node reqlinquishes control of the primitive nodes and becomes useless.

Returns an identifier for the primitive node or the subgraph.

class onyx.dataflow.graph.ProcessorNode(**kwargs)

Bases: onyx.dataflow.graph.Labeled, onyx.dataflow.graph.Managed, onyx.dataflow.graph.Primitive

Baseclass for processor nodes.

Implements functionality that all processor nodes must have. At present this means they can be labeled via a constructor argument, label=’some string’, they can point to a manager, and they are primitive.

check_managed(status)
is_composite
is_managed
is_primitive
label
manager
set_managed(manager)
typed_label
class onyx.dataflow.graph.SingleIn(**kwargs)

Bases: onyx.dataflow.graph.MultiIn

Mix-in baseclass exposing an API for Single-In processor functionality.

get_inport(inport_label)
get_only_inport()
num_inports
set_inport(inport_label, intarget)
set_only_inport(only_intarget)
class onyx.dataflow.graph.SingleOut(**kwargs)

Bases: onyx.dataflow.graph.MultiOut

Mix-in baseclass exposing an API for Single-Out processor functionality.

get_outtarget(outport_label)
iter_targets
num_outtargets
only_outtarget
set_only_outtarget(target)
set_outtarget(outport_label, target)
onyx.dataflow.graph.SisoChainProcessor(siso_node_iterable, label=None)

Make a siso processor that chains together the siso processors making up siso_node_iterable.

>>> chain1 = SisoChainProcessor((SisoMethodProcessorNode('node_%02d' % (index,)) for index in xrange(50)), 'chain1')
>>> p1 = chain1.get_only_inport()
>>> res1 = list()
>>> chain1.set_only_outtarget(res1.append)
>>> p1(True); p1(False); p1(None); p1('a')
>>> res1
[True, False, None, 'a']

Demonstrate nesting ability of the SisoProcessorGraphNode that gets returned

>>> numchains, numnodes = 10, 10
>>> chains = (SisoChainProcessor((SisoMethodProcessorNode('node%02d_%02d' % (chain, node,)) for node in xrange(numnodes)), 'chain%d' % (chain,)) for chain in xrange(numchains))
>>> nested = SisoChainProcessor(chains, 'nested%02d_%02d' % (numchains, numnodes))
>>> nested.label
'nested10_10'
>>> nested.typed_label
"SisoProcessorGraphNode('nested10_10')"
>>> p2 = nested.get_only_inport()
>>> res2 = list()
>>> nested.set_only_outtarget(res2.append)
>>> p2('a');p2('b');p2(123)
>>> res2
['a', 'b', 123]
>>> # nested.subgraph_tree.dot_display()
>>> # nested.dot_display(graph_attributes=('label="%s"' % (nested.label,), 'labelloc=top;', 'rankdir=LR;'), node_label_callback=lambda label, x, y: nested.processor_nodes[label].label)
class onyx.dataflow.graph.SisoMethodProcessorNode(label=None)

Bases: onyx.dataflow.graph.SisoProcessorNode

Baseclass for processor nodes that are implemented by overriding the process() method to implement processing logic.

check_managed(status)
get_inport(inport_label)
get_only_inport()
get_outtarget(outport_label)
is_composite
is_managed
is_primitive
iter_targets
label
manager
num_inports
num_outtargets
only_outtarget
process(item)

Baseclass implements pass-through behavior; it’s an epsilon node.

set_inport(inport_label, intarget)
set_managed(manager)
set_only_inport(only_intarget)
set_only_outtarget(target)
set_outtarget(outport_label, target)
typed_label
onyx.dataflow.graph.SisoParallelProcessor(siso_node_iterable, label=None)

Makes a siso processor that implements a parallel graph of the siso processors making up siso_node_iterable. The output of the processor is serialized from each processor in the parallel graph. This means the amount of data in the network is multiplied.

>>> parallel1 = SisoParallelProcessor((SisoMethodProcessorNode('node_%02d' % (index,)) for index in xrange(3)), 'parallel1')
>>> p1 = parallel1.get_only_inport()
>>> res1 = list()
>>> parallel1.set_only_outtarget(res1.append)
>>> p1(True); p1(False); p1(None); p1('a')
>>> res1
[True, True, True, False, False, False, None, None, None, 'a', 'a', 'a']

Demonstrate nesting ability of the SisoProcessorGraphNode that gets returned, and the data-multiplying effect

>>> numchains, numnodes = 2, 3
>>> chains = (SisoParallelProcessor((SisoMethodProcessorNode('node%02d_%02d' % (chain, node,)) for node in xrange(numnodes)), 'chain%d' % (chain,)) for chain in xrange(numchains))
>>> nested = SisoParallelProcessor(chains, 'nested_%02d_%02d' % (numchains, numnodes))
>>> nested.label
'nested_02_03'
>>> nested.typed_label
"SisoProcessorGraphNode('nested_02_03')"
>>> p2 = nested.get_only_inport()
>>> res2 = list()
>>> nested.set_only_outtarget(res2.append)
>>> p2('a');p2('b');p2(123)
>>> res2
['a', 'a', 'a', 'a', 'a', 'a', 'b', 'b', 'b', 'b', 'b', 'b', 123, 123, 123, 123, 123, 123]
>>> # nested.subgraph_tree.dot_display()
>>> # nested.dot_display(graph_attributes=('label="%s"' % (nested.label,), 'labelloc=top;', 'rankdir=LR;'), node_label_callback=lambda label, x, y: nested.processor_nodes[label].label)
class onyx.dataflow.graph.SisoProcessorGraphNode(builder, label=None)

Bases: onyx.dataflow.graph.SisoProcessorNode, onyx.dataflow.graph.Composite, onyx.util.dotdisplay.DotDisplay

Composite siso processor.

>>> builder = ProcessorGraphBuilder()
>>> builder.connect(builder.new_node(SisoMethodProcessorNode('node1')), builder.new_node(SisoMethodProcessorNode('node2')))
>>> spgn1 = SisoProcessorGraphNode(builder, 'graphnode1')
>>> spgn1.is_primitive
False
>>> p1 = spgn1.get_only_inport()
>>> res = list()
>>> spgn1.set_only_outtarget(res.append)
>>> p1('a');p1(1);p1(None);
>>> res
['a', 1, None]
check_managed(status)
dot_display(dot_iter=None, temp_file_prefix=None, display_command_format=None, **kwargs)

Display a dot-generated representation of the graph.

Optional dot_iter is the generator function to pull on for text lines of DOT code. It is called with kwargs. It defaults to the object’s dot_iter method

Optional temp_file_prefix is the prefix used for the temporary filename. It defaults to the name of the type of self.

Optional display_command_format is a formatting string, with %s where the temporary filename goes, that is used to generate the command that will display the file. It defaults to the the value of the module’s DISPLAY_COMMAND_FORMAT attribute.

Remaining keyword arguments, kwargs, are handed to the dot_iter generator function. In general, no arguments are necessary to get an object to display.

Returns a tuple of four items: (temp_filename, stdout, stderr, cmd), where temp_filename is the name of the temporary file that gets created for the display command to use, stdout and stderr are the standard-out and standard-error from the command, and cmd is a string representing the command. The caller is responsible for removing temp_filename.

Raises AttributeError if dot_iter is not given and the object doesn’t have a dot_iter method

Raises SubprocessError if the command fails to execute or if the command exits with a non-zero return code.

dot_iter(graph_label=None, graph_attributes=(), node_attributes=('height=.3', 'width=.4', 'fontsize=8'), depth_limit=None, force_display_labels=(), node_label_callback=<function <lambda> at 0x20231500>, arc_label_callback=<function <lambda> at 0x20231500>, subgraph_label_callback=<function <lambda> at 0x20231500>, node_attributes_callback=None, arc_attributes_callback=None, subgraph_attributes_callback=None)
get_inport(inport_label)
get_only_inport()
get_outtarget(outport_label)
is_composite
is_managed
is_primitive
iter_targets
label
manager
num_inports
num_outtargets
only_outtarget
set_inport(inport_label, intarget)
set_managed(manager)
set_only_inport(only_intarget)
set_only_outtarget(target)
set_outtarget(outport_label, target)
typed_label
verify()
class onyx.dataflow.graph.SisoProcessorNode(label=None)

Bases: onyx.dataflow.graph.ProcessorNode, onyx.dataflow.graph.SingleIn, onyx.dataflow.graph.SingleOut

Baseclass for processor nodes having single-input single-output semantics.

Optional label can be provided to give a human-readable label for the node; default ‘<unlabeled>’.

This baseclass provides the necessary interfaces for a processor node to be part of a managed set of processors in a processor graph.

Subclasses must override <blah> and <blah> in order to implement their specific logic. The baseclass is just an epsilon node that passes its inputs to its outputs in an unbuffered, one-to-one fashion.

>>> sp = SisoProcessorNode('foo')
>>> sp.is_primitive
True
>>> sp.label
'foo'
>>> sp.typed_label
"SisoProcessorNode('foo')"
>>> sp.set_only_outtarget(-2)
Traceback (most recent call last):
  ...
TypeError: SisoProcessorNode labeled 'foo' expected a callable target, got a int
check_managed(status)
get_inport(inport_label)
get_only_inport()
get_outtarget(outport_label)
is_composite
is_managed
is_primitive
iter_targets
label
manager
num_inports
num_outtargets
only_outtarget
set_inport(inport_label, intarget)
set_managed(manager)
set_only_inport(only_intarget)
set_only_outtarget(target)
set_outtarget(outport_label, target)
typed_label
class onyx.dataflow.graph.Splitter(label=None)

Bases: onyx.dataflow.graph.ProcessorNode, onyx.dataflow.graph.SingleIn, onyx.dataflow.graph.MultiOut

Undifferentiated splitter. Each input item is sent to each of the targets in an unspecified order.

Each input item must be immutable.

check_managed(status)
get_inport(inport_label)
get_only_inport()
get_outtarget(outport_label)
is_composite
is_managed
is_primitive
iter_targets
label
manager
num_inports
num_outtargets
set_inport(inport_label, intarget)
set_managed(manager)
set_only_inport(only_intarget)
set_outtarget(outport_label, target)
typed_label
class onyx.dataflow.graph.object_base(**kwargs)

Bases: object

Uggg! A baseclass to use when cooperatively-super-using multiple-inheritance mix-ins are expecting a **kwargs to __init__, so we need to insert a baseclass with an __init__ that makes sure the call to object.__init__ has no arguments.