Welcome! StreamQL is a query language for efficient processing IoT data streams. StreamQL provides a rich set of streaming constructs, and it is implemented as a light-weight Java library. In this tutorial, you will learn how to write StreamQL queries in Java to process the data streams.
The tutorial will first provide a very simple example of a StreamQL program. Then, the tutorial will introduce some basic interfaces in the StreamQL's Java library. After that, this tutorial will introduce several core streaming constructs (i.e., operators) of StreamQL and use them to program a healthcare monitoring application.
Let's get started!
The code below gives a simple example of a StreamQL program in Java.
x
// VT is the type of measurements, which contains a double value as valIterator<VT> stream = ... // input stream// sink of the output streamSink<Double> sink = ...// sum of the measurementsQ<VT, Double> sum = QL.aggr(0.0, (s, vt) -> s + vt.val);// evaluation of the queryAlgo<VT, Double> exe = sum.eval();// connect the output of query to sinkexe.connect(sink);// execution loopexe.init();while (stream.hasNext()) { VT vt = stream.next(); exe.next(vt);}exe.end();Given a signal measurement of type VT that contains a double value in the field of val, the query sum of type Q computes the sum of the values of the measurements. The method eval returns an object that encapsulates the evaluation algorithm for the query. The methods init and next are used to initialize the memory and consume data items. When the input stream terminates, the end method is invoked.
Sink and AlgoStreamQL defines two interfaces, Sink and Algo, to describe the streaming computation in a push-based manner.
The Sink interface is used for specifying a sink that consumes a stream. A sink consumes a stream with two methods, next and end, that are used for stream elements and the end-of-stream marker respectively.
The code below shows the definition of the Sink interface in Java.
xxxxxxxxxxpublic abstract class Sink<T> { // deal with incoming items public abstract void next(T item); // deal with the end-of-stream marker public abstract void end();}Below is an instance of Sink that prints each incoming data item and the end-of-stream marker to the console.
x
public class Printer<T> extends Sink<T>{ // print each arrived data item public void next(T item) { print(item); } // print "Job done" when input ends public void end() { print("Job done"); }}The Algo interface is used for describing the evaluation algorithm of a query. An implementation of Algo specifies how the input stream is transformed into the output stream.
The program below shows the definition of the Algo interface, which extends the Sink interface because it consumes a stream. The connect method connects the algorithm to a sink, and the init method initializes/resets the state of the algorithm.
xxxxxxxxxxpublic abstract class Algo<A,B> extends Sink<A>{ // connect to a sink public abstract void connect(Sink<B> sink); // initialize or reset the memory public abstract void init();}Below is an instance of Algo that echoes each incoming data item and the end-of-stream marker to the output.
x
// the type (i.e., A) of the input is the same as the outputpublic class Identity<A,A> extends Algo<A,A>{ private Sink idSink; public void connect(Sink<A> sink) { // connect a sink to the identity algorithm idSink = sink; } public void init() { // nothing to do } void next(A item) { // echoes the input item idSink.next(item); } void end() { // echoes the end-of-stream marker idSink.end(); }}In this section, core streaming constructs of StreamQL are introduced with illustrative examples.
The data stream used in examples is a stream of real-valued discrete-time signal measurements. Each measurement is of the form {val : double, ts : int}, where val is the signal strength, and ts is the discrete time-stamp carried by the measurement that increases continuously (0, 1, 2, ...).
The Java program below describes the type of the signal measurement.
public class VT { public double val; public int ts; public VT(double v, int t){ val = v; ts = t; } public String toString() { return "{"+val+","+ts+"}"; }}To present the signal stream as an input, a signal generator is introduced to this example signal:
xxxxxxxxxxSigGen src = new SigGen();// getVT returns null when the stream terminatesVT vt = src.getVT();Therefore, given an algorithm algo, the program below illustrates how to feed the input stream of signal measurements to algo.
x
VT vt = src.getVT();do { algo.next(vt); vt = src.getVT();} while (vt != null);algo.end();filterThe filter construct echoes the input items that satisfy a predicate to the output.
Given a VT stream, the query halve = filter(x -> x.ts % 2 == 0) collects the signal elements whose timestamp have an even value and thereby halving the sampling rate of the signal, where x -> x.ts % 2 == 0 is a lambda expression that serves as the filtering predicate.
x
SigGen src = new SigGen();// personalized sink to process the filtering outputSink<VT> sink = ...;Q<VT, VT> halve = QL.filter(x -> x.ts % 2 == 0);// get the evaluation algorithm of halveAlgo<VT, VT> algo = halve.eval();VT vt = src.getVT();do { algo.next(vt); vt = src.getVT();} while (vt != null);algo.end();mapThe map construct applies a mapping function to every input item and sends the results to the output. For example, square = map(x -> x.val * x.val) computes the squared signal strength for each input items and echoes the results to the output.
x
Sink<Double> sink = ...;Q<VT, Double> square = QL.map(x -> x.val*x.val);// get the evaluation algorithm of squareAlgo<VT, Double> algo = square.eval();algo.init();algo.connect(sink);VT vt = src.getVT();do { algo.next(vt); vt = src.getVT();} while (vt != null);algo.end();aggr and reduce In StreamQL, both aggr and reduce constructs describe the streaming aggregation over the input stream. The aggr construct computes the running aggregation of the input (i.e., it outputs the aggregate every time the aggregate is updated) while the reduce construct only emits the total aggregate when the
input stream terminates.
For example, queries f = aggr(0.0, (sum, x) -> sum + x.val) and g = reduce(0.0, (sum, x) -> sum + x.val) respectively compute the running sum and the total sum of the signal strength, where 0.0 is the initial value of the aggregate, and (sum, x) -> sum + x.val is a binary function that updates the aggregate. The table below illustrates the execution of f and g, where time progresses in the left-to-right direction, and # represents the end-of-stream marker.
| input | {0.0, 0} | {0.2, 1} | {1.2, 2} | {0.3, 3} | {0.7, 4} | # |
|---|---|---|---|---|---|---|
f output | 0.0 | 0.2 | 1.4 | 1.7 | 2.4 | # |
g output | 2.4 # |
StreamQL provides variants of the aggr and reduce constructs that allow users to aggregate the stream
using the value of the first input item as the initial value of the aggregate.
For example, the query below identifies the last received signal measurement.
x
Q<VT, VT> last = QL.reduce((x, y) -> y);More variants of the aggr and reduce constructs can be found in the StreamQL's API documentation.
pipelineStreamQL provides the pipeline construct to specify the streaming/serial composition. For example, for queries f of type Q<A,B> and g of type Q<B,C>, pipeline(f, g) sends the output of f to the input of g, and its type is Q<A,C>.
The program below uses the pipeline construct to compute the sum of the squared signal strength.
x
Q<VT, Double> square = QL.map(x -> x.val * x.val);Q<Double, Double> sum = QL.reduce((s, x) -> s + x);Q<VT, Double> squareSum = QL.pipeline(square, sum);This construct generalizes naturally to more than two arguments, and it is useful for setting up a complex computation as a pipeline of multiple stages. To illustrate, StreamQL provides constructs such as pipeline(f,g,h), pipeline(f,g,h,i), and etc.
tWindow and sWindowThe so-called windowing constructs are used to partition an unbounded stream into finite fragments called windows and perform computations on each one of them independently.
The tWindow (i.e., tumbling window) construct splits the stream into contiguous non-overlapping regions.
For a query f of type Q<A,B> and a natural number n that is larger than 0, the query tWindow(n,f) applies f to tumbling windows of size n.
The sWindow (i.e., sliding window) construct splits the stream into overlapping regions. For a query f of type Q<A,B> and natural numbers n, s with 0 < s < n, the query sWindow(n,s,f) applies f to windows of size n with a new window starting every s items.
Suppose f = reduce((sum, x) -> sum + x) which calculates the total sum of an integer stream.
The following table illustrates the execution of tWindow and sWindow.
| input: | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | # |
|---|---|---|---|---|---|---|---|---|---|
tWindow(2,f) output: | 3 | 7 | 11 | 15 | # | ||||
tWindow(3,f) output: | 6 | 15 | # | ||||||
sWindow(2,1,f) output: | 3 | 5 | 7 | 9 | 11 | 13 | 15 | # | |
sWindow(3,1,f) output: | 6 | 9 | 12 | 15 | 18 | 21 | # | ||
sWindow(3,2,f) output: | 6 | 12 | 18 | # |
Given a stream of signal measurements, suppose the sampling frequency of the signal is 100 Hz, the program below computes the sum of signal strength for every 10 seconds (i.e., 1000 samples).
xxxxxxxxxxQ<VT, Double> sum = QL.reduce(0.0, (s, x) -> s+x.val);Q<VT, Double> twndSum = QL.tWindow(1000, sum);Moreover, the program below computes the sum of the signal strength for past 10 seconds (i.e., 1000 samples) every 5 seconds (i.e., 500 samples) using the sWindow construct.
xxxxxxxxxxQ<VT, Double> swndSum = QL.sWindow( 1000, 500, QL.reduce(0.0, (s, x) -> s+x.val));tWindowN and sWindowNStreamQL provides variants of the window constructs that allow the programmer to specify a
function to reduce the items contained in a window of size n.
For example, in the program below, sWnd3 computes the sliding (moving) sum over windows of size 3, and tWnd2 computes the average over tumbling windows of size 2.
xxxxxxxxxxQ<VT, Double> sWnd3 = QL.sWindow3((x, y, z) -> x.val + y.val + z.val);Q<VT, Double> tWnd2 = QL.tWindow2((x, y) -> (x.val + y.val) / 2);StreamQL provides constructs to describe the moving-window aggregation (i.e., aggregation over sliding windows). Given an initial aggregate init of type B and a functionupdate of type that updates the aggregate, the query sWndReduce(wndSize, slidingInterval, init, update) uses the moving-window aggregation construct sWndReduce to apply the reducing aggregation over sliding windows with length wndSize and sliding interval slidingInterval.
Moreover, StreamQL defines an additional construct, sWndReduceI, for users to implement the efficient built-in sliding window algorithms when the aggregation is invertible. The query sWndReduceI(wndSize, slidingInterval, init, add, remove) updates the moving-window aggregate (add the new item, remove the item falling off the window).
For example, suppose add is (sum, newItem) -> sum + newItem, the remove that inverts the aggregation should be(sum, oldItem) -> sum − oldItem.
The program below presents the query that computes the average strength of signal in a sliding window, where Pair is a class that contains two fields: sum of type Double and cnt of type Integer.
x
Q<VT, Pair> sWndAgg = QL.sWndReduceI( 1000, 500, new Pair(0.0, 0), (p, x) -> new Pair(p.sum + x.val, p.cnt + 1), (p, x) -> new Pair(p.sum - x.val, p.cnt - 1));Q<Pair, Double> avg = QL.map(p -> p.sum / p.cnt);Q<VT, Double> sWndAvg = QL.pipeline(sWndAgg, avg);parallelStreamQL provides the parallel construct for executing multiple queries in parallel on the same input stream and combining their results.
For queries f and g of type Q<A,B>, the query parallel(f,g) of type Q<A,B> describes the following
computation: The input stream is duplicated with one copy sent to f and one copy sent to g. The queries f and g compute in parallel, and their outputs are merged (specifically, interleaved) to produce the final output stream.
Using the parallel construct, the query that monitors the running average value of the signal can be written as:
xxxxxxxxxxQ<VT,Double> sumCnt = QL.parallel( QL.aggr(0.0, (sum, x) -> sum + x.val), QL.aggr(0.0, (cnt, x) -> cnt + 1));Q<Double,Double> twnd2 = QL.tWindow2((sum, cnt) -> sum / cnt);Q<VT,Double> avg = QL.pipeline(sumCnt, twnd2);groupBy Suppose the measurements are detected by multiple sensors, where each measurement records the identifier, id, of the sensor. Such a measurement can be represented by the Java class as below:
xxxxxxxxxxpublic class IVT { public int id; public double val; public int ts; public IVT(int i, double v, int t){ id = i; val = v; ts = t; } public String toString() { return "{"+id+","+val+","+ts+"}"; }}Suppose that we have written a query f of type Q<IVT,B> that computes an aggregate of items with a fixed identifier, i.e. under the assumption that all the items of the input stream have the same identifier.
Then, to compute this aggregate across all identifiers, the most natural way to to partition the input stream by a key, the identifier field id in this case, and supply the corresponding projected sub-stream to a copy of f.
This construct is called groupBy, and it can be used in the following format:
xxxxxxxxxxQ<IVT, C> g = Q.groupBy(x -> x.id, f, (key, b) -> ... );The first argument x -> x.id is a function that specifies the partitioning key, f describes the computation that will be independently performed on each sub-stream corresponding to a key, and (key,b)-> ... is a function of type that returns the final results by combining key and the output of f from each sub-stream.
The program below counts the total number of samples measured by each sensor, where KV is a key-value pair to store the final results.
xxxxxxxxxxQ<IVT, KV<Integer,Integer>> g = QL.groupBy( x -> x.id, QL.reduce(0, (cnt, x) -> cnt + 1), (id, cnt) -> new KV(id, cnt));take and takeUntilGiven a predicate p on elements of type A, the query takeUntil(p) of type Q<A,A> computes like the identity transformation while there is no occurrence of an item satisfying p in the input.
When it encounters the first item satisfying p, it emits it to the output and halts.
A similar query is take(n) of type Q<A,A>, where n is a positive integer, which echoes the first n items
of the input stream to the output and then halts.
| input: | 1 | 2 | 3 | 4 | 5 | 6 | 7 | # |
|---|---|---|---|---|---|---|---|---|
take(5) output: | 1 | 2 | 3 | 4 | 5 # | |||
takeUntil(x -> x >= 4) output: | 1 | 2 | 3 | 4 # |
For example, given the sampling frequency as 100 Hz, the program below selects the samples that arrive in the first 1 minute.
x
Q<IVT, IVT> take1min = QL.take(6000);// or equivalentlyQ<IVT, IVT> take1min = QL.takeUntil(x -> x.time >= 6000);take and takeUntilStreamQL provides variants for the take and takeUntil constructs.
Given a query f of type Q<A,B> and a predicate p over B, takeUntil(f, p) of type Q<A,B> applies f to its input stream and applies takeUntil(p) to the output of f, i.e., it is the syntax sugar of pipeline(f, takeUntil(p)).
Similarly, given a positive integer n, take(f, n) applies take(n) to the output of f, which is the syntax sugar of pipeline(f, take(n)).
Using these two syntax sugars leads to the improvement of performance because (1) the overhead of the streaming composition is reduced, and (2) once enough elements have been taken, take(n,f) and takeUntil(p,f) can immediately terminate query f.
skip and skipUntilGiven a predicate p on streaming elements of type A, the query skipUntil(p) of type Q<A,A> emits no output while the input contains no item satisfying p. When the first item satisfying p is seen, it emits it to the output and continues to compute like the identity transformation.
The query skip(n) of type Q<A,A>, for a positive integer n, emits no output for the first n input items, and then proceeds to echo the rest of the input stream.
| input: | 1 | 2 | 3 | 4 | 5 | 6 | 7 | # |
|---|---|---|---|---|---|---|---|---|
skip(3) output: | 4 | 5 | 6 | 7 | # | |||
skipUntil(x -> x >= 4) output: | 4 | 5 | 6 | 7 | # |
If the sampling frequency is 100 Hz, the program below skips the signal measurements received in the first 1 minute.
x
Q<IVT, IVT> skip1min = QL.skip(6000);// or equivalentlyQ<IVT, IVT> skip1min = QL.skipUntil(x -> x.time > 6000);ignoreThe ignore construct ignores the input items and halts when receiving an end-of-stream marker.
| input: | 1 | 2 | 3 | 4 | 5 | # |
|---|---|---|---|---|---|---|
ignore() output: | # |
StreamQL provides the syntax sugar for pipeline(take(n),ignore()) as ignore(n) and pipeline(takeUntil(p),ignore()) as ignoreUntil(p).
The program below ignores the first 100 signal measurements.
xxxxxxxxxxQ<IVT, IVT> ignore100 = QL.ignore(100);searchGiven a predicate p on streaming elements of type A, the query search(p) of type Q<A,A> emits no output while it searches for the first occurrence of an item satisfying p. When it encounters such an item, it emits the item to the output and halts.
| input: | 1 | 2 | 3 | 4 | 5 | # |
|---|---|---|---|---|---|---|
search(x -> x >= 4) output: | 4 # |
seqThe seq construct (i.e., temporal sequencing combinator) can be used to apply different queries in sequence (i.e., one after the other), thus varying the computation over time. More specifically, for queries f and g of type Q<A,B>, their sequencing seq(f,g) of type Q<A,B> computes like f until it halts, and then it proceeds to compute like g.
For example, if f = search(x -> x >= 3) and g = takeUntil(x -> x <= 2), then seq(f, g) computes as:
| input: | 1 | 2 | 3 | 4 | 5 | 2 | 1 | # |
|---|---|---|---|---|---|---|---|---|
f output: | 3 # | |||||||
seq(f, g) output: | 3 | 4 | 5 | 2 # |
Suppose the signal is noisy in a period from the 1000th sample to the 2000th sample, the following program skips this part of signal samples and computes the sum of the signal strength for the remaining part of the stream:
x
// select the signal without noiseQ<VT, VT> take = QL.take(1000);Q<VT, VT> skip = QL.skip(1000);Q<VT, VT> removeNoise = QL.seq(take, skip);// compute the sum of signal strengthQ<VT, Double> sum = QL.reduce(0.0, (s, x) -> s + x.val);// compose removeNoise and sumQ<VT, Double> noNoiseSum = QL.pipeline(removeNoise, sum);iterateThe iterate (i.e., temporal iteration) construct can be used to repeat a streaming computation indefinitely.
For a query f of type Q<A,B>, its iteration iterate(f) of type Q<A,B> executes f and every time it halts, it is restarted. This results in an unbounded temporal repetition of the computation that f specifies.
Now, the iteration of pipeline(f, g), where f = takeUntil(x -> x == 0) and g = reduce((s, y) -> s + y), computes as shown below
| input: | 2 | 1 | 3 | 0 | 4 | 5 | 0 | 1 | 7 | 0 | 3 |
|---|---|---|---|---|---|---|---|---|---|---|---|
f output: | 2 | 1 | 3 | 0 # | |||||||
pipeline(f, g) output: | 6 # | ||||||||||
iterate(pipeline(f, g)) output: | 6 | 9 | 8 |
Suppose the sampling rate of the signal is 100 Hz, the program below computes the maximum signal strength in the first 10 seconds for every 30 seconds.
x
// select signal in the first 10 seconds (1000 samples)// and drop signal in the following 20 seconds (2000 samples)Q<VT, VT> take = QL.take(1000);Q<VT, VT> drop = QL.pipeline(QL.take(2000), QL.ignore());Q<VT, VT> select = QL.seq(take, drop);// compute the maximum signal strengthQ<VT, Double> max = QL.reduce(Double.MIN_VALUE, (m, x) -> Math.max(m, x.val));// put them togetherQ<VT, Double> oneMaxVal = QL.pipeline(select, max);Q<VT, Double> allMaxVal = QL.iterate(oneMaxVal);StreamQL can be used to program streaming algorithms for real-world applications. This section will show how to use StreamQL to program a simple but effective algorithm for detecting peaks in the ECG (electrocardiogram) signal. This problem is one of the most widely studied detection problems in the area of biomedical engineering, as it forms the basis of many analyses over cardiac data.
In the above figure, (a) shows part of an ECG, which is the electrical cardiac signal recorded on the surface of the skin near the heart. The horizontal axis is time and the vertical axis is voltage. A simple but effective procedure for detecting the peaks consists of three stages:
Figure (b) shows a short snippet of an ECG signal, where the gray line corresponds to the input time series , the green line is the smoothed data
and the blue line is the derivative
A straightforward algorithm for detecting the peaks is to find the first occurrence (let us say at time ) where the derivative exceeds a pre-defined threshold hThred, followed by the first occurrence after (let us say at time ) where the derivative become less than a pre-defined threshold lThred.
Time point is located somewhere on the ascending slope towards the peak, and time point is located somewhere on the descending slope after the peak.
So, the exact peak location can be found by searching for the maximum value of the original time series in the interval from to . This pattern is illustrated in Figure (c).
Then, every time a peak is identified, this detection procedure is reset and repeated.
After giving a high-level description of a simple streaming algorithm for detecting the peaks in the ECG signal. We now use the StreamQL language to provide a complete description of this peak detection algorithm, which is a variant of Single-channel QRS detector (SQRS).
Suppose the type of the input data items is a record type VT = { val : V, ts : T }, where V is the time of scalar values, and T is the type of time points.
Suppose V is Double, and T is Integer, we have
xxxxxxxxxxpublic class VT { public double val; public int ts; public VT(double v, int t){ val = v; ts = t; } public String toString() { return "{"+val+","+ts+"}"; }}At the top level, findPeak specifies the peak detection algorithm for the ECG data stream, which is defined asfindPeak = pipeline(smooth, deriv, detect). Here, smooth, deriv, and detect correspond to three aforementioned pipelining stages: (1) smoothing the signal, (2) computing derivatives, and (3) detecting peaks.
The smoothing query smooth of type Q<VT,VTF> has output type VTF, which is the record type VT extended with an additional component fval of type V for storing the smoothed (low-pass filtered) value. The specification of VTF is given by:
xxxxxxxxxxpublic class VTF { public double val; public int ts; public double fval; public VTF(double v, int t, double f){ val = v; ts = t; fval = f; } public VTF extend(VT vt, double f){ return new VTF(vt.val, vt.ts, f); } public String toString() { return "{"+val+","+ts+","+fval+"}"; }}The following program presents the StreamQL implementation of smooth:
xxxxxxxxxxQ<VT, VTF> smooth = QL.sWindow5( (v, w, x, y, z) -> VTF.extend(x, 0.1*v.val + 0.2*w.val + 0.4*x.val + 0.2*y.val + 0.1*z.val));The idea is that for a sample x at time x.ts we consider the window (v, w, x, y, z) centered around x and calculate a weighted average over the window for the smoothed value.
The differentiation query derivQ of type Q<VTF,VTFD> calculates discrete derivatives by taking the difference of successive smoothed values. It is implemented as:
xxxxxxxxxxQ<VTF, VTFD> deriv = QL.sWindow2((x, y) -> VTFD.extend(y, y.fval - x.fval));The record type VTFD extends VTF with an additional component dval for storing the derivative:
xxxxxxxxxxpublic class VTFD { public double val; public int ts; public double fval; public double dval; public VTFD(double v, int t, double f, double d){ val = v; ts = t; fval = f; dval = d; } static public VTFD extend(VTF vtf, double d){ return new VTFD(vtf.val, vtf.ts, vtf.fval, d); } public String toString() { return "{"+val+","+ts+","+fval+","+dval+"}"; }}Then, in the detecting step, the detection of the first peak involves searching for the first time point when dval exceeds the threshold hThred. The signal interval from this point until the time point when dval falls below the threshold lThred contains the first peak. The signal in the interval is streamed to the argmax query (see below), which finds the data item with the highest value (in the raw, unfiltered signal). This process is repeated indefinitely in order to detect all peaks:
xxxxxxxxxxQ<VTFD,VTFD> start = QL.search(x -> x.dval > hThred);Q<VTFD,VTFD> take = QL.takeUntil(x -> x.dval < lThred);Q<VTFD,VTFD> argmax = QL.reduce((x, y) -> (y.val > x.val) ? y : x);Q<VTFD,VTFD> detect = QL.iterate(Q.pipeline(Q.seq(start, take), argmax));Finally, we present the complete StreamQL program for this case study:
x
SigGen src = new SigGen();// create the sink for detected peaksConsumer<VTFD> sink = ...// initialize the thresholdsfinal double hThred = ...;final double lThred = ...;// smooth the signal to eliminate high-frequency noiseQ<VT, VTF> smooth = QL.sWindow5( (v, w, x, y, z) -> VTF.extend(x, 0.1*v.val + 0.2*w.val + 0.4*x.val + 0.2*y.val + 0.1*z.val));// take the derivative of the smoothed signal to calculate the slopeQ<VTF, VTFD> deriv = QL.sWindow2((x, y) -> VTFD.extend(y, y.fval - x.fval));// finding the peaksQ<VTFD, VTFD> start = QL.search(x -> x.dval > hThred);Q<VTFD, VTFD> take = QL.takeUntil(x -> x.dval < lThred);Q<VTFD, VTFD> argmax = QL.reduce((x, y) -> (y.val > x.val) ? y : x);Q<VTFD, VTFD> getOnePeak = QL.pipeline(QL.seq(start, take), argmax);Q<VTFD, VTFD> detect = QL.iterate(getOnePeak);// compose the 3-step pipelineQ<VT, VTFD> findPeak = QL.pipeline(smooth, deriv, detect);// execution Algo<VT, VTFD> algo = findPeak.eval();algo.init();algo.connect(sink);VT vt = src.getVT();do { algo.next(vt); vt = src.getVT();} while (vt != null);algo.end();This tutorial uses several example to show how to program a streaming computation using the Java library of StreamQL. However, it does not cover all streaming constructs in the StreamQL library. More details can be found in the API documentation of StreamQL.