The classes here support the common needs of clients for getting data into dataflow graphs. The key feature of each such source is that it internally manages a thread that pushes data through the graph. These sources differ in terms of how they handle obtaining the data that gets pushed into the graph. These classes provide non-blocking supervisor functions, e.g. start(), stop(), pause(), which provide the basic dataflow controls associated with streaming data.
Two types of sources are available:
The IteratorSource is an “impedance-matching” processor that is inserted between a pull-based source of data and the push-based dataflow graph. It has a thread that pulls on a client-supplied iterator that generates data, e.g. a file object. It has three modes of operation selected via its throttle interface. In the as-needed mode, it pulls on the iterator on an as-needed basis whenever the queue of unsent data is below some low-water mark. In the simulate-real-time mode it pulls on the iterator at a client-supplied interval of wall-clock time. The third mode is a limit of the simulate-real-time mode when the interval goes to zero and the source pulls in the data as quickly as it can.
The NonBlockingSource is simply a buffering processor for the push model of dataflow. It has a sendee, and it has a process() function. However, unlike simple processors, its process() function is non-blocking; this function simply queues up the data and returns quickly. The NonBlockingSource handles the buffering and it has a separate thread so as to call the sendee asynchronously whenever there is data available that has been put in the queue via the process() interface. The idea here is that the client is handling some push-based data source, e.g. a live input, that must be serviced rapidly, so the client cannot wait for process() to do a lot of work. The NonBlockingSource provides the buffering that simplifies the interface between such a time-sensitive source and the dataflow graph.
The IteratorSource and NonBlockingSource classes are subclasses of DataflowSourceBase. This base class handles most of the semantics needed for the asynchronous sending of data into the network. It can be subclassed by clients with very specialized needs that are not met by either NonBlockingSource or IteratorSource.
>>> len((DataflowSourceBase(), NonBlockingSource(), IteratorSource(())))
3
Bases: onyx.dataflow.streamprocess.ProcessorBase
Baseclass for objects which serve as sources of dataflow packets.
This class has a thread that pushes data into the sendee when there’s data in the queue. It has supervisor functions, e.g. start(), stop(), pause(), for managing the collecting of data and the emptying of the queue.
This baseclass object takes care of emptying the queue asynchronously via calls to send().
Subclasses need to implement a way to append items to the queue (eventually calling _enqueue() while the critical section is locked) when in the Running or Paused states.
This property is the number of unprocessed samples in the queue.
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag
Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint
This call permanently shuts down the processor and finishes the worker threads. It must be called in order for the processor and its thread resources to get freed up. It can only successfully be called once.
The queue of pending samples is cleared. This function blocks until the worker thread has shutdown after it finishes processing (sending) whatever sample it was working on.
If you need to process the existing queue before shutting down, call stop(True), then wait() (which blocks until the queue has been processed), and, then call done().
Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.
Returns False if the instance is usable. Otherwise returns a string, done or broken, indicating that the instance is unusable due either to method done() having been called or one of the worker threads having encountered an exception that renders the instance broken.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Change state to stop sending samples. Samples will still be accepted and queued up.
Subclasses must override the process(value) function.
The function accepts a single value. If, after processing the single value, it has a new result or results, the function must call send(result) for each result that is calculated. For a given call to process(), the function can call send() zero, one, or multiple times depending on the semantics that the processor type implements.
Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
Start accepting samples into the queue and start sending queued samples to the sendee.
Raises ValueError if no sendee has been set.
>>> type('FooSource', (DataflowSourceBase,), {})().start()
Traceback (most recent call last):
...
ValueError: FooSource expected to have a sendee before being started; perhaps it is not yet connected to a Processor
Subclasses may use this decorater on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().
Change state to stop accepting or sending samples. If flush is True, the existing queue of samples will be sent to the sendee, otherwise the queue will be cleared (the default).
If stopped or paused, wait until any pending processing of queued samples is done or the state changes to running, then return. This is a blocking method for the caller. Don’t call this unless you really need to wait for the queue to empty, e.g. for (more) reproducible testing of the simulated real-time processing.
Bases: onyx.dataflow.source.DataflowSourceBase
A source that pulls on a user-supplied iterable for its samples. The default is to pull on the iterator as-needed based on the sendee’s consumption of samples. E.g. this can be used to pull on a file-based iterator. Running in simulated real-time is also possible, in which case the iterator is pulled on at the simulated real-time interval, regardless of whether the sendee can keep up.
Set up some stuff, including fractions of a second.
>>> base_interval = 1 / 2
>>> base_by_4_plus_epsilon = base_interval * 4 + base_interval / 8
Set up processor that dumps into a list and start it. Wait until the iterator stops. It empties the xrange source iterator with on-demand pulling.
>>> result = list()
>>> nbb = IteratorSource(xrange(10), sendee=result.append)
>>> nbb.is_iter_stopped
False
>>> nbb.start()
>>> nbb.wait_iter_stopped()
True
>>> nbb.is_iter_stopped
True
>>> nbb.stop(flush=True)
>>> nbb.wait()
>>> result
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> del result[:]
A call to wait_until_finished() blocks until the iterator has stopped and work on the iterated items has all finished.
>>> _ = nbb.set_iter(xrange(10))
>>> nbb.start()
>>> nbb.wait_until_finished()
>>> nbb.is_iter_stopped
True
>>> result
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> del result[:]
Show simulated real-time with an interval of base_interval.
>>> nbb.simulate_realtime(base_interval)
>>> _ = nbb.set_iter()
>>> nbb.is_iter_stopped
False
>>> nbb.start()
Sleep so that the empty iteration from the prior set_iter() call gets exhausted; then check that this is so.
>>> time.sleep(base_interval)
>>> nbb.is_iter_stopped
True
In the following we use a function to run the steps in the dance here in order to avoid test-disrupting delays that would occur if the doctest machinery had to process each line separately.
Run with a non-empty iterator. Wait a total of just over four of the base_interval intervals, giving five interval-bounding items.
XXX the code in go(), go2() and go3()is not deterministic and has race conditions w/the real-time simulation; will undoubtedly have a test failure at some point; we need more deterministic testing of simulate_realtime
>>> def go():
... results = list()
... assert not result
... nbb.set_iter(count())
... results.append(nbb.wait_iter_stopped(base_interval))
... time.sleep(base_by_4_plus_epsilon - base_interval)
... results.append(nbb.is_iter_stopped)
... nbb.stop(True)
... nbb.wait()
... results.append(nbb.is_iter_stopped)
... return tuple(results)
>>> go()
(False, False, False)
>>> result
[0, 1, 2, 3, 4]
>>> del result[:]
Show switching iterators during simulated real-time processing. Each call to set_iter() resets the real-time clock. Note that we capture and return the values of the previous count() and repeat() iterators.
>>> def go2():
... results = list()
... results.append(nbb.set_iter())
... assert not result
... nbb.start()
... results.append(nbb.set_iter(repeat(1)))
... time.sleep(base_by_4_plus_epsilon)
... results.append(nbb.set_iter(repeat(2)))
... time.sleep(base_by_4_plus_epsilon)
... nbb.stop(True)
... nbb.wait()
... return tuple(results)
>>> go2()
(count(5), <tupleiterator object at 0x...>, repeat(1))
Since set_iter resets the real-time clock, each iterator gives five interval-bounding items.
>>> result
[1, 1, 1, 1, 1, 2, 2, 2, 2, 2]
>>> del result[:]
Show that pausing does not interfere with pulling on the iterator.
>>> def go3():
... nbb.set_iter()
... assert not result
... nbb.start()
... nbb.set_iter(count())
... time.sleep(base_by_4_plus_epsilon)
... nbb.pause()
... nbb.wait()
... # result1 gets the samples bounding 4 intervals
... result1 = list(result)
... time.sleep(base_by_4_plus_epsilon)
... # no change to result since we're wait()-ing (but, the internal queue has changed)
... result2 = list(result)
... # call to start() is not strictly necessary since stop(True)
... # will pull everything even if we're paused when it's called
... nbb.start()
... nbb.stop(True)
... nbb.wait()
... # has all 8 sample-bounding intervals
... result3 = list(result)
... return result1, result2, result3
>>> result1, result2, result3 = go3()
>>> result1
[0, 1, 2, 3, 4]
>>> result2
[0, 1, 2, 3, 4]
>>> result3
[0, 1, 2, 3, 4, 5, 6, 7, 8]
>>> nbb.done()
Here’s a test that verifies just about the only thing you can deterministically verify with a simulate-real-time source, and that is that it hasn’t produced more items than the elapsed time would allow.
>>> def go():
... result = list()
... rts = IteratorSource(count(), sendee=result.append)
... rts.simulate_realtime(1/128)
... rts.start()
... interval = rts.real_time_interval
... start_time = rts.start_time
... while True:
... len_result = len(result)
... t = time.time()
... # the first event happens right at start, so subtract 1 from the length to get the number of intervals
... assert (len_result-1) * interval <= t - start_time, str((len_result, interval, t, start_time))
... if len_result >= 500: break
... time.sleep(interval/4)
... rts.stop()
... rts.done()
>>> go()
This test runs the IteratorSource with simulate_realtime(0), so it aggressively pulls on the iterable. We use it two ways: with a generator that sleeps a lot, and with a generator that does a lot of “work”.
>>> result4 = list()
>>> rts2 = IteratorSource(sendee=result4.append)
>>> rts2.simulate_realtime(0)
>>> rts2.start()
>>> def gated(interval):
... while True:
... yield None
... time.sleep(interval)
>>> def fake_work():
... ranger = xrange(1000000)
... while True:
... yield sum(ranger)
XXX arrrrghh, causes deadlock!
>>> interval = 0.02
>>> rts2.set_iter(gated(interval)) and None
>>> while len(result4) < 50: time.sleep(interval * 4)
>>> rts2.set_iter(fake_work()) and None
>>> count = len(result4)
>>> count >= 50
True
>>> while len(result4) < count + 20: time.sleep(interval * 4)
>>> rts2.stop()
>>> rts2.done()
>>> len(result4) >= count + 20
True
>>> result4[0] is None
True
>>> result4[-1]
499999500000
This property is the number of unprocessed samples in the queue.
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag
Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint
This call permanently shuts down the processor and finishes the worker threads. It must be called in order for the processor and its thread resources to get freed up. It can only successfully be called once.
The queue of pending samples is cleared. This function blocks until the worker thread has shutdown after it finishes processing (sending) whatever sample it was working on.
If you need to process the existing queue before shutting down, call stop(True), then wait() (which blocks until the queue has been processed), and, then call done().
Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.
Returns False if the instance is usable. Otherwise returns a string, done or broken, indicating that the instance is unusable due either to method done() having been called or one of the worker threads having encountered an exception that renders the instance broken.
This property is True if the current iterator from which this source is pulling has raised StopIteration, otherwise the property is False. See also set_iter().
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Change state to stop sending samples. Samples will still be accepted and queued up.
Subclasses must override the process(value) function.
The function accepts a single value. If, after processing the single value, it has a new result or results, the function must call send(result) for each result that is calculated. For a given call to process(), the function can call send() zero, one, or multiple times depending on the semantics that the processor type implements.
Current value of the real-time simulation interval between pulls of events from the iterator, or None if using need-based pulling.
Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Using iterable, set up the iterator from which this source will pull. Calling this function with no argument is equivalent to calling it with the empty tuple and will suspend the processing that fills the queue. Calling this function resets the real-time clock.
Sets the is_iter_stopped property to False. When the iterator is exhausted and raises StopIteration, the is_iter_stopped property will become True.
This function cannot interupt an ongoing call to the current iterator’s next() method, so it can block indefinitely (and it will deadlock if it’s called from said next() method).
Returns the iterator that was set up during the previous call to set_iter().
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
Set the non-negative simulated real-time interval, in seconds, for pulling samples from the iterator. An interval of None will cause need-based pulling of items.
An interval of zero can be specified in order to drain the iterator very responsively. This is useful for pulling on an iterator that has its own rate limiting, e.g. a stream-iterator from a live audio source. However, using a zero or very small interval on an unbounded, non-rate-limited iterator can exhaust memory in our queue if calls pushing events into the sendee take longer, on average, than the real-time interval.
Calling this function resets the real-time clock.
Start accepting samples into the queue and start sending queued samples to the sendee.
Raises ValueError if no sendee has been set.
>>> type('FooSource', (DataflowSourceBase,), {})().start()
Traceback (most recent call last):
...
ValueError: FooSource expected to have a sendee before being started; perhaps it is not yet connected to a Processor
The value returned by the time module’s time.time() function when the current iterator was started.
Subclasses may use this decorater on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().
Change state to stop accepting or sending samples. If flush is True, the existing queue of samples will be sent to the sendee, otherwise the queue will be cleared (the default).
If stopped or paused, wait until any pending processing of queued samples is done or the state changes to running, then return. This is a blocking method for the caller. Don’t call this unless you really need to wait for the queue to empty, e.g. for (more) reproducible testing of the simulated real-time processing.
If timeout is None (default), block unconditionally until the is_iter_stopped property is :const`True` and return True. Otherwise, timeout is a non-negative number and the call will block for at most timeout seconds, returning True if is_iter_stopped becomes True or returning False otherwise.
Block unconditionally until the iterator is emptied (so the is_iter_stopped property is :const`True`) and until work on all events has been finished.
Bases: onyx.dataflow.source.DataflowSourceBase
A stream processor object with a non-blocking process() function, and non-blocking control functions. This implements a thread barrier. The process() function will not block. This means that calls made to send() by this processor are asynchronous. It is intended that this processor be used at the periphery of a graph in order to buffer data from an asynchronous source that requires responsive handling of its calls to process(), e.g. some form of live input. Also, except for wait() and done(), the supervisor methods do not block.
Set up processor that dumps into a list.
>>> result = list()
>>> nbb = NonBlockingSource(sendee=result.append)
Start the processor, fiddle its state and process stuff. In a real-world application, the process calls would come from a different thread and we would not block that thread.
>>> nbb.start()
>>> for i in xrange(10): nbb.pause(); nbb.start(); nbb.process(i); nbb.start(); nbb.pause(); nbb.process(i); nbb.start(); nbb.pause(); nbb.start()
>>> for i in xrange(10, 20): nbb.process(i); nbb.process(i)
Stop it, letting the queue get processed.
>>> nbb.stop(True)
Polling wait for the queue to empty.
>>> while nbb.backlog > 0: time.sleep(1/32)
>>> size = len(result)
>>> size == 40
True
Because the processor is stopped, these items do not go into the queue; they are dropped.
>>> for i in xrange(20, 30): nbb.process(i)
>>> assert nbb.backlog == 0
>>> assert len(result) == 40
When paused, things get into the queue, but aren’t processed.
>>> nbb.pause()
>>> for i in xrange(30, 40): nbb.process(i); nbb.process(i)
>>> assert nbb.backlog == 20
Now, stop accepting new samples, but allow the existing queue to keep being processed.
>>> nbb.stop(True)
We’re stopped, so these new guys get dropped. But, we can’t yet make claims about the state of the queue.
>>> for i in xrange(50, 60): nbb.process(i); nbb.process(i)
Now, wait() blocks until the queue is empty
>>> nbb.wait()
>>> nbb.backlog
0
>>> assert len(result) == 60
Back in pause mode, dump more stuff in there
>>> nbb.pause()
>>> for i in xrange(70, 80): nbb.process(i); nbb.process(i)
>>> assert nbb.backlog == 20
Stop and allow the queue to empty; wait again for the queue (that got filled while we were paused) to get empty.
>>> nbb.stop(True)
>>> nbb.wait()
>>> result
[0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10, 10, 11, 11, 12, 12, 13, 13, 14, 14, 15, 15, 16, 16, 17, 17, 18, 18, 19, 19, 30, 30, 31, 31, 32, 32, 33, 33, 34, 34, 35, 35, 36, 36, 37, 37, 38, 38, 39, 39, 70, 70, 71, 71, 72, 72, 73, 73, 74, 74, 75, 75, 76, 76, 77, 77, 78, 78, 79, 79]
When stop() is used, the queue is cleared without any pending items being processed.
>>> nbb.start()
>>> for i in xrange(90, 100): nbb.process(i); nbb.process(i)
>>> nbb.stop()
>>> assert nbb.backlog == 0
Given the asynchronous calls to send, we can’t make any strong assertions about how many of the (90, 100) guys got sent and put into result.
>>> assert len(result) >= 60
The blocking call to done() shuts down the processor and its associated sending thread. After this call returns, the processor will never send an item and cannot be used.
>>> nbb.is_dead
False
>>> nbb.done()
>>> nbb.is_dead
'done'
However, a done processor silently bit-buckets items sent to process().
>>> for i in xrange(100, 110): nbb.process(i); nbb.process(i)
Once done() is called, it’s an error to try to use the processor.
>>> nbb.start()
Traceback (most recent call last):
...
ValueError: failed call to start() on an is_dead()==done NonBlockingSource
>>> nbb.stop()
Traceback (most recent call last):
...
ValueError: failed call to stop() on an is_dead()==done NonBlockingSource
>>> nbb.pause()
Traceback (most recent call last):
...
ValueError: failed call to pause() on an is_dead()==done NonBlockingSource
>>> nbb.wait()
Traceback (most recent call last):
...
ValueError: failed call to wait() on an is_dead()==done NonBlockingSource
It’s reasonable to allow done() to be called again, e.g. from __del__
>>> nbb.done()
And, as we said, a done processor silently bit-buckets items.
>>> for i in xrange(110, 120): nbb.process(i); nbb.process(i)
This property is the number of unprocessed samples in the queue.
A debug context for this processor. This attribute is an object returned by dcheck() in the onyx.util.debugprint module, and may be used for debug output. The tag for turning on such output is available as debug_tag
Activate this tag to get debugging information, see onyx.util.debugprint.DebugPrint
This call permanently shuts down the processor and finishes the worker threads. It must be called in order for the processor and its thread resources to get freed up. It can only successfully be called once.
The queue of pending samples is cleared. This function blocks until the worker thread has shutdown after it finishes processing (sending) whatever sample it was working on.
If you need to process the existing queue before shutting down, call stop(True), then wait() (which blocks until the queue has been processed), and, then call done().
Return a graph for this processor. By default this is just a single node whose label is the label of processor; derived classes may wish to override this property.
Returns False if the instance is usable. Otherwise returns a string, done or broken, indicating that the instance is unusable due either to method done() having been called or one of the worker threads having encountered an exception that renders the instance broken.
Return a label for this processor. By default this is just the name of the class; derived classes may wish to override this property by providing a different label to __init__().
Change state to stop sending samples. Samples will still be accepted and queued up.
Internal function that pushes result into the sendee. Implementations of process() must call this to push results. To set up the sendee, (the target of the push), clients of the processor must either initialize the object with a sendee, or call set_sendee(). Processors created with a sendee of False will never send, but will not error if send is called.
The callable this processor will use to send events; see set_sendee()
Whether this processor will currently send events at all; see set_sending()
Clients call this to set up the callable where the processor will send its results.
Clients call this to turn sending from a processor on or off.
Start accepting samples into the queue and start sending queued samples to the sendee.
Raises ValueError if no sendee has been set.
>>> type('FooSource', (DataflowSourceBase,), {})().start()
Traceback (most recent call last):
...
ValueError: FooSource expected to have a sendee before being started; perhaps it is not yet connected to a Processor
Subclasses may use this decorater on their process function to implement the usual bypass and process semantics and to set up the debug context returned by dc().
Change state to stop accepting or sending samples. If flush is True, the existing queue of samples will be sent to the sendee, otherwise the queue will be cleared (the default).
If stopped or paused, wait until any pending processing of queued samples is done or the state changes to running, then return. This is a blocking method for the caller. Don’t call this unless you really need to wait for the queue to empty, e.g. for (more) reproducible testing of the simulated real-time processing.