Streams and Agents
A stream is a sequence of arbitrary length. A stream is modified only by appending values to the end of the sequence; the elements already in a stream cannot be modified. At any point in a computation the value of a stream is either an ordered list or a NumPy array containing the elements that have been added to the stream, in the order in which they were added. Programs operate on fixed-size lists or arrays even though the length of a stream can grow. Later, we will see how stream memory is managed.
An example of a stream is a sequence of measurements made by a sensor. As time progresses the sensor may append new measurements to the end of the sequence. Measurements that are already in the stream cannot be modified.
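The append-only rule can be illustrated with a minimal sketch. The class name AppendOnlyStream and its methods are hypothetical and chosen for illustration only; they are not the library's own stream class.

```python
class AppendOnlyStream:
    """Hypothetical append-only sequence (illustration only)."""

    def __init__(self):
        self._values = []

    def append(self, value):
        # The only way to change the stream is to add to its end.
        self._values.append(value)

    def extend(self, values):
        self._values.extend(values)

    def __len__(self):
        return len(self._values)

    def __getitem__(self, index):
        # Reading is allowed; there is deliberately no __setitem__.
        return self._values[index]

    def snapshot(self):
        # Return a tuple copy so callers cannot alter stored elements.
        return tuple(self._values)


# A sensor appends measurements as time progresses.
sensor = AppendOnlyStream()
sensor.append(20.5)
sensor.extend([20.7, 21.0])
```

Because the sketch defines no item assignment, an attempt such as `sensor[0] = 0.0` raises a TypeError, which mirrors the rule that existing measurements cannot be changed.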
You can specify a stream to be timestamped or not timestamped. For streams held in NumPy arrays, we either use structured arrays to hold timestamps or use the convention that the zeroth column of the array contains timestamps: each row of the array represents a single element of the stream, and the zeroth element of the row is the timestamp for that element. If a stream is represented by a list then you can use the same convention (the zeroth member of each stream element is a timestamp) or use a named tuple called TimeAndValue, with timestamp and value fields, for each data item.
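The two list-based conventions can be compared side by side in a short sketch. The field names `time` and `value` are assumptions; the text above says only that TimeAndValue has timestamp and value fields. The NumPy representations are not shown here.

```python
from collections import namedtuple

# Field names are assumed for illustration.
TimeAndValue = namedtuple("TimeAndValue", ["time", "value"])

# Convention 1: the zeroth member of each element is the timestamp.
rows = [[0.0, 20.5], [1.0, 20.7], [2.0, 21.0]]

# Convention 2: one named tuple per element.
items = [TimeAndValue(time=row[0], value=row[1]) for row in rows]

# Both conventions carry the same timestamps.
timestamps_from_rows = [row[0] for row in rows]
timestamps_from_items = [item.time for item in items]
```

The named tuple makes the role of each field explicit at the point of use, while the positional convention matches the zeroth-column layout used for NumPy arrays.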
Streams are created by sources, such as sensors, and streams are read and modified by agents. An arbitrary number of agents can read a stream. Each stream is modified by exactly one agent.
An agent is a state-transition system that can: (1) append values to the tail of a stream, (2) read a stream, and (3) subscribe to be notified when a stream is modified. (See Agent.py for details of agents.)
An agent is either asleep (inactive) or awake (active). An agent sleeps until it is woken up by receiving a notification from a stream to which the agent has subscribed. The notification tells the agent that the stream has been modified since the agent last accessed the stream. When an agent wakes up it executes a method called next(). The agent may read and write streams while it executes this method. When execution of the method terminates, the agent goes back to sleep. The method is called "next" because waking up the agent causes it to execute its next step.
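The sleep and wake cycle can be sketched as follows. This is a simplified illustration and not the code in Agent.py: the classes SketchStream and DoublingAgent, the `wake` method, and the `position` attribute are all hypothetical, and notification is modeled as a direct method call.

```python
class SketchStream:
    """Hypothetical stream that notifies subscribers on every append."""

    def __init__(self):
        self.values = []
        self.subscribers = []

    def subscribe(self, agent):
        self.subscribers.append(agent)

    def extend(self, new_values):
        self.values.extend(new_values)
        # Notify every subscribed agent that the stream was modified.
        for agent in self.subscribers:
            agent.wake()


class DoublingAgent:
    """Hypothetical agent that writes twice each new input value."""

    def __init__(self, in_stream, out_stream):
        self.in_stream = in_stream
        self.out_stream = out_stream
        self.position = 0   # index of the first unread input element
        self.awake = False  # the agent starts asleep
        in_stream.subscribe(self)

    def wake(self):
        self.awake = True
        self.next()         # execute one step
        self.awake = False  # go back to sleep

    def next(self):
        # Read only the elements added since the last step.
        unread = self.in_stream.values[self.position:]
        self.position = len(self.in_stream.values)
        self.out_stream.extend([2 * v for v in unread])


x = SketchStream()
y = SketchStream()
agent = DoublingAgent(x, y)
x.extend([1, 2])  # wakes the agent, which appends 2 and 4 to y
x.extend([3])     # wakes the agent again, which appends 6 to y
```

Tracking `position` is one simple way for an agent to see only what changed since it last accessed the stream.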
An agent can read, write and subscribe to the same stream. An agent may subscribe to a stream without reading the stream's values; for example the agent may subscribe to a clock stream so that the agent is woken up whenever the clock stream has a new value. Likewise, an agent may read a stream without subscribing to it: the agent reads such streams when it is woken up by a stream to which it has subscribed.
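The difference between subscribing and reading can be sketched with a sampler that is woken by a clock stream but reads a separate data stream. The names TickStream and SampleOnTick are hypothetical, and notification is again modeled as a direct call to next().

```python
class TickStream:
    """Hypothetical stream that calls next() on each subscriber."""

    def __init__(self):
        self.values = []
        self.subscribers = []

    def extend(self, new_values):
        self.values.extend(new_values)
        for agent in self.subscribers:
            agent.next()


class SampleOnTick:
    """Woken by the clock stream; reads the data stream; writes samples."""

    def __init__(self, clock, data, samples):
        self.data = data
        self.samples = samples
        # Subscribed to the clock, but its values are never read.
        clock.subscribers.append(self)

    def next(self):
        # Read the data stream without being subscribed to it.
        if self.data.values:
            self.samples.extend([self.data.values[-1]])


clock, data, samples = TickStream(), TickStream(), TickStream()
sampler = SampleOnTick(clock, data, samples)
data.extend([5, 7])  # sampler is not subscribed to data, so it stays asleep
clock.extend([0])    # tick: sampler wakes and records the latest value, 7
data.extend([9])
clock.extend([1])    # tick: sampler wakes and records 9
```

Here the value 5 is never sampled, because the agent looks at the data stream only when a clock tick wakes it.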
The most recent values of a stream are stored either in a fixed-size list or a NumPy array called "recent." Associated with a stream is a parameter called num_in_memory. The most recent num_in_memory elements of a stream are stored in main memory and earlier elements are either discarded or stored in files. Agents operate on streams as though the streams are of unbounded length. Later we describe the memory management that allows agents to treat fixed-size data structures as unbounded.
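One way to picture the bounded window is the sketch below. It is not the library's memory management, which is described later; it substitutes a `collections.deque` for the fixed-size list, discards old elements rather than writing them to files, and uses the hypothetical names BoundedStream, `total`, and `get`.

```python
from collections import deque


class BoundedStream:
    """Hypothetical stream that keeps only the latest elements in memory."""

    def __init__(self, num_in_memory):
        self.num_in_memory = num_in_memory
        # A deque with maxlen drops the oldest element when it is full.
        self.recent = deque(maxlen=num_in_memory)
        self.total = 0  # number of elements ever appended

    def extend(self, values):
        for v in values:
            self.recent.append(v)
            self.total += 1

    def get(self, index):
        """Return the element at absolute position index, if in memory."""
        oldest = self.total - len(self.recent)
        if not oldest <= index < self.total:
            raise IndexError("element discarded or not yet appended")
        return self.recent[index - oldest]


s = BoundedStream(num_in_memory=3)
s.extend(range(10))  # only the elements at positions 7, 8 and 9 remain
```

Keeping a running count alongside the window lets callers address elements by their absolute position in the stream, as though the stream were unbounded, as long as they ask only for elements that are still in memory.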