Tutorial: Writing StreamQL Programs

Welcome! StreamQL is a query language for efficiently processing IoT data streams. StreamQL provides a rich set of streaming constructs and is implemented as a lightweight Java library. In this tutorial, you will learn how to write StreamQL queries in Java to process data streams.

The tutorial first presents a very simple example of a StreamQL program. It then introduces some basic interfaces in StreamQL's Java library. After that, it introduces several core streaming constructs (i.e., operators) of StreamQL and uses them to program a healthcare monitoring application.

Let's get started!

A Simple Example

The code below gives a simple example of a StreamQL program in Java.

Given a stream of signal measurements of type VT, each of which contains a double value in the field val, the query sum of type Q computes the sum of the values of the measurements. The method eval returns an object that encapsulates the evaluation algorithm for the query. The methods init and next initialize the algorithm's state and consume input items, respectively. When the input stream terminates, the end method is invoked.
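As a rough illustration, here is a self-contained sketch of such a program. The interfaces Sink, Algo, and Q are simplified stand-ins written from the descriptions in this tutorial (an assumption, not the library's source), and SumQuery, Collector, and SumDemo are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for StreamQL's interfaces (assumed shapes, not the library source).
interface Sink<T> { void next(T item); void end(); }

interface Algo<A, B> extends Sink<A> {
    void connect(Sink<B> sink);
    void init();
}

interface Q<A, B> { Algo<A, B> eval(); }

// A signal measurement with strength val and timestamp ts.
class VT {
    public final double val;
    public final int ts;
    VT(double val, int ts) { this.val = val; this.ts = ts; }
}

// Hypothetical query: emits the sum of all x.val once the input ends.
class SumQuery implements Q<VT, Double> {
    public Algo<VT, Double> eval() {
        return new Algo<VT, Double>() {
            private Sink<Double> sink;
            private double sum; // the algorithm's only state

            public void connect(Sink<Double> sink) { this.sink = sink; }
            public void init() { sum = 0.0; }                 // initialize/reset the state
            public void next(VT item) { sum += item.val; }    // consume one measurement
            public void end() { sink.next(sum); sink.end(); } // emit the total, then the end marker
        };
    }
}

// Sink that stores every item it receives.
class Collector<T> implements Sink<T> {
    final List<T> items = new ArrayList<>();
    public void next(T item) { items.add(item); }
    public void end() { }
}

class SumDemo {
    // Evaluates SumQuery over the given values (timestamps 0, 1, 2, ...).
    static List<Double> run(double... vals) {
        Algo<VT, Double> algo = new SumQuery().eval();
        Collector<Double> out = new Collector<>();
        algo.connect(out);
        algo.init();
        for (int i = 0; i < vals.length; i++) algo.next(new VT(vals[i], i));
        algo.end();
        return out.items;
    }
}
```

For example, SumDemo.run(1.0, 2.0, 3.5) returns a list containing the single value 6.5.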

Sink and Algo

StreamQL defines two interfaces, Sink and Algo, to describe the streaming computation in a push-based manner.

The Sink interface is used for specifying a sink that consumes a stream. A sink consumes a stream with two methods, next and end, that are used for stream elements and the end-of-stream marker respectively.

The code below shows the definition of the Sink interface in Java.

Below is an instance of Sink that prints each incoming data item and the end-of-stream marker to the console.

The Algo interface is used for describing the evaluation algorithm of a query. An implementation of Algo specifies how the input stream is transformed into the output stream.

The program below shows the definition of the Algo interface, which extends the Sink interface because it consumes a stream. The connect method connects the algorithm to a sink, and the init method initializes/resets the state of the algorithm.

Below is an instance of Algo that echoes each incoming data item and the end-of-stream marker to the output.
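The two interfaces and the two instances described in this section can be sketched as follows. The interface shapes follow the descriptions above; PrintSink and Echo are hypothetical names, and PrintSink takes a PrintStream (pass System.out for the console) so that its output can also be captured.

```java
import java.io.PrintStream;

interface Sink<T> {
    void next(T item); // called for each stream element
    void end();        // called once for the end-of-stream marker
}

interface Algo<A, B> extends Sink<A> {
    void connect(Sink<B> sink); // attach the consumer of the output stream
    void init();                // initialize/reset the algorithm's state
}

// Sink that prints each item, and "#" for the end-of-stream marker.
class PrintSink<T> implements Sink<T> {
    private final PrintStream out;
    PrintSink(PrintStream out) { this.out = out; }
    public void next(T item) { out.println(item); }
    public void end() { out.println("#"); }
}

// Algorithm that echoes its input unchanged; it has no state, so init does nothing.
class Echo<A> implements Algo<A, A> {
    private Sink<A> sink;
    public void connect(Sink<A> sink) { this.sink = sink; }
    public void init() { }
    public void next(A item) { sink.next(item); }
    public void end() { sink.end(); }
}
```

Connecting an Echo to new PrintSink<>(System.out), calling init, pushing "a" and "b", and then calling end prints a, b, and # on separate lines.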

Streaming Constructs in StreamQL

In this section, core streaming constructs of StreamQL are introduced with illustrative examples.

Data Streams used in Examples

The data stream used in the examples is a stream of real-valued discrete-time signal measurements. Each measurement is of the form {val : double, ts : int}, where val is the signal strength and ts is the discrete timestamp carried by the measurement, which increases by one from each measurement to the next (0, 1, 2, ...).

The Java program below describes the type of the signal measurement.

To provide the signal stream as input, we introduce a signal generator for this example:

Given an algorithm algo, the program below illustrates how to feed the input stream of signal measurements to algo.
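A self-contained sketch of the measurement type, a generator, and the feeding loop is shown below. The names SignalGenerator, fromValues, feed, and Recorder are hypothetical, and the sample values in the usage note are simply those of the aggr/reduce table in this tutorial, not the tutorial's own example signal.

```java
import java.util.ArrayList;
import java.util.List;

interface Sink<T> { void next(T item); void end(); }
interface Algo<A, B> extends Sink<A> { void connect(Sink<B> sink); void init(); }

// Signal measurement {val : double, ts : int}.
class VT {
    public final double val; // signal strength
    public final int ts;     // discrete timestamp 0, 1, 2, ...
    VT(double val, int ts) { this.val = val; this.ts = ts; }
    @Override public String toString() { return "{" + val + ", " + ts + "}"; }
}

class SignalGenerator {
    // Turns sample values into measurements with timestamps 0, 1, 2, ...
    static List<VT> fromValues(double... values) {
        List<VT> stream = new ArrayList<>();
        for (int ts = 0; ts < values.length; ts++) stream.add(new VT(values[ts], ts));
        return stream;
    }

    // Feeds a finite stream to an algorithm: reset its state, push every item, then signal the end.
    static <B> void feed(Algo<VT, B> algo, Iterable<VT> input) {
        algo.init();
        for (VT x : input) algo.next(x);
        algo.end();
    }
}

// Identity algorithm that also records what it received, to check the feeding loop.
class Recorder implements Algo<VT, VT> {
    final List<VT> seen = new ArrayList<>();
    boolean ended;
    private Sink<VT> sink;
    public void connect(Sink<VT> sink) { this.sink = sink; }
    public void init() { seen.clear(); ended = false; }
    public void next(VT x) { seen.add(x); if (sink != null) sink.next(x); }
    public void end() { ended = true; if (sink != null) sink.end(); }
}
```

Usage: SignalGenerator.feed(algo, SignalGenerator.fromValues(0.0, 0.2, 1.2, 0.3, 0.7)) pushes five measurements with timestamps 0 to 4, followed by the end-of-stream marker.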

StreamQL Construct: filter

The filter construct echoes the input items that satisfy a predicate to the output.

Given a VT stream, the query halve = filter(x -> x.ts % 2 == 0) keeps the signal elements whose timestamps are even, thereby halving the sampling rate of the signal; here, x -> x.ts % 2 == 0 is a lambda expression that serves as the filtering predicate.

StreamQL Construct: map

The map construct applies a mapping function to every input item and sends the results to the output. For example, square = map(x -> x.val * x.val) computes the squared signal strength for each input item and sends the results to the output.
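Both filter and map are stateless: as push-based algorithms they only forward (or transform) each item and pass the end-of-stream marker through. The sketch below builds halve and square this way. Filter, MapAlgo, and Demo are hypothetical names, and Sink and Algo are simplified stand-ins written from this tutorial's descriptions, not StreamQL's own classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

interface Sink<T> { void next(T item); void end(); }
interface Algo<A, B> extends Sink<A> { void connect(Sink<B> sink); void init(); }

class VT {
    public final double val;
    public final int ts;
    VT(double val, int ts) { this.val = val; this.ts = ts; }
}

// filter(p): forwards only the items satisfying p.
class Filter<A> implements Algo<A, A> {
    private final Predicate<A> p;
    private Sink<A> sink;
    Filter(Predicate<A> p) { this.p = p; }
    public void connect(Sink<A> sink) { this.sink = sink; }
    public void init() { }
    public void next(A x) { if (p.test(x)) sink.next(x); }
    public void end() { sink.end(); }
}

// map(f): forwards f(x) for every item x.
class MapAlgo<A, B> implements Algo<A, B> {
    private final Function<A, B> f;
    private Sink<B> sink;
    MapAlgo(Function<A, B> f) { this.f = f; }
    public void connect(Sink<B> sink) { this.sink = sink; }
    public void init() { }
    public void next(A x) { sink.next(f.apply(x)); }
    public void end() { sink.end(); }
}

class Demo {
    // Runs an algorithm over a finite list and returns everything it emitted.
    static <A, B> List<B> run(Algo<A, B> algo, List<A> input) {
        List<B> out = new ArrayList<>();
        algo.connect(new Sink<B>() {
            public void next(B item) { out.add(item); }
            public void end() { }
        });
        algo.init();
        for (A x : input) algo.next(x);
        algo.end();
        return out;
    }
}
```

With these classes, halve is new Filter<VT>(x -> x.ts % 2 == 0) and square is new MapAlgo<VT, Double>(x -> x.val * x.val).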

StreamQL Construct: aggr and reduce

In StreamQL, both aggr and reduce constructs describe the streaming aggregation over the input stream. The aggr construct computes the running aggregation of the input (i.e., it outputs the aggregate every time the aggregate is updated) while the reduce construct only emits the total aggregate when the input stream terminates.

For example, queries f = aggr(0.0, (sum, x) -> sum + x.val) and g = reduce(0.0, (sum, x) -> sum + x.val) respectively compute the running sum and the total sum of the signal strength, where 0.0 is the initial value of the aggregate, and (sum, x) -> sum + x.val is a binary function that updates the aggregate. The table below illustrates the execution of f and g, where time progresses in the left-to-right direction, and # represents the end-of-stream marker.

input      {0.0, 0}  {0.2, 1}  {1.2, 2}  {0.3, 3}  {0.7, 4}  #
f output   0.0       0.2       1.4       1.7       2.4       #
g output                                                     2.4 #

StreamQL provides variants of the aggr and reduce constructs that allow users to aggregate the stream using the value of the first input item as the initial value of the aggregate. For example, the query below identifies the last received signal measurement.
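The following sketch implements aggr, reduce, and the first-item variant of reduce as push-based algorithms, and it reproduces the table above. Expressing the last-measurement query as a first-item reduce whose update keeps the newer item is one possible formulation, not necessarily the tutorial's query; the class names and the choice to emit nothing for an empty input are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

interface Sink<T> { void next(T item); void end(); }
interface Algo<A, B> extends Sink<A> { void connect(Sink<B> sink); void init(); }

class VT {
    public final double val;
    public final int ts;
    VT(double val, int ts) { this.val = val; this.ts = ts; }
}

// aggr(init, op): emits the updated aggregate after every input item.
class Aggr<A, B> implements Algo<A, B> {
    private final B initial;
    private final BiFunction<B, A, B> op;
    private Sink<B> sink;
    private B agg;
    Aggr(B initial, BiFunction<B, A, B> op) { this.initial = initial; this.op = op; }
    public void connect(Sink<B> sink) { this.sink = sink; }
    public void init() { agg = initial; }
    public void next(A x) { agg = op.apply(agg, x); sink.next(agg); }
    public void end() { sink.end(); }
}

// reduce(init, op): same update rule, but emits only the final aggregate.
class Reduce<A, B> implements Algo<A, B> {
    private final B initial;
    private final BiFunction<B, A, B> op;
    private Sink<B> sink;
    private B agg;
    Reduce(B initial, BiFunction<B, A, B> op) { this.initial = initial; this.op = op; }
    public void connect(Sink<B> sink) { this.sink = sink; }
    public void init() { agg = initial; }
    public void next(A x) { agg = op.apply(agg, x); }
    public void end() { sink.next(agg); sink.end(); }
}

// reduce(op): the first item serves as the initial aggregate.
class Reduce1<A> implements Algo<A, A> {
    private final BinaryOperator<A> op;
    private Sink<A> sink;
    private A agg;
    private boolean empty;
    Reduce1(BinaryOperator<A> op) { this.op = op; }
    public void connect(Sink<A> sink) { this.sink = sink; }
    public void init() { agg = null; empty = true; }
    public void next(A x) { agg = empty ? x : op.apply(agg, x); empty = false; }
    public void end() { if (!empty) sink.next(agg); sink.end(); }
}

class Demo {
    // Runs an algorithm over a finite list and returns everything it emitted.
    static <A, B> List<B> run(Algo<A, B> algo, List<A> input) {
        List<B> out = new ArrayList<>();
        algo.connect(new Sink<B>() {
            public void next(B item) { out.add(item); }
            public void end() { }
        });
        algo.init();
        for (A x : input) algo.next(x);
        algo.end();
        return out;
    }
}
```

The last received measurement is then new Reduce1<VT>((prev, x) -> x): every update simply keeps the newer item.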

More variants of the aggr and reduce constructs can be found in StreamQL's API documentation.

StreamQL Construct: pipeline

StreamQL provides the pipeline construct to specify the streaming/serial composition. For example, for queries f of type Q<A,B> and g of type Q<B,C>, pipeline(f, g) sends the output of f to the input of g, and its type is Q<A,C>.

The program below uses the pipeline construct to compute the sum of the squared signal strength.

This construct generalizes naturally to more than two arguments, which is useful for setting up a complex computation as a pipeline of multiple stages. For example, StreamQL provides pipeline(f,g,h), pipeline(f,g,h,i), and so on.
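In push-based terms, pipeline(f, g) makes g the sink of f. The sketch below wires this up and uses it for the sum of squared signal strengths. Pipeline, MapAlgo, Reduce, and PipelineDemo are hypothetical names, and Sink and Algo are simplified stand-ins for the library's interfaces; in this sketch a three-stage pipeline can be obtained by nesting, e.g. new Pipeline<>(new Pipeline<>(f, g), h).

```java
import java.util.function.BiFunction;
import java.util.function.Function;

interface Sink<T> { void next(T item); void end(); }
interface Algo<A, B> extends Sink<A> { void connect(Sink<B> sink); void init(); }

class VT {
    public final double val;
    public final int ts;
    VT(double val, int ts) { this.val = val; this.ts = ts; }
}

class MapAlgo<A, B> implements Algo<A, B> {
    private final Function<A, B> f;
    private Sink<B> sink;
    MapAlgo(Function<A, B> f) { this.f = f; }
    public void connect(Sink<B> sink) { this.sink = sink; }
    public void init() { }
    public void next(A x) { sink.next(f.apply(x)); }
    public void end() { sink.end(); }
}

class Reduce<A, B> implements Algo<A, B> {
    private final B initial;
    private final BiFunction<B, A, B> op;
    private Sink<B> sink;
    private B agg;
    Reduce(B initial, BiFunction<B, A, B> op) { this.initial = initial; this.op = op; }
    public void connect(Sink<B> sink) { this.sink = sink; }
    public void init() { agg = initial; }
    public void next(A x) { agg = op.apply(agg, x); }
    public void end() { sink.next(agg); sink.end(); }
}

// pipeline(f, g): f pushes its output into g; the pipeline's output is g's output.
class Pipeline<A, B, C> implements Algo<A, C> {
    private final Algo<A, B> f;
    private final Algo<B, C> g;
    Pipeline(Algo<A, B> f, Algo<B, C> g) { this.f = f; this.g = g; f.connect(g); }
    public void connect(Sink<C> sink) { g.connect(sink); }
    public void init() { f.init(); g.init(); }
    public void next(A x) { f.next(x); }
    public void end() { f.end(); } // f forwards the end marker to g, which then emits its result
}

class PipelineDemo {
    // sumSq = pipeline(map(x -> x.val * x.val), reduce(0.0, (s, y) -> s + y))
    static double sumOfSquares(double... vals) {
        Pipeline<VT, Double, Double> sumSq = new Pipeline<>(
            new MapAlgo<VT, Double>(x -> x.val * x.val),
            new Reduce<Double, Double>(0.0, (s, y) -> s + y));
        double[] result = new double[1];
        sumSq.connect(new Sink<Double>() {
            public void next(Double d) { result[0] = d; }
            public void end() { }
        });
        sumSq.init();
        for (int i = 0; i < vals.length; i++) sumSq.next(new VT(vals[i], i));
        sumSq.end();
        return result[0];
    }
}
```

For example, PipelineDemo.sumOfSquares(1.0, 2.0, 3.0) returns 14.0.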

StreamQL Construct: tWindow and sWindow

The so-called windowing constructs are used to partition an unbounded stream into finite fragments called windows and perform computations on each one of them independently.

The tWindow (i.e., tumbling window) construct splits the stream into contiguous non-overlapping regions. For a query f of type Q<A,B> and a positive integer n, the query tWindow(n,f) applies f to tumbling windows of size n.

The sWindow (i.e., sliding window) construct splits the stream into overlapping regions. For a query f of type Q<A,B> and natural numbers n, s with 0 < s < n, the query sWindow(n,s,f) applies f to windows of size n with a new window starting every s items.

Suppose f = reduce((sum, x) -> sum + x), which calculates the total sum of an integer stream. The following table illustrates the execution of tWindow and sWindow (a window that is still incomplete when the stream ends produces no output).

input:                  1   2   3   4   5   6   7   8   #
tWindow(2,f) output:        3       7       11      15  #
tWindow(3,f) output:            6           15          #
sWindow(2,1,f) output:      3   5   7   9   11  13  15  #
sWindow(3,1,f) output:          6   9   12  15  18  21  #
sWindow(3,2,f) output:          6       12      18      #
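The table can be reproduced with a small plain-list model of the two window constructs. This sketch captures the semantics only, not StreamQL's streaming implementation: the class Windows is hypothetical, the per-window query f is simplified to a function from a complete window to one result (which is how reduce behaves), and, as in the table, an incomplete trailing window produces nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-list model of the window constructs (semantics only).
class Windows {
    // tWindow(n, f): a tumbling window is a sliding window whose step equals its size.
    static <A, R> List<R> tWindow(int n, Function<List<A>, R> f, List<A> input) {
        return sWindow(n, n, f, input);
    }

    // sWindow(n, s, f): windows start at items 0, s, 2s, ...; each is evaluated when its n-th item arrives.
    static <A, R> List<R> sWindow(int n, int s, Function<List<A>, R> f, List<A> input) {
        List<R> out = new ArrayList<>();
        for (int start = 0; start + n <= input.size(); start += s) {
            out.add(f.apply(input.subList(start, start + n)));
        }
        return out; // a window that is still incomplete at the end produces nothing
    }

    // Stand-in for f = reduce((sum, x) -> sum + x) applied to one complete window.
    static int sum(List<Integer> window) {
        int total = 0;
        for (int x : window) total += x;
        return total;
    }
}
```

For the 100 Hz signal in the following examples, 10-second windows correspond to tWindow(1000, ...), and 10-second windows starting every 5 seconds correspond to sWindow(1000, 500, ...).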

Given a stream of signal measurements sampled at 100 Hz, the program below computes the sum of the signal strength over every 10 seconds (i.e., 1000 samples).

Moreover, the program below uses the sWindow construct to compute, every 5 seconds (i.e., 500 samples), the sum of the signal strength over the past 10 seconds (i.e., 1000 samples).

Variants of the window constructs: tWindowN and sWindowN

StreamQL provides variants of the window constructs that allow the programmer to specify a function to reduce the items contained in a window of size n.

For example, in the program below, sWnd3 computes the sliding (moving) sum over windows of size 3, and tWnd2 computes the average over tumbling windows of size 2.

Variants of the window constructs: moving window aggregation

StreamQL provides constructs to describe moving-window aggregation (i.e., aggregation over sliding windows). Given an initial aggregate init of type B and a function update that combines the current aggregate with a new input item, the query sWndReduce(wndSize, slidingInterval, init, update) uses the moving-window aggregation construct sWndReduce to apply the reducing aggregation over sliding windows of length wndSize with sliding interval slidingInterval.

Moreover, StreamQL defines an additional construct, sWndReduceI, that uses an efficient built-in sliding-window algorithm when the aggregation is invertible. The query sWndReduceI(wndSize, slidingInterval, init, add, remove) updates the moving-window aggregate incrementally: add incorporates each new item, and remove retracts each item that falls off the window.

For example, if add is (sum, newItem) -> sum + newItem, then the remove function that inverts it should be (sum, oldItem) -> sum - oldItem.

The program below presents the query that computes the average strength of signal in a sliding window, where Pair is a class that contains two fields: sum of type Double and cnt of type Integer.
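To see why invertibility helps, the following plain-list sketch of sWndReduceI adds each arriving item to the aggregate once and removes it once when it leaves the window, so each step does a constant amount of work regardless of wndSize. It computes the moving average with a Pair of sum and cnt as described above. It models the semantics only: InvertibleWindow and MovingAverage are hypothetical names, the signal values are plain doubles, and the emission schedule (first output when the first window is full, then one every slidingInterval items) is assumed to mirror sWindow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BiFunction;

// Aggregate for the moving average: running sum and count.
class Pair {
    final double sum;
    final int cnt;
    Pair(double sum, int cnt) { this.sum = sum; this.cnt = cnt; }
    double avg() { return sum / cnt; }
}

class InvertibleWindow {
    static <A, B> List<B> sWndReduceI(int wndSize, int slidingInterval, B init,
                                      BiFunction<B, A, B> add, BiFunction<B, A, B> remove,
                                      List<A> input) {
        List<B> out = new ArrayList<>();
        Deque<A> window = new ArrayDeque<>(); // items currently inside the window
        B agg = init;
        int seen = 0;
        for (A x : input) {
            agg = add.apply(agg, x);
            window.addLast(x);
            seen++;
            if (window.size() > wndSize) agg = remove.apply(agg, window.removeFirst()); // oldest item leaves
            // First output once the first window is full, then one every slidingInterval items.
            if (seen >= wndSize && (seen - wndSize) % slidingInterval == 0) out.add(agg);
        }
        return out;
    }
}

class MovingAverage {
    static List<Double> of(List<Double> signal, int wndSize, int slidingInterval) {
        List<Pair> aggs = InvertibleWindow.sWndReduceI(wndSize, slidingInterval, new Pair(0.0, 0),
            (p, x) -> new Pair(p.sum + x, p.cnt + 1),  // add
            (p, x) -> new Pair(p.sum - x, p.cnt - 1),  // remove (inverse of add)
            signal);
        List<Double> avgs = new ArrayList<>();
        for (Pair q : aggs) avgs.add(q.avg());
        return avgs;
    }
}
```

For example, MovingAverage.of(List.of(1.0, 2.0, 3.0, 4.0, 5.0, 6.0), 3, 1) returns [2.0, 3.0, 4.0, 5.0].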

StreamQL Construct: parallel

StreamQL provides the parallel construct for executing multiple queries in parallel on the same input stream and combining their results.

For queries f and g of type Q<A,B>, the query parallel(f,g) of type Q<A,B> describes the following computation: The input stream is duplicated with one copy sent to f and one copy sent to g. The queries f and g compute in parallel, and their outputs are merged (specifically, interleaved) to produce the final output stream.

Using the parallel construct, the query that monitors the running average value of the signal can be written as:

StreamQL Construct: groupBy

Suppose the measurements are detected by multiple sensors, where each measurement records the identifier, id, of the sensor. Such a measurement can be represented by the Java class as below:

Suppose that we have written a query f of type Q<IVT,B> that computes an aggregate of items with a fixed identifier, i.e., under the assumption that all items of the input stream have the same identifier. Then, to compute this aggregate for every identifier, the most natural approach is to partition the input stream by a key (the identifier field id in this case) and supply each resulting sub-stream to its own copy of f.

This construct is called groupBy, and it can be used in the following format:

The first argument x -> x.id is a function that specifies the partitioning key; f describes the computation that is performed independently on the sub-stream of each key; and (key,b)-> ... is a function that produces the final results by combining a key with each output b of f on that key's sub-stream.

The program below counts the total number of samples measured by each sensor, where KV is a key-value pair to store the final results.
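A push-based sketch of groupBy keeps one independently initialized copy of f per key, created the first time that key appears, and routes each item to its key's copy. The class names, the int type of id, and flushing the sub-streams in first-seen key order at the end of the input are assumptions; the per-sensor count is expressed as reduce(0, (c, x) -> c + 1).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

interface Sink<T> { void next(T item); void end(); }
interface Algo<A, B> extends Sink<A> { void connect(Sink<B> sink); void init(); }
interface Q<A, B> { Algo<A, B> eval(); }

// Measurement from the sensor with identifier id.
class IVT {
    public final int id;
    public final double val;
    public final int ts;
    IVT(int id, double val, int ts) { this.id = id; this.val = val; this.ts = ts; }
}

// Key-value pair holding a final result.
class KV<K, V> {
    public final K key;
    public final V value;
    KV(K key, V value) { this.key = key; this.value = value; }
    @Override public String toString() { return key + "=" + value; }
}

class Reduce<A, B> implements Algo<A, B> {
    private final B initial;
    private final BiFunction<B, A, B> op;
    private Sink<B> sink;
    private B agg;
    Reduce(B initial, BiFunction<B, A, B> op) { this.initial = initial; this.op = op; }
    public void connect(Sink<B> sink) { this.sink = sink; }
    public void init() { agg = initial; }
    public void next(A x) { agg = op.apply(agg, x); }
    public void end() { sink.next(agg); sink.end(); }
}

// groupBy(key, f, combine): each output b of a key's copy of f becomes combine(key, b).
class GroupBy<A, K, B, C> implements Algo<A, C> {
    private final Function<A, K> key;
    private final Q<A, B> f;
    private final BiFunction<K, B, C> combine;
    private final Map<K, Algo<A, B>> groups = new LinkedHashMap<>(); // first-seen key order
    private Sink<C> sink;

    GroupBy(Function<A, K> key, Q<A, B> f, BiFunction<K, B, C> combine) {
        this.key = key; this.f = f; this.combine = combine;
    }
    public void connect(Sink<C> sink) { this.sink = sink; }
    public void init() { groups.clear(); }
    public void next(A x) {
        K k = key.apply(x);
        Algo<A, B> algo = groups.get(k);
        if (algo == null) {                  // first item with this key: start a new copy of f
            algo = f.eval();
            algo.connect(new Sink<B>() {
                public void next(B b) { sink.next(combine.apply(k, b)); }
                public void end() { }        // sub-stream end markers are not forwarded
            });
            algo.init();
            groups.put(k, algo);
        }
        algo.next(x);
    }
    public void end() {
        for (Algo<A, B> algo : groups.values()) algo.end(); // close every sub-stream
        sink.end();
    }
}

class GroupDemo {
    // Counts the measurements of each sensor and renders each KV as "id=count".
    static List<String> countPerSensor(List<IVT> in) {
        Q<IVT, Integer> count = () -> new Reduce<IVT, Integer>(0, (c, x) -> c + 1);
        GroupBy<IVT, Integer, Integer, KV<Integer, Integer>> q =
            new GroupBy<IVT, Integer, Integer, KV<Integer, Integer>>(
                x -> x.id, count, (key, b) -> new KV<Integer, Integer>(key, b));
        List<String> out = new ArrayList<>();
        q.connect(new Sink<KV<Integer, Integer>>() {
            public void next(KV<Integer, Integer> kv) { out.add(kv.toString()); }
            public void end() { }
        });
        q.init();
        for (IVT x : in) q.next(x);
        q.end();
        return out;
    }
}
```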

StreamQL Construct: take and takeUntil

Given a predicate p on elements of type A, the query takeUntil(p) of type Q<A,A> computes like the identity transformation while there is no occurrence of an item satisfying p in the input. When it encounters the first item satisfying p, it emits it to the output and halts.

A similar query is take(n) of type Q<A,A>, where n is a positive integer, which echoes the first n items of the input stream to the output and then halts.

input:                          1   2   3   4   5   6   7   #
take(5) output:                 1   2   3   4   5 #
takeUntil(x -> x >= 4) output:  1   2   3   4 #

For example, assuming a sampling frequency of 100 Hz, the program below selects the samples that arrive in the first minute (i.e., the first 6000 samples).
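Halting means that a query emits the end-of-stream marker early and ignores the rest of its input. The sketch below implements take and takeUntil this way and reproduces the table above, recording the end marker as "#". Take, TakeUntil, and TakeDemo are hypothetical names, and Sink and Algo are simplified stand-ins for the library's interfaces. With a 100 Hz signal, the first minute corresponds to take(6000).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

interface Sink<T> { void next(T item); void end(); }
interface Algo<A, B> extends Sink<A> { void connect(Sink<B> sink); void init(); }

// take(n): echoes the first n items, then halts.
class Take<A> implements Algo<A, A> {
    private final int n;
    private Sink<A> sink;
    private int count;
    private boolean halted;
    Take(int n) { this.n = n; }
    public void connect(Sink<A> sink) { this.sink = sink; }
    public void init() { count = 0; halted = false; }
    public void next(A x) {
        if (halted) return;                               // input after halting is ignored
        sink.next(x);
        if (++count == n) { halted = true; sink.end(); }  // n-th item: emit it, then halt
    }
    public void end() { if (!halted) { halted = true; sink.end(); } }
}

// takeUntil(p): echoes items up to and including the first one satisfying p, then halts.
class TakeUntil<A> implements Algo<A, A> {
    private final Predicate<A> p;
    private Sink<A> sink;
    private boolean halted;
    TakeUntil(Predicate<A> p) { this.p = p; }
    public void connect(Sink<A> sink) { this.sink = sink; }
    public void init() { halted = false; }
    public void next(A x) {
        if (halted) return;
        sink.next(x);
        if (p.test(x)) { halted = true; sink.end(); }
    }
    public void end() { if (!halted) { halted = true; sink.end(); } }
}

class TakeDemo {
    // Runs an algorithm over a finite input, recording items as strings and the end marker as "#".
    static <A> List<String> run(Algo<A, A> algo, List<A> input) {
        List<String> out = new ArrayList<>();
        algo.connect(new Sink<A>() {
            public void next(A x) { out.add(String.valueOf(x)); }
            public void end() { out.add("#"); }
        });
        algo.init();
        for (A x : input) algo.next(x);
        algo.end();
        return out;
    }
}
```

Note that the end marker appears only once even though end is called again after the query has halted.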

Variants of take and takeUntil

StreamQL provides variants for the take and takeUntil constructs.

Given a query f of type Q<A,B> and a predicate p over B, takeUntil(f, p) of type Q<A,B> applies f to its input stream and applies takeUntil(p) to the output of f, i.e., it is the syntax sugar of pipeline(f, takeUntil(p)).

Similarly, given a positive integer n, take(f, n) applies take(n) to the output of f, which is the syntax sugar of pipeline(f, take(n)).

These two forms also improve performance because (1) the overhead of the streaming composition is reduced, and (2) once enough elements have been taken, take(f, n) and takeUntil(f, p) can immediately terminate query f.

StreamQL Construct: skip and skipUntil

Given a predicate p on streaming elements of type A, the query skipUntil(p) of type Q<A,A> emits no output while the input contains no item satisfying p. When the first item satisfying p is seen, it emits it to the output and continues to compute like the identity transformation.

The query skip(n) of type Q<A,A>, for a positive integer n, emits no output for the first n input items, and then proceeds to echo the rest of the input stream.

input:                          1   2   3   4   5   6   7   #
skip(3) output:                             4   5   6   7   #
skipUntil(x -> x >= 4) output:              4   5   6   7   #

If the sampling frequency is 100 Hz, the program below skips the signal measurements received in the first minute (i.e., the first 6000 samples).
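Unlike take, skip and skipUntil never halt early; they only suppress a prefix of the input. The sketch below reproduces the table above (end marker recorded as "#"). Skip, SkipUntil, and SkipDemo are hypothetical names, and Sink and Algo are simplified stand-ins for the library's interfaces. With a 100 Hz signal, skipping the first minute corresponds to skip(6000).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

interface Sink<T> { void next(T item); void end(); }
interface Algo<A, B> extends Sink<A> { void connect(Sink<B> sink); void init(); }

// skip(n): drops the first n items, then echoes the rest.
class Skip<A> implements Algo<A, A> {
    private final int n;
    private Sink<A> sink;
    private int dropped;
    Skip(int n) { this.n = n; }
    public void connect(Sink<A> sink) { this.sink = sink; }
    public void init() { dropped = 0; }
    public void next(A x) {
        if (dropped < n) dropped++; // still inside the skipped prefix
        else sink.next(x);
    }
    public void end() { sink.end(); }
}

// skipUntil(p): silent until the first item satisfying p; from that item on, behaves like the identity.
class SkipUntil<A> implements Algo<A, A> {
    private final Predicate<A> p;
    private Sink<A> sink;
    private boolean found;
    SkipUntil(Predicate<A> p) { this.p = p; }
    public void connect(Sink<A> sink) { this.sink = sink; }
    public void init() { found = false; }
    public void next(A x) {
        if (!found) found = p.test(x); // the first match switches to identity mode
        if (found) sink.next(x);
    }
    public void end() { sink.end(); }
}

class SkipDemo {
    // Runs an algorithm over a finite input, recording items as strings and the end marker as "#".
    static <A> List<String> run(Algo<A, A> algo, List<A> input) {
        List<String> out = new ArrayList<>();
        algo.connect(new Sink<A>() {
            public void next(A x) { out.add(String.valueOf(x)); }
            public void end() { out.add("#"); }
        });
        algo.init();
        for (A x : input) algo.next(x);
        algo.end();
        return out;
    }
}
```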

StreamQL Construct: ignore

The ignore construct discards all input items; when it receives the end-of-stream marker, it emits the end-of-stream marker and halts.

input:            1   2   3   4   5   #
ignore() output:                      #

StreamQL provides ignore(n) as syntactic sugar for pipeline(take(n), ignore()) and ignoreUntil(p) as syntactic sugar for pipeline(takeUntil(p), ignore()).

The program below ignores the first 100 signal measurements.

StreamQL Construct: search

Given a predicate p on streaming elements of type A, the query search(p) of type Q<A,A> emits no output while it searches for the first occurrence of an item satisfying p. When it encounters such an item, it emits the item to the output and halts.

input:                       1   2   3   4   5   #
search(x -> x >= 4) output:              4 #

StreamQL Construct: seq

The seq construct (i.e., temporal sequencing combinator) can be used to apply different queries in sequence (i.e., one after the other), thus varying the computation over time. More specifically, for queries f and g of type Q<A,B>, their sequencing seq(f,g) of type Q<A,B> computes like f until it halts, and then it proceeds to compute like g.

For example, if f = search(x -> x >= 3) and g = takeUntil(x -> x <= 2), then seq(f, g) computes as:

input:             1   2   3   4   5   2   1   #
f output:                  3 #
seq(f, g) output:          3   4   5   2 #

Suppose the signal is noisy from the 1000th sample to the 2000th sample. The following program skips this part of the signal and computes the sum of the signal strength over the remaining part of the stream:
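The behavior of seq in the table above can be reproduced with the following push-based sketch. seq listens for f's end-of-stream marker, which signals that f has halted, and then initializes g and routes all further input to g. The class names, and the choice that an input ending while f is still running simply ends the output, are assumptions; Sink and Algo are simplified stand-ins for the library's interfaces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

interface Sink<T> { void next(T item); void end(); }
interface Algo<A, B> extends Sink<A> { void connect(Sink<B> sink); void init(); }

// search(p): silent until the first item satisfying p, which it emits before halting.
class Search<A> implements Algo<A, A> {
    private final Predicate<A> p;
    private Sink<A> sink;
    private boolean halted;
    Search(Predicate<A> p) { this.p = p; }
    public void connect(Sink<A> sink) { this.sink = sink; }
    public void init() { halted = false; }
    public void next(A x) {
        if (halted || !p.test(x)) return;
        halted = true;
        sink.next(x);
        sink.end();
    }
    public void end() { if (!halted) { halted = true; sink.end(); } }
}

// takeUntil(p): echoes items up to and including the first one satisfying p, then halts.
class TakeUntil<A> implements Algo<A, A> {
    private final Predicate<A> p;
    private Sink<A> sink;
    private boolean halted;
    TakeUntil(Predicate<A> p) { this.p = p; }
    public void connect(Sink<A> sink) { this.sink = sink; }
    public void init() { halted = false; }
    public void next(A x) {
        if (halted) return;
        sink.next(x);
        if (p.test(x)) { halted = true; sink.end(); }
    }
    public void end() { if (!halted) { halted = true; sink.end(); } }
}

// seq(f, g): behaves like f until f halts, then like a freshly initialized g.
class Seq<A, B> implements Algo<A, B> {
    private final Algo<A, B> f;
    private final Algo<A, B> g;
    private Sink<B> sink;
    private boolean inG;    // true once f has halted
    private boolean halted; // true once the end marker has been emitted

    Seq(Algo<A, B> f, Algo<A, B> g) {
        this.f = f;
        this.g = g;
        f.connect(new Sink<B>() {
            public void next(B b) { sink.next(b); }
            public void end() { inG = true; g.init(); } // f halted: switch to a fresh g
        });
        g.connect(new Sink<B>() {
            public void next(B b) { sink.next(b); }
            public void end() { finish(); }             // g halted: seq halts too
        });
    }
    private void finish() { if (!halted) { halted = true; sink.end(); } }
    public void connect(Sink<B> sink) { this.sink = sink; }
    public void init() { inG = false; halted = false; f.init(); }
    public void next(A x) { if (inG) g.next(x); else f.next(x); }
    public void end() {
        if (inG) g.end();
        else { f.end(); finish(); } // input ended while f was still running
    }
}

class SeqDemo {
    // Runs an algorithm over a finite input, recording items as strings and the end marker as "#".
    static <A, B> List<String> run(Algo<A, B> algo, List<A> input) {
        List<String> out = new ArrayList<>();
        algo.connect(new Sink<B>() {
            public void next(B item) { out.add(String.valueOf(item)); }
            public void end() { out.add("#"); }
        });
        algo.init();
        for (A x : input) algo.next(x);
        algo.end();
        return out;
    }
}
```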

StreamQL Construct: iterate

The iterate (i.e., temporal iteration) construct can be used to repeat a streaming computation indefinitely.

For a query f of type Q<A,B>, its iteration iterate(f) of type Q<A,B> executes f and every time it halts, it is restarted. This results in an unbounded temporal repetition of the computation that f specifies.

Now, the iteration of pipeline(f, g), where f = takeUntil(x -> x == 0) and g = reduce((s, y) -> s + y), computes as shown below

input:                           2   1   3   0   4   5   0   1   7   0   3
f output:                        2   1   3   0 #
pipeline(f, g) output:                       6 #
iterate(pipeline(f, g)) output:              6           9           8

Suppose the sampling rate of the signal is 100 Hz. The program below computes, for every 30-second period (3000 samples), the maximum signal strength within its first 10 seconds (1000 samples).
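A push-based sketch of iterate restarts f (by calling init) whenever f emits its end-of-stream marker. It reproduces the last row of the iteration table above; since that input has no end-of-stream marker, the demo never calls end. The class names are hypothetical, reduce here is the first-item variant, and Sink and Algo are simplified stand-ins for the library's interfaces. One detail matters in this style: TakeUntil sets its halted flag before emitting the end marker, because the restart happens inside that call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Predicate;

interface Sink<T> { void next(T item); void end(); }
interface Algo<A, B> extends Sink<A> { void connect(Sink<B> sink); void init(); }

// takeUntil(p): echoes items up to and including the first one satisfying p, then halts.
class TakeUntil<A> implements Algo<A, A> {
    private final Predicate<A> p;
    private Sink<A> sink;
    private boolean halted;
    TakeUntil(Predicate<A> p) { this.p = p; }
    public void connect(Sink<A> sink) { this.sink = sink; }
    public void init() { halted = false; }
    public void next(A x) {
        if (halted) return;
        sink.next(x);
        if (p.test(x)) { halted = true; sink.end(); } // flag first: a restart may happen inside end()
    }
    public void end() { if (!halted) { halted = true; sink.end(); } }
}

// reduce(op), first-item variant: emits the aggregate when its input ends.
class Reduce1<A> implements Algo<A, A> {
    private final BinaryOperator<A> op;
    private Sink<A> sink;
    private A agg;
    private boolean empty;
    Reduce1(BinaryOperator<A> op) { this.op = op; }
    public void connect(Sink<A> sink) { this.sink = sink; }
    public void init() { agg = null; empty = true; }
    public void next(A x) { agg = empty ? x : op.apply(agg, x); empty = false; }
    public void end() { if (!empty) sink.next(agg); sink.end(); }
}

// pipeline(f, g): f's output is g's input.
class Pipeline<A, B, C> implements Algo<A, C> {
    private final Algo<A, B> f;
    private final Algo<B, C> g;
    Pipeline(Algo<A, B> f, Algo<B, C> g) { this.f = f; this.g = g; f.connect(g); }
    public void connect(Sink<C> sink) { g.connect(sink); }
    public void init() { f.init(); g.init(); }
    public void next(A x) { f.next(x); }
    public void end() { f.end(); }
}

// iterate(f): runs f and re-initializes it every time it halts.
class Iterate<A, B> implements Algo<A, B> {
    private final Algo<A, B> f;
    private Sink<B> sink;
    private boolean inputEnded;
    Iterate(Algo<A, B> f) {
        this.f = f;
        f.connect(new Sink<B>() {
            public void next(B b) { sink.next(b); }
            public void end() { if (!inputEnded) f.init(); } // f halted: restart it
        });
    }
    public void connect(Sink<B> sink) { this.sink = sink; }
    public void init() { inputEnded = false; f.init(); }
    public void next(A x) { f.next(x); }
    public void end() { inputEnded = true; f.end(); sink.end(); }
}

class IterDemo {
    // Pushes items without an end marker (the table's input is unbounded) and returns the outputs.
    static <A, B> List<B> feed(Algo<A, B> algo, List<A> input) {
        List<B> out = new ArrayList<>();
        algo.connect(new Sink<B>() {
            public void next(B item) { out.add(item); }
            public void end() { }
        });
        algo.init();
        for (A x : input) algo.next(x);
        return out;
    }
}
```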

Use StreamQL to Program ECG Peak Detection

StreamQL can be used to program streaming algorithms for real-world applications. This section will show how to use StreamQL to program a simple but effective algorithm for detecting peaks in the ECG (electrocardiogram) signal. This problem is one of the most widely studied detection problems in the area of biomedical engineering, as it forms the basis of many analyses over cardiac data.

[Figure: ECG peak detection, panels (a), (b), and (c)]

In the above figure, (a) shows part of an ECG, which is the electrical cardiac signal recorded on the surface of the skin near the heart. The horizontal axis is time and the vertical axis is voltage. A simple but effective procedure for detecting the peaks consists of three stages:

  1. Smoothing the signal to eliminate high-frequency noise.
  2. Taking the derivative of the smoothed signal to calculate the slope.
  3. Finding the peaks using both the raw measurements and the derivatives.

Figure (b) shows a short snippet of an ECG signal, where the gray line corresponds to the input time series, the green line to the smoothed data, and the blue line to the derivative.

A straightforward algorithm for detecting a peak is to find the first time point (say t1) at which the derivative exceeds a pre-defined threshold hThred, followed by the first time point after t1 (say t2) at which the derivative becomes less than a pre-defined threshold lThred.

Time point t1 is located somewhere on the ascending slope towards the peak, and time point t2 is located somewhere on the descending slope after the peak.

So, the exact peak location can be found by searching for the maximum value of the original time series in the interval from t1 to t2. This pattern is illustrated in Figure (c).

Then, every time a peak is identified, this detection procedure is reset and repeated.

Having given a high-level description of a simple streaming algorithm for detecting the peaks in the ECG signal, we now use StreamQL to provide a complete description of this peak detection algorithm, which is a variant of the Single-channel QRS detector (SQRS).

Suppose the type of the input data items is a record type VT = { val : V, ts : T }, where V is the type of scalar values and T is the type of time points.

Taking V to be Double and T to be Integer, we have:

At the top level, findPeak specifies the peak detection algorithm for the ECG data stream and is defined as findPeak = pipeline(smooth, deriv, detect). Here, smooth, deriv, and detect correspond to the three stages described above: (1) smoothing the signal, (2) computing derivatives, and (3) detecting peaks.

The smoothing query smooth of type Q<VT,VTF> has output type VTF, which is the record type VT extended with an additional component fval of type V for storing the smoothed (low-pass filtered) value. The specification of VTF is given by:

The following program presents the StreamQL implementation of smooth:

The idea is that, for a sample x at time x.ts, we consider the window (v, w, x, y, z) centered around x and calculate a weighted average over the window as the smoothed value.

The differentiation query deriv of type Q<VTF,VTFD> calculates discrete derivatives by taking the difference of successive smoothed values. It is implemented as:

The record type VTFD extends VTF with an additional component dval for storing the derivative:
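The first two stages can be sketched in plain Java over a finite list to make the data flow concrete. This is not the tutorial's StreamQL code. The smoothing weights (1, 2, 3, 2, 1) are a hypothetical choice because the actual weights are not given here, and EcgStages is a hypothetical name. As with a sliding window of size 5 and step 1, each smoothed item becomes available only once z arrives (two samples late), and the first and last two samples produce no output.

```java
import java.util.ArrayList;
import java.util.List;

// Raw measurement, smoothed measurement, and smoothed measurement with derivative.
class VT {
    public final double val;
    public final int ts;
    VT(double val, int ts) { this.val = val; this.ts = ts; }
}

class VTF extends VT {
    public final double fval; // smoothed (low-pass filtered) value
    VTF(double val, int ts, double fval) { super(val, ts); this.fval = fval; }
}

class VTFD extends VTF {
    public final double dval; // discrete derivative of fval
    VTFD(double val, int ts, double fval, double dval) { super(val, ts, fval); this.dval = dval; }
}

class EcgStages {
    // Hypothetical weights for the window (v, w, x, y, z).
    static final double[] WEIGHTS = {1, 2, 3, 2, 1};

    // smooth: for every complete window (v, w, x, y, z), emit x with fval = weighted average.
    static List<VTF> smooth(List<VT> in) {
        double total = 0;
        for (double w : WEIGHTS) total += w;
        List<VTF> out = new ArrayList<>();
        for (int i = 2; i + 2 < in.size(); i++) { // x = in[i] needs two neighbors on each side
            double acc = 0;
            for (int k = -2; k <= 2; k++) acc += WEIGHTS[k + 2] * in.get(i + k).val;
            VT x = in.get(i);
            out.add(new VTF(x.val, x.ts, acc / total));
        }
        return out;
    }

    // deriv: difference between successive smoothed values, attached to the later item.
    static List<VTFD> deriv(List<VTF> in) {
        List<VTFD> out = new ArrayList<>();
        for (int i = 1; i < in.size(); i++) {
            VTF prev = in.get(i - 1), cur = in.get(i);
            out.add(new VTFD(cur.val, cur.ts, cur.fval, cur.fval - prev.fval));
        }
        return out;
    }
}
```

On a linear ramp (val equal to ts), the symmetric weights leave the values unchanged, so every derivative is 1.0.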

Then, in the detecting step, the detection of the first peak involves searching for the first time point when dval exceeds the threshold hThred. The signal interval from this point until the time point when dval falls below the threshold lThred contains the first peak. The signal in the interval is streamed to the argmax query (see below), which finds the data item with the highest value (in the raw, unfiltered signal). This process is repeated indefinitely in order to detect all peaks:
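One plausible reading of this description in terms of the constructs above is a repetition of three steps: skipUntil(x -> x.dval > hThred), then takeUntil(x -> x.dval < lThred), then an argmax over val. The plain-Java sketch below performs the same steps in a single pass. It is an assumption rather than the tutorial's own detect query: the class Detect is hypothetical, ties keep the earlier item, and an interval still open when the input ends is dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Smoothed measurement with derivative (only val, ts, and dval are used here).
class VTFD {
    public final double val, fval, dval;
    public final int ts;
    VTFD(double val, int ts, double fval, double dval) {
        this.val = val; this.ts = ts; this.fval = fval; this.dval = dval;
    }
}

class Detect {
    static List<VTFD> detect(List<VTFD> in, double hThred, double lThred) {
        List<VTFD> peaks = new ArrayList<>();
        VTFD best = null;                      // null while searching; otherwise argmax of the open interval
        for (VTFD x : in) {
            if (best == null) {
                if (x.dval <= hThred) continue; // still before the ascending slope
                best = x;                       // interval opens at the first steep rise
            } else if (x.val > best.val) {
                best = x;                       // track the highest raw value
            }
            if (x.dval < lThred) {              // interval closes (this item is included)
                peaks.add(best);
                best = null;                    // reset and repeat for the next peak
            }
        }
        return peaks;                           // an interval still open at the end is dropped
    }
}
```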

Finally, we present the complete StreamQL program for this case study:

Conclusion

This tutorial uses several examples to show how to program a streaming computation using StreamQL's Java library. It does not cover all streaming constructs in the library; more details can be found in StreamQL's API documentation.