Welcome! StreamQL is a query language for efficiently processing IoT data streams. StreamQL provides a rich set of streaming constructs and is implemented as a lightweight Java library. In this tutorial, you will learn how to write StreamQL queries in Java to process data streams.
The tutorial first presents a very simple StreamQL program. It then introduces the basic interfaces of StreamQL's Java library. After that, it introduces several core streaming constructs (i.e., operators) of StreamQL and uses them to program a healthcare monitoring application.
Let's get started!
The code below gives a simple example of a StreamQL program in Java.
```java
// VT is the type of measurements; each measurement carries a double value in the field val
Iterator<VT> stream = ...; // input stream
// sink of the output stream
Sink<Double> sink = ...;
// running sum of the measurements
Q<VT, Double> sum = QL.aggr(0.0, (s, vt) -> s + vt.val);
// evaluation algorithm of the query
Algo<VT, Double> exe = sum.eval();
// connect the output of the query to the sink
exe.connect(sink);
// execution loop
exe.init();
while (stream.hasNext()) {
    VT vt = stream.next();
    exe.next(vt);
}
exe.end();
```
Given a signal measurement of type `VT` that contains a `double` value in the field `val`, the query `sum` of type `Q<VT, Double>` computes the running sum of the measurement values. The method `eval` returns an object of type `Algo` that encapsulates the evaluation algorithm for the query. The methods `init` and `next` initialize the algorithm's memory and consume data items, respectively. When the input stream terminates, the `end` method is invoked.
Sink and Algo
StreamQL defines two interfaces, `Sink` and `Algo`, to describe streaming computation in a push-based manner. In the Java library, both are declared as abstract classes.
The `Sink` interface specifies a sink that consumes a stream. A sink consumes a stream through two methods, `next` and `end`, which handle stream elements and the end-of-stream marker, respectively.
The code below shows the definition of the `Sink` interface in Java.
```java
public abstract class Sink<T> {
    // deal with incoming items
    public abstract void next(T item);
    // deal with the end-of-stream marker
    public abstract void end();
}
```
Below is an implementation of `Sink` that prints each incoming data item to the console and prints a message when the end-of-stream marker arrives.
```java
public class Printer<T> extends Sink<T> {
    // print each arriving data item
    @Override
    public void next(T item) {
        System.out.println(item);
    }

    // print "Job done" when the input ends
    @Override
    public void end() {
        System.out.println("Job done");
    }
}
```
The `Algo` interface describes the evaluation algorithm of a query. An implementation of `Algo` specifies how the input stream is transformed into the output stream.
The program below shows the definition of the `Algo` interface. It extends the `Sink` interface because an algorithm consumes a stream. The `connect` method connects the algorithm to a downstream sink, and the `init` method initializes (or resets) the state of the algorithm.
```java
public abstract class Algo<A, B> extends Sink<A> {
    // connect to a sink
    public abstract void connect(Sink<B> sink);
    // initialize or reset the memory
    public abstract void init();
}
```
Below is an implementation of `Algo` that echoes each incoming data item and the end-of-stream marker to its output.
```java
// the input type and the output type are the same (A)
public class Identity<A> extends Algo<A, A> {
    private Sink<A> idSink;

    // connect a sink to the identity algorithm
    @Override
    public void connect(Sink<A> sink) {
        idSink = sink;
    }

    // the identity algorithm keeps no state, so there is nothing to initialize
    @Override
    public void init() {
    }

    // echo the input item
    @Override
    public void next(A item) {
        idSink.next(item);
    }

    // echo the end-of-stream marker
    @Override
    public void end() {
        idSink.end();
    }
}
```
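The following minimal, self-contained sketch shows how the two abstractions fit together without the library. It re-declares `Sink` and `Algo` as nested classes with the signatures defined above. It then adds a hand-written `Algo` that emits a running sum, which is what the opening `aggr` example computes according to the description of `aggr` later in this tutorial. The names `RunningSumDemo`, `ListSink`, and `RunningSum` are hypothetical, and the sketch is not how StreamQL itself implements `aggr`.

```java
import java.util.ArrayList;
import java.util.List;

// Self-contained sketch: local copies of the Sink/Algo abstractions defined above,
// plus a hand-written Algo that emits a running sum (hypothetical class names).
final class RunningSumDemo {
    abstract static class Sink<T> {
        public abstract void next(T item); // deal with incoming items
        public abstract void end();        // deal with the end-of-stream marker
    }

    abstract static class Algo<A, B> extends Sink<A> {
        public abstract void connect(Sink<B> sink);
        public abstract void init();
    }

    // A sink that records every item it receives and whether the stream ended.
    static final class ListSink<T> extends Sink<T> {
        final List<T> items = new ArrayList<>();
        boolean ended = false;

        @Override public void next(T item) { items.add(item); }
        @Override public void end() { ended = true; }
    }

    // Emits the updated sum after every input item, then forwards the end marker.
    static final class RunningSum extends Algo<Double, Double> {
        private Sink<Double> out;
        private double sum;

        @Override public void connect(Sink<Double> sink) { out = sink; }
        @Override public void init() { sum = 0.0; }

        @Override public void next(Double item) {
            sum += item;
            out.next(sum);
        }

        @Override public void end() { out.end(); }
    }
}
```

Driving it follows the same `connect`, `init`, `next`, `end` sequence as the first example. Feeding 1.0, 2.0, and 3.0 leaves `[1.0, 3.0, 6.0]` in the sink and marks it as ended.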
In this section, core streaming constructs of StreamQL are introduced with illustrative examples.
The data stream used in the examples is a stream of real-valued, discrete-time signal measurements. Each measurement has the form {`val`: `double`, `ts`: `int`}. Here `val` is the signal strength, and `ts` is the discrete timestamp carried by the measurement, which increases by one from each sample to the next (0, 1, 2, ...).
The Java program below describes the type of the signal measurement.
```java
public class VT {
    public double val;
    public int ts;

    public VT(double v, int t) {
        val = v;
        ts = t;
    }

    public String toString() {
        return "{" + val + "," + ts + "}";
    }
}
```
To supply the signal stream as input, the examples use a signal generator, `SigGen`:
```java
SigGen src = new SigGen();
// getVT returns null when the stream terminates
VT vt = src.getVT();
```
Given an algorithm `algo`, the program below shows how to feed the input stream of signal measurements to `algo`.
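```java
// assumes algo has already been connected to a sink and initialized
VT vt = src.getVT();
// a while loop (rather than do-while) avoids calling next(null) on an empty stream
while (vt != null) {
    algo.next(vt);
    vt = src.getVT();
}
algo.end();
```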
filter
The `filter` construct echoes to the output the input items that satisfy a predicate.
Given a `VT` stream, the query `halve = filter(x -> x.ts % 2 == 0)` collects the signal elements whose timestamps are even, thereby halving the sampling rate of the signal. The lambda expression `x -> x.ts % 2 == 0` serves as the filtering predicate.
```java
SigGen src = new SigGen();
// personalized sink to process the filtering output
Sink<VT> sink = ...;
Q<VT, VT> halve = QL.filter(x -> x.ts % 2 == 0);
// get the evaluation algorithm of halve
Algo<VT, VT> algo = halve.eval();
algo.connect(sink);
algo.init();
// feed the input; stop as soon as the generator returns null
VT vt = src.getVT();
while (vt != null) {
    algo.next(vt);
    vt = src.getVT();
}
algo.end();
```
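The sketch below shows, in plain Java, what an evaluation algorithm for `filter` does with each pushed item. It is an illustration under simplifying assumptions, not StreamQL code. `FilterSketch` is a hypothetical name, and `java.util.function.Consumer` stands in for a downstream `Sink`. End-of-stream handling is omitted, and plain integer timestamps replace `VT` items.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch of filter's evaluation algorithm: each pushed item is
// forwarded downstream only if it satisfies the predicate.
final class FilterSketch<A> {
    private final Predicate<A> pred;
    private final Consumer<A> downstream; // stands in for a Sink

    FilterSketch(Predicate<A> pred, Consumer<A> downstream) {
        this.pred = pred;
        this.downstream = downstream;
    }

    void next(A item) {
        if (pred.test(item)) {
            downstream.accept(item);
        }
        // items that fail the predicate are dropped; no state is kept
    }
}
```

Feeding the timestamps 0 through 5 with the predicate `t -> t % 2 == 0` forwards 0, 2, and 4, which is the rate-halving behavior of `halve`.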
map
The `map` construct applies a mapping function to every input item and sends the results to the output. For example, `square = map(x -> x.val * x.val)` computes the squared signal strength for each input item and emits the results to the output.
```java
SigGen src = new SigGen();
Sink<Double> sink = ...;
Q<VT, Double> square = QL.map(x -> x.val * x.val);
// get the evaluation algorithm of square
Algo<VT, Double> algo = square.eval();
algo.init();
algo.connect(sink);
VT vt = src.getVT();
while (vt != null) {
    algo.next(vt);
    vt = src.getVT();
}
algo.end();
```
aggr and reduce
In StreamQL, both the `aggr` and `reduce` constructs describe streaming aggregation over the input stream. The `aggr` construct computes the running aggregate of the input, i.e., it outputs the aggregate every time the aggregate is updated. The `reduce` construct emits only the total aggregate, when the input stream terminates.
For example, the queries `f = aggr(0.0, (sum, x) -> sum + x.val)` and `g = reduce(0.0, (sum, x) -> sum + x.val)` compute the running sum and the total sum of the signal strength, respectively. Here `0.0` is the initial value of the aggregate, and `(sum, x) -> sum + x.val` is a binary function that updates the aggregate. The table below illustrates the execution of `f` and `g`. Time progresses from left to right, and `#` represents the end-of-stream marker.
input | {0.0, 0} | {0.2, 1} | {1.2, 2} | {0.3, 3} | {0.7, 4} | # |
---|---|---|---|---|---|---|
f output | 0.0 | 0.2 | 1.4 | 1.7 | 2.4 | # |
g output | 2.4 # | | | | | |
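One compact way to see the difference between the two constructs is a single hypothetical class that applies the same update function and differs only in when it emits. This is a plain-Java sketch with the illustrative name `AggrVsReduce`, not StreamQL's implementation. Outputs are collected into a list instead of being pushed to a `Sink`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical sketch: the same update function, emitted in two different ways.
final class AggrVsReduce<A, B> {
    private final BiFunction<B, A, B> update;
    private final boolean running; // true: aggr-style, false: reduce-style
    private B acc;
    final List<B> output = new ArrayList<>();

    AggrVsReduce(B initial, BiFunction<B, A, B> update, boolean running) {
        this.acc = initial;
        this.update = update;
        this.running = running;
    }

    void next(A item) {
        acc = update.apply(acc, item);
        if (running) {
            output.add(acc); // aggr: emit the aggregate every time it is updated
        }
    }

    void end() {
        if (!running) {
            output.add(acc); // reduce: emit only the total at the end-of-stream marker
        }
    }
}
```

Consider the five values from the table. The aggr-style instance produces the running sums 0.0, 0.2, 1.4, 1.7, and 2.4, up to floating-point rounding. The reduce-style instance produces only 2.4, after `end()` is called.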
StreamQL also provides variants of the `aggr` and `reduce` constructs that use the value of the first input item as the initial value of the aggregate.
For example, the query below returns the last signal measurement received, which it emits when the stream ends.
```java
Q<VT, VT> last = QL.reduce((x, y) -> y);
```
More variants of the `aggr` and `reduce` constructs can be found in StreamQL's API documentation.
pipeline
StreamQL provides the `pipeline` construct to specify streaming (serial) composition. For queries `f` of type `Q<A,B>` and `g` of type `Q<B,C>`, `pipeline(f, g)` sends the output of `f` to the input of `g`, and its type is `Q<A,C>`.
The program below uses the `pipeline` construct to compute the sum of the squared signal strength.
```java
Q<VT, Double> square = QL.map(x -> x.val * x.val);
Q<Double, Double> sum = QL.reduce((s, x) -> s + x);
Q<VT, Double> squareSum = QL.pipeline(square, sum);
```
This construct generalizes naturally to more than two arguments, which is useful for setting up a complex computation as a pipeline of multiple stages. For example, StreamQL provides forms such as `pipeline(f, g, h)`, `pipeline(f, g, h, i)`, and so on.
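The sketch below illustrates what serial composition means in a push-based setting. It models a stage as a hypothetical `Stage` interface: given its downstream consumer, a stage returns the consumer for its own input. Composing two stages then amounts to wiring the second stage to the final sink and feeding the first stage's output into it. This is an assumption-laden illustration, not StreamQL's `pipeline` implementation.

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of serial composition for push-based stages.
final class PipelineSketch {
    // A stage receives its downstream consumer and returns the consumer for its input.
    interface Stage<A, B> {
        Consumer<A> wire(Consumer<B> downstream);
    }

    // pipeline(f, g): g is wired to the final sink, and f's output feeds g's input.
    static <A, B, C> Stage<A, C> pipeline(Stage<A, B> f, Stage<B, C> g) {
        return downstream -> f.wire(g.wire(downstream));
    }

    // A stateless stage that applies a function to every item (like map).
    static <A, B> Stage<A, B> map(Function<A, B> fn) {
        return downstream -> item -> downstream.accept(fn.apply(item));
    }
}
```

For example, composing a squaring stage with a running-sum stage and feeding 1, 2, 3 yields 1, 5, 14. Longer pipelines such as `pipeline(f, g, h)` can be modeled in this sketch by nesting the two-stage composition.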
tWindow and sWindow
The windowing constructs partition an unbounded stream into finite fragments called windows and perform a computation on each window independently.
The `tWindow` (i.e., tumbling window) construct splits the stream into contiguous, non-overlapping regions. For a query `f` of type `Q<A,B>` and a natural number `n > 0`, the query `tWindow(n, f)` applies `f` to tumbling windows of size `n`.
The `sWindow` (i.e., sliding window) construct splits the stream into overlapping regions. For a query `f` of type `Q<A,B>` and natural numbers `n` and `s` with `0 < s < n`, the query `sWindow(n, s, f)` applies `f` to windows of size `n`, with a new window starting every `s` items.
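Suppose `f = reduce((sum, x) -> sum + x)`, which calculates the total sum of an integer stream. The following table illustrates the execution of `tWindow` and `sWindow`. A window that is still incomplete when the stream ends produces no output, as with the items 7 and 8 under `tWindow(3,f)`.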
input: | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | # |
---|---|---|---|---|---|---|---|---|---|
tWindow(2,f) output: | 3 | 7 | 11 | 15 | # | | | | |
tWindow(3,f) output: | 6 | 15 | # | | | | | | |
sWindow(2,1,f) output: | 3 | 5 | 7 | 9 | 11 | 13 | 15 | # | |
sWindow(3,1,f) output: | 6 | 9 | 12 | 15 | 18 | 21 | # | | |
sWindow(3,2,f) output: | 6 | 12 | 18 | # | | | | | |
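The window semantics in the table can be reproduced with a small buffer-based sketch. It is a hypothetical plain-Java model, not StreamQL code. It keeps the most recent `n` items and emits a result whenever a window is complete, with a new window starting every `s` items. It models `tWindow(n, f)` as the case `s == n`. It also applies a batch function to the buffered window instead of running a streaming query `f` on it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of window semantics: window size n, a new window every s
// items (tumbling windows are modeled as s == n), and a batch function f
// applied to each complete window.
final class WindowSketch<A, R> {
    private final int n;
    private final int s;
    private final Function<List<A>, R> f;
    private final Consumer<R> downstream;
    private final ArrayDeque<A> buffer = new ArrayDeque<>();
    private long seen = 0;

    WindowSketch(int n, int s, Function<List<A>, R> f, Consumer<R> downstream) {
        this.n = n;
        this.s = s;
        this.f = f;
        this.downstream = downstream;
    }

    void next(A item) {
        buffer.addLast(item);
        if (buffer.size() > n) {
            buffer.removeFirst(); // keep only the most recent n items
        }
        seen++;
        // windows start at items 0, s, 2s, ...; a window is complete once its
        // n-th item has arrived
        if (seen >= n && (seen - n) % s == 0) {
            downstream.accept(f.apply(new ArrayList<>(buffer)));
        }
    }

    // No end() logic: a window that is incomplete when the stream ends is dropped.
}
```

Use the summing function and the input 1 through 8. The parameter pairs (2, 2), (3, 3), (2, 1), (3, 1), and (3, 2) then reproduce the five output rows of the table. The trailing incomplete windows never reach the emission condition.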
Suppose the signal is sampled at 100 Hz. The program below computes the sum of the signal strength over each consecutive 10-second period (i.e., 1000 samples).
```java
Q<VT, Double> sum = QL.reduce(0.0, (s, x) -> s + x.val);
Q<VT, Double> twndSum = QL.tWindow(1000, sum);
```
The program below uses the `sWindow` construct to compute, every 5 seconds (i.e., 500 samples), the sum of the signal strength over the past 10 seconds (i.e., 1000 samples).
```java
Q<VT, Double> swndSum = QL.sWindow(
    1000, 500, QL.reduce(0.0, (s, x) -> s + x.val)
);
```
tWindowN and sWindowN
StreamQL provides variants of the window constructs, such as `sWindow3` and `tWindow2`, that let the programmer specify a function that directly combines the `n` items contained in a window of size `n`.
For example, in the program below, `sWnd3` computes the sliding (moving) sum over windows of size 3, and `tWnd2` computes the average over tumbling windows of size 2.
```java
Q<VT, Double> sWnd3 = QL.sWindow3((x, y, z) -> x.val + y.val + z.val);
Q<VT, Double> tWnd2 = QL.tWindow2((x, y) -> (x.val + y.val) / 2);
```
StreamQL also provides constructs for moving-window aggregation (i.e., aggregation over sliding windows). Consider an initial aggregate `init` of type `B` and a function `update` of type $B \times A \to B$ that updates the aggregate. The query `sWndReduce(wndSize, slidingInterval, init, update)` uses the moving-window aggregation construct `sWndReduce` to apply the reducing aggregation over sliding windows of length `wndSize` with sliding interval `slidingInterval`.
StreamQL also defines an additional construct, `sWndReduceI`, that uses an efficient built-in sliding-window algorithm when the aggregation is invertible. The query `sWndReduceI(wndSize, slidingInterval, init, add, remove)` updates the moving-window aggregate incrementally: `add` incorporates each new item, and `remove` removes the item that falls off the window.
For example, if `add` is `(sum, newItem) -> sum + newItem`, the `remove` function that inverts the aggregation should be `(sum, oldItem) -> sum - oldItem`.
The program below computes the average signal strength over a sliding window. `Pair` is a class with two fields: `sum` of type `Double` and `cnt` of type `Integer`.
```java
// (sum, count) over windows of 1000 samples, emitted every 500 samples
Q<VT, Pair> sWndAgg = QL.sWndReduceI(
    1000, 500,
    new Pair(0.0, 0),
    (p, x) -> new Pair(p.sum + x.val, p.cnt + 1), // add the new item
    (p, x) -> new Pair(p.sum - x.val, p.cnt - 1)  // remove the evicted item
);
Q<Pair, Double> avg = QL.map(p -> p.sum / p.cnt);
Q<VT, Double> sWndAvg = QL.pipeline(sWndAgg, avg);
```
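The benefit of an invertible aggregation is that each window update needs only constant work: add the arriving item and remove the evicted one, instead of re-aggregating the whole window. The following plain-Java sketch illustrates this subtract-on-evict technique. `InvertibleWindowSketch` is a hypothetical name. The emission rule (emit once the first window is full, then every `slidingInterval` items) is an assumption chosen to match the `sWindow` table earlier in this tutorial. It is not a statement about how `sWndReduceI` is implemented.

```java
import java.util.ArrayDeque;
import java.util.function.BiFunction;
import java.util.function.Consumer;

// Hypothetical sketch of invertible moving-window aggregation: constant work per
// item, by adding the arriving item and removing the one that leaves the window.
final class InvertibleWindowSketch<A, B> {
    private final int wndSize;
    private final int slidingInterval;
    private final BiFunction<B, A, B> add;
    private final BiFunction<B, A, B> remove;
    private final Consumer<B> downstream;
    private final ArrayDeque<A> window = new ArrayDeque<>();
    private B agg;
    private long seen = 0;

    InvertibleWindowSketch(int wndSize, int slidingInterval, B init,
                           BiFunction<B, A, B> add, BiFunction<B, A, B> remove,
                           Consumer<B> downstream) {
        this.wndSize = wndSize;
        this.slidingInterval = slidingInterval;
        this.agg = init;
        this.add = add;
        this.remove = remove;
        this.downstream = downstream;
    }

    void next(A item) {
        window.addLast(item);
        agg = add.apply(agg, item);
        if (window.size() > wndSize) {
            // the oldest item falls off the window: undo its contribution
            agg = remove.apply(agg, window.removeFirst());
        }
        seen++;
        // emit once the first window is full, then every slidingInterval items
        if (seen >= wndSize && (seen - wndSize) % slidingInterval == 0) {
            downstream.accept(agg);
        }
    }
}
```

Take `add = (s, x) -> s + x`, `remove = (s, x) -> s - x`, window size 3, and slide 1. The input 1 through 8 then yields 6, 9, 12, 15, 18, 21, matching `sWindow(3,1,f)` above. With floating-point sums, repeatedly adding and subtracting can accumulate rounding error, which is a known trade-off of this approach.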
parallel
StreamQL provides the `parallel` construct for executing multiple queries in parallel on the same input stream and combining their results.
For queries `f` and `g` of type `Q<A,B>`, the query `parallel(f, g)` of type `Q<A,B>` describes the following computation. The input stream is duplicated, with one copy sent to `f` and one copy sent to `g`. The queries `f` and `g` compute in parallel, and their outputs are merged (specifically, interleaved) to produce the final output stream.
Using the `parallel` construct, the query that monitors the running average of the signal can be written as:
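```java
// running sum and running count, interleaved into one stream
Q<VT, Double> sumCnt = QL.parallel(
    QL.aggr(0.0, (sum, x) -> sum + x.val),
    QL.aggr(0.0, (cnt, x) -> cnt + 1)
);
// pair each sum with the count that follows it; this relies on the interleaving
// emitting the sum before the count for every input item
Q<Double, Double> twnd2 = QL.tWindow2((sum, cnt) -> sum / cnt);
Q<VT, Double> avg = QL.pipeline(sumCnt, twnd2);
```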
groupBy
Suppose the measurements are collected by multiple sensors, and each measurement records the identifier `id` of its sensor. Such a measurement can be represented by the Java class below:
```java
public class IVT {
    public int id;
    public double val;
    public int ts;

    public IVT(int i, double v, int t) {
        id = i;
        val = v;
        ts = t;
    }

    public String toString() {
        return "{" + id + "," + val + "," + ts + "}";
    }
}
```
Suppose that we have written a query `f` of type `Q<IVT,B>` that computes an aggregate over items with a fixed identifier, i.e., under the assumption that all items of the input stream have the same identifier.
To compute this aggregate for every identifier, the most natural approach is to partition the input stream by a key (here, the identifier field `id`) and supply each resulting sub-stream to its own copy of `f`.
This construct is called `groupBy`, and it is used in the following form:
```java
Q<IVT, C> g = QL.groupBy(x -> x.id, f, (key, b) -> ... );
```
The first argument, `x -> x.id`, is a function that specifies the partitioning key. The second argument, `f`, describes the computation performed independently on the sub-stream of each key. The last argument, `(key, b) -> ...`, is a function of type $K \times B \to C$, where `K` is the key type. It produces the final results by combining each key with the output of `f` on the corresponding sub-stream.
The program below counts the number of samples measured by each sensor. `KV` is a key-value pair class that stores the final results.
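```java
Q<IVT, KV<Integer, Integer>> g = QL.groupBy(
    x -> x.id,                          // partition by sensor identifier
    QL.reduce(0, (cnt, x) -> cnt + 1),  // count the items of each sub-stream
    (id, cnt) -> new KV<>(id, cnt)      // pair each identifier with its count
);
```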
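The essence of `groupBy` is routing each item to an independent sub-computation selected by its key. The sketch below is a hypothetical plain-Java model that specializes `f` to a per-key reduce, as in the counting example. It emits one combined result per key when the input ends. Several details are choices of this sketch rather than documented StreamQL behavior: the name `GroupByReduceSketch`, the first-seen key order of the results, and the assumption that the aggregate type is immutable, so that `init` can be shared across keys.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch of groupBy with a per-key reduce: each key gets its own
// independent aggregate, and combined results are emitted when the input ends.
final class GroupByReduceSketch<A, K, B, C> {
    private final Function<A, K> key;
    private final B init; // assumed immutable, so sharing it across keys is safe
    private final BiFunction<B, A, B> update;
    private final BiFunction<K, B, C> combine;
    private final Map<K, B> perKey = new LinkedHashMap<>(); // first-seen key order
    final List<C> output = new ArrayList<>();

    GroupByReduceSketch(Function<A, K> key, B init,
                        BiFunction<B, A, B> update, BiFunction<K, B, C> combine) {
        this.key = key;
        this.init = init;
        this.update = update;
        this.combine = combine;
    }

    void next(A item) {
        K k = key.apply(item);
        // route the item to the sub-computation for its key, creating it on first use
        perKey.put(k, update.apply(perKey.getOrDefault(k, init), item));
    }

    void end() {
        // combine each key with the final aggregate of its sub-stream
        perKey.forEach((k, b) -> output.add(combine.apply(k, b)));
    }
}
```

Counting the sensor identifiers 1, 2, 1, 3, 1, 2 yields `1=3`, `2=2`, and `3=1`.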
take and takeUntil
Given a predicate `p` on elements of type `A`, the query `takeUntil(p)` of type `Q<A,A>` behaves like the identity transformation as long as no input item satisfies `p`.
When it encounters the first item satisfying `p`, it emits that item to the output and halts.
A similar query is `take(n)` of type `Q<A,A>`, where `n` is a positive integer. It echoes the first `n` items of the input stream to the output and then halts.
input: | 1 | 2 | 3 | 4 | 5 | 6 | 7 | # |
---|---|---|---|---|---|---|---|---|
take(5) output: | 1 | 2 | 3 | 4 | 5 # | | | |
takeUntil(x -> x >= 4) output: | 1 | 2 | 3 | 4 # | | | | |
For example, given a sampling frequency of 100 Hz, the program below selects the samples that arrive in the first minute.
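```java
// take the first 6000 samples (60 s at 100 Hz)
Q<VT, VT> take1min = QL.take(6000);
// or equivalently, since timestamps start at 0: takeUntil also emits the item
// that satisfies the predicate (ts == 5999), then halts
Q<VT, VT> take1minByTs = QL.takeUntil(x -> x.ts >= 5999);
```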
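The halting behavior of `takeUntil` and `take` can be sketched as a small state machine. It forwards items, and after forwarding the first item that satisfies the predicate, it ignores everything else. The plain-Java sketch below is hypothetical: `TakeUntilSketch` is an illustrative name, a `Consumer` replaces the downstream `Sink`, and end-of-stream handling is omitted. It also expresses `take(n)` as `takeUntil` with a counting predicate.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch of takeUntil(p): echo items until the first one that
// satisfies p, emit that item too, then halt (later items are ignored).
final class TakeUntilSketch<A> {
    private final Predicate<A> p;
    private final Consumer<A> downstream;
    private boolean halted = false;

    TakeUntilSketch(Predicate<A> p, Consumer<A> downstream) {
        this.p = p;
        this.downstream = downstream;
    }

    boolean halted() {
        return halted;
    }

    void next(A item) {
        if (halted) {
            return; // a halted query consumes nothing further
        }
        downstream.accept(item);
        if (p.test(item)) {
            halted = true;
        }
    }

    // take(n) with the same machinery: the predicate becomes true on the n-th item
    static <A> TakeUntilSketch<A> take(int n, Consumer<A> downstream) {
        int[] count = {0};
        return new TakeUntilSketch<>(x -> ++count[0] >= n, downstream);
    }
}
```

On the input 1 through 7, `takeUntil(x -> x >= 4)` forwards 1, 2, 3, 4 and `take(5)` forwards 1 through 5, matching the table above.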
Variants of take and takeUntil
StreamQL provides variants of the `take` and `takeUntil` constructs.
Given a query `f` of type `Q<A,B>` and a predicate `p` over `B`, `takeUntil(f, p)` of type `Q<A,B>` applies `f` to its input stream and applies `takeUntil(p)` to the output of `f`. That is, it is syntactic sugar for `pipeline(f, takeUntil(p))`.
Similarly, given a positive integer `n`, `take(f, n)` applies `take(n)` to the output of `f`, and is syntactic sugar for `pipeline(f, take(n))`.
These two shorthands can also improve performance for two reasons. First, the overhead of streaming composition is reduced. Second, once enough elements have been taken, `take(f, n)` and `takeUntil(f, p)` can terminate query `f` immediately.
skip and skipUntil
Given a predicate `p` on streaming elements of type `A`, the query `skipUntil(p)` of type `Q<A,A>` emits no output as long as the input contains no item satisfying `p`. When the first item satisfying `p` is seen, it emits that item to the output and continues to compute like the identity transformation.
The query `skip(n)` of type `Q<A,A>`, for a positive integer `n`, emits no output for the first `n` input items and then echoes the rest of the input stream.
input: | 1 | 2 | 3 | 4 | 5 | 6 | 7 | # |
---|---|---|---|---|---|---|---|---|
skip(3) output: | 4 | 5 | 6 | 7 | # | | | |
skipUntil(x -> x >= 4) output: | 4 | 5 | 6 | 7 | # | | | |
If the sampling frequency is 100 Hz, the program below skips the signal measurements received in the first minute.
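```java
// skip the first 6000 samples (60 s at 100 Hz)
Q<VT, VT> skip1min = QL.skip(6000);
// or equivalently, since timestamps start at 0: skipUntil emits the first
// item that satisfies the predicate (ts == 6000) and everything after it
Q<VT, VT> skip1minByTs = QL.skipUntil(x -> x.ts >= 6000);
```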
ignore
The `ignore` construct discards all input items and halts when it receives the end-of-stream marker, which it forwards to the output.
input: | 1 | 2 | 3 | 4 | 5 | # |
---|---|---|---|---|---|---|
ignore() output: | # | | | | | |
StreamQL provides syntactic sugar for `pipeline(take(n), ignore())` as `ignore(n)` and for `pipeline(takeUntil(p), ignore())` as `ignoreUntil(p)`.
The program below ignores the first 100 signal measurements and then halts.
```java
Q<IVT, IVT> ignore100 = QL.ignore(100);
```
search
Given a predicate `p` on streaming elements of type `A`, the query `search(p)` of type `Q<A,A>` emits no output while it searches for the first occurrence of an item satisfying `p`. When it encounters such an item, it emits the item to the output and halts.
input: | 1 | 2 | 3 | 4 | 5 | # |
---|---|---|---|---|---|---|
search(x -> x >= 4) output: | 4 # | | | | | |
seq
The `seq` construct (i.e., the temporal sequencing combinator) applies different queries in sequence, one after the other, thus varying the computation over time. More specifically, for queries `f` and `g` of type `Q<A,B>`, their sequencing `seq(f, g)` of type `Q<A,B>` computes like `f` until `f` halts, and then proceeds to compute like `g`.
For example, if `f = search(x -> x >= 3)` and `g = takeUntil(x -> x <= 2)`, then `seq(f, g)` computes as follows:
input: | 1 | 2 | 3 | 4 | 5 | 2 | 1 | # |
---|---|---|---|---|---|---|---|---|
f output: | 3 # | | | | | | | |
seq(f, g) output: | 3 | 4 | 5 | 2 # | | | | |
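Temporal sequencing only makes sense for queries that can halt. The sketch below therefore models a query as a hypothetical `Query` interface whose `next` method reports whether the query has halted. `seq(f, g)` routes items to `f` until it halts and then to `g`. `search` and `takeUntil` are reimplemented here only to reproduce the table. This is a plain-Java illustration under these assumptions, not StreamQL's API.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch of temporal sequencing. A query is modeled as an object
// that consumes one item at a time and reports whether it has halted.
final class SeqSketch {
    interface Query<A, B> {
        // process one item, emitting zero or more outputs; return true once halted
        boolean next(A item, Consumer<B> out);
    }

    // search(p): emit nothing until the first item satisfying p, emit it, halt
    static <A> Query<A, A> search(Predicate<A> p) {
        return (item, out) -> {
            if (!p.test(item)) {
                return false;
            }
            out.accept(item);
            return true;
        };
    }

    // takeUntil(p): echo items; after echoing the first item satisfying p, halt
    static <A> Query<A, A> takeUntil(Predicate<A> p) {
        return (item, out) -> {
            out.accept(item);
            return p.test(item);
        };
    }

    // seq(f, g): behave like f until f halts, then behave like g
    static <A, B> Query<A, B> seq(Query<A, B> f, Query<A, B> g) {
        boolean[] fHalted = {false};
        boolean[] gHalted = {false};
        return (item, out) -> {
            if (!fHalted[0]) {
                fHalted[0] = f.next(item, out);
                return false; // seq halts only when g halts
            }
            if (!gHalted[0]) {
                gHalted[0] = g.next(item, out);
            }
            return gHalted[0];
        };
    }
}
```

Feeding 1, 2, 3, 4, 5, 2, 1 and stopping once the combined query halts produces 3, 4, 5, 2, as in the table. The item that makes `f` halt is consumed by `f` and is not passed to `g`.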
Suppose the signal is noisy for samples 1000 through 1999, counting from 0. The following program skips this part of the signal and computes the sum of the signal strength over the rest of the stream:
```java
// select the signal without noise: keep samples 0-999, then skip samples 1000-1999
Q<VT, VT> take = QL.take(1000);
Q<VT, VT> skip = QL.skip(1000);
Q<VT, VT> removeNoise = QL.seq(take, skip);
// compute the sum of signal strength
Q<VT, Double> sum = QL.reduce(0.0, (s, x) -> s + x.val);
// compose removeNoise and sum
Q<VT, Double> noNoiseSum = QL.pipeline(removeNoise, sum);
```
iterate
The `iterate` (i.e., temporal iteration) construct repeats a streaming computation indefinitely.
For a query `f` of type `Q<A,B>`, its iteration `iterate(f)` of type `Q<A,B>` executes `f` and restarts it every time it halts. This results in an unbounded temporal repetition of the computation that `f` specifies.
For example, the iteration of `pipeline(f, g)`, where `f = takeUntil(x -> x == 0)` and `g = reduce((s, y) -> s + y)`, computes as shown below:
input: | 2 | 1 | 3 | 0 | 4 | 5 | 0 | 1 | 7 | 0 | 3 |
---|---|---|---|---|---|---|---|---|---|---|---|
f output: | 2 | 1 | 3 | 0 # | | | | | | | |
pipeline(f, g) output: | 6 # | | | | | | | | | | |
iterate(pipeline(f, g)) output: | 6 | 9 | 8 | | | | | | | | |
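Temporal iteration can be sketched by keeping a factory for fresh copies of a halting query and swapping in a new copy whenever the current one halts. In the hypothetical plain-Java sketch below, `sumUntilZero` is a hand-fused stand-in for `pipeline(takeUntil(x -> x == 0), reduce((s, y) -> s + y))`. End-of-stream handling is not modeled, so a segment that is still open when the input ends produces no output here.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch of temporal iteration: run a copy of a halting query and,
// whenever it halts, start a fresh copy on the following items.
final class IterateSketch {
    interface Query<A, B> {
        boolean next(A item, Consumer<B> out); // true once the query has halted
    }

    static final class Iterate<A, B> implements Query<A, B> {
        private final Supplier<Query<A, B>> fresh;
        private Query<A, B> current;

        Iterate(Supplier<Query<A, B>> fresh) {
            this.fresh = fresh;
            this.current = fresh.get();
        }

        @Override
        public boolean next(A item, Consumer<B> out) {
            if (current.next(item, out)) {
                current = fresh.get(); // restart the computation after it halts
            }
            return false; // the iteration itself never halts
        }
    }

    // Hand-fused stand-in for pipeline(takeUntil(x -> x == 0), reduce(sum)):
    // sums items and, on reaching 0, emits the sum and halts.
    static Query<Integer, Integer> sumUntilZero() {
        int[] sum = {0};
        return (item, out) -> {
            sum[0] += item;
            if (item != 0) {
                return false;
            }
            out.accept(sum[0]);
            return true;
        };
    }
}
```

On the input from the table, it emits 6, 9, and 8. The trailing 3 belongs to an unfinished segment.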
Suppose the sampling rate of the signal is 100 Hz. The program below computes, for every 30-second period, the maximum signal strength in its first 10 seconds.
```java
// select the signal in the first 10 seconds (1000 samples)
// and drop the signal in the following 20 seconds (2000 samples)
Q<VT, VT> take = QL.take(1000);
Q<VT, VT> drop = QL.pipeline(QL.take(2000), QL.ignore());
Q<VT, VT> select = QL.seq(take, drop);
// compute the maximum signal strength; start from negative infinity, since
// Double.MIN_VALUE is the smallest positive double, not the most negative one
Q<VT, Double> max = QL.reduce(Double.NEGATIVE_INFINITY, (m, x) -> Math.max(m, x.val));
// put them together: one maximum per 30-second period
Q<VT, Double> oneMaxVal = QL.pipeline(select, max);
Q<VT, Double> allMaxVal = QL.iterate(oneMaxVal);
```
StreamQL can be used to program streaming algorithms for real-world applications. This section shows how to use StreamQL to program a simple but effective algorithm for detecting peaks in an ECG (electrocardiogram) signal. ECG peak detection is one of the most widely studied detection problems in biomedical engineering, as it forms the basis of many analyses of cardiac data.
In the figure above, panel (a) shows part of an ECG, which is the electrical cardiac signal recorded on the surface of the skin near the heart. The horizontal axis is time and the vertical axis is voltage. A simple but effective procedure for detecting the peaks consists of three stages: (1) smoothing the signal, (2) computing derivatives, and (3) detecting peaks.
Figure (b) shows a short snippet of an ECG signal. The gray line is the input time series, the green line is the smoothed data, and the blue line is the derivative.
A straightforward algorithm for detecting a peak proceeds in two steps. First, find the first time point, say $t_1$, at which the derivative exceeds a predefined threshold `hThred`. Then find the first time point after $t_1$, say $t_2$, at which the derivative becomes less than a predefined threshold `lThred`.
Time point $t_1$ is located somewhere on the ascending slope towards the peak, and time point $t_2$ is located somewhere on the descending slope after the peak.
So, the exact peak location can be found by searching for the maximum value of the original time series in the interval from $t_1$ to $t_2$. This pattern is illustrated in Figure (c).
Every time a peak is identified, this detection procedure is reset and repeated.
Having described this streaming algorithm at a high level, we now use StreamQL to give a complete description of the peak detection algorithm, which is a variant of the Single-channel QRS detector (SQRS).
Suppose the input data items have the record type `VT = { val : V, ts : T }`, where `V` is the type of scalar values and `T` is the type of time points.
Taking `V` to be `Double` and `T` to be `Integer`, we have:
```java
public class VT {
    public double val;
    public int ts;

    public VT(double v, int t) {
        val = v;
        ts = t;
    }

    public String toString() {
        return "{" + val + "," + ts + "}";
    }
}
```
At the top level, `findPeak` specifies the peak detection algorithm for the ECG data stream and is defined as `findPeak = pipeline(smooth, deriv, detect)`. Here, `smooth`, `deriv`, and `detect` correspond to the three stages described above: (1) smoothing the signal, (2) computing derivatives, and (3) detecting peaks.
The smoothing query `smooth` of type `Q<VT,VTF>` has output type `VTF`. This is the record type `VT` extended with an additional component `fval` of type `V` for storing the smoothed (low-pass filtered) value. The specification of `VTF` is given by:
```java
public class VTF {
    public double val;
    public int ts;
    public double fval;

    public VTF(double v, int t, double f) {
        val = v;
        ts = t;
        fval = f;
    }

    // static, so that it can be called as VTF.extend(...) in the queries below
    public static VTF extend(VT vt, double f) {
        return new VTF(vt.val, vt.ts, f);
    }

    public String toString() {
        return "{" + val + "," + ts + "," + fval + "}";
    }
}
```
The following program presents the StreamQL implementation of `smooth`:
```java
Q<VT, VTF> smooth = QL.sWindow5(
    (v, w, x, y, z) -> VTF.extend(x, 0.1*v.val + 0.2*w.val + 0.4*x.val + 0.2*y.val + 0.1*z.val)
);
```
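The idea is that, for a sample `x` at time `x.ts`, we consider the window (`v`, `w`, `x`, `y`, `z`) centered around `x` and compute a weighted average over the window as the smoothed value. The weights 0.1, 0.2, 0.4, 0.2, and 0.1 sum to 1, so a constant signal is left unchanged.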
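The smoothing stage can also be checked offline on an array. The plain-Java sketch below (the name `SmoothSketch` is hypothetical) computes the 5-point weighted average for every complete window. This follows the window semantics described earlier: the first and last two samples have no full centered window, so they produce no smoothed value.

```java
// Hypothetical plain-array sketch of the smoothing stage: a 5-point weighted
// moving average centered on each sample (weights 0.1, 0.2, 0.4, 0.2, 0.1).
final class SmoothSketch {
    private static final double[] WEIGHTS = {0.1, 0.2, 0.4, 0.2, 0.1};

    // Returns one smoothed value per complete window, i.e. for the samples
    // raw[2] .. raw[raw.length - 3].
    static double[] smooth(double[] raw) {
        int count = Math.max(0, raw.length - WEIGHTS.length + 1);
        double[] out = new double[count];
        for (int i = 0; i < count; i++) {
            double acc = 0.0;
            for (int j = 0; j < WEIGHTS.length; j++) {
                acc += WEIGHTS[j] * raw[i + j];
            }
            out[i] = acc; // smoothed value for the center sample raw[i + 2]
        }
        return out;
    }
}
```

A single spike of height 10 surrounded by zeros is spread into 1.0, 2.0, 4.0, 2.0, 1.0 (up to floating-point rounding), which illustrates the low-pass effect.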
The differentiation query `deriv` of type `Q<VTF,VTFD>` calculates discrete derivatives by taking the difference of successive smoothed values. It is implemented as:
```java
Q<VTF, VTFD> deriv = QL.sWindow2((x, y) -> VTFD.extend(y, y.fval - x.fval));
```
The record type `VTFD` extends `VTF` with an additional component `dval` for storing the derivative:
```java
public class VTFD {
    public double val;
    public int ts;
    public double fval;
    public double dval;

    public VTFD(double v, int t, double f, double d) {
        val = v;
        ts = t;
        fval = f;
        dval = d;
    }

    public static VTFD extend(VTF vtf, double d) {
        return new VTFD(vtf.val, vtf.ts, vtf.fval, d);
    }

    public String toString() {
        return "{" + val + "," + ts + "," + fval + "," + dval + "}";
    }
}
```
In the detection step, detecting the first peak involves searching for the first time point at which `dval` exceeds the threshold `hThred`. The signal interval from this point until the time point at which `dval` falls below the threshold `lThred` contains the first peak. The signal in this interval is streamed to the `argmax` query (see below), which finds the data item with the highest value in the raw, unfiltered signal. This process is repeated indefinitely in order to detect all peaks:
```java
Q<VTFD, VTFD> start = QL.search(x -> x.dval > hThred);
Q<VTFD, VTFD> take = QL.takeUntil(x -> x.dval < lThred);
// keep the item with the larger raw value (the earlier one on ties)
Q<VTFD, VTFD> argmax = QL.reduce((x, y) -> (y.val > x.val) ? y : x);
Q<VTFD, VTFD> detect = QL.iterate(QL.pipeline(QL.seq(start, take), argmax));
```
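The control flow of `detect` (search, then takeUntil, then argmax, repeated) can be traced on plain arrays. The sketch below is a hypothetical offline model (`DetectSketch` is an illustrative name). It takes raw values and derivatives as arrays and returns peak indices. Like `seq(start, take)`, it includes the item found by the search in the interval. Like the `argmax` reducer, it keeps the earlier item on ties. An interval that is still open when the input ends is simply dropped in this sketch; how StreamQL treats that case is not covered here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical offline model of detect = iterate(pipeline(seq(start, take), argmax)).
final class DetectSketch {
    // val[i] is the raw sample, dval[i] its derivative (arrays of equal length).
    // Returns the indices of the detected peaks.
    static List<Integer> detect(double[] val, double[] dval, double hThred, double lThred) {
        List<Integer> peaks = new ArrayList<>();
        int i = 0;
        while (i < dval.length) {
            // start = search(d > hThred): skip items until the threshold is exceeded
            while (i < dval.length && !(dval[i] > hThred)) {
                i++;
            }
            if (i == dval.length) {
                break; // no further rising edge
            }
            // the item found by search is the first item of the interval
            int best = i;
            int j = i + 1;
            boolean closed = false;
            // take = takeUntil(d < lThred), combined with argmax over raw values
            for (; j < dval.length; j++) {
                if (val[j] > val[best]) {
                    best = j; // strict comparison keeps the earlier item on ties
                }
                if (dval[j] < lThred) {
                    closed = true;
                    break;
                }
            }
            if (!closed) {
                break; // interval still open at the end of the input: dropped here
            }
            peaks.add(best);
            i = j + 1; // iterate: restart the search after the interval
        }
        return peaks;
    }
}
```

For example, take the raw values `{0, 1, 3, 5, 4, 2, 0, 0, 2, 6, 3, 0}`, their backward differences as derivatives, `hThred = 1.5`, and `lThred = -1.5`. The sketch then reports peaks at indices 3 and 9.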
Finally, we present the complete StreamQL program for this case study:
```java
SigGen src = new SigGen();
// create the sink for detected peaks (connect expects a Sink)
Sink<VTFD> sink = ...;
// initialize the thresholds
final double hThred = ...;
final double lThred = ...;
// smooth the signal to eliminate high-frequency noise
Q<VT, VTF> smooth = QL.sWindow5(
    (v, w, x, y, z) -> VTF.extend(x, 0.1*v.val + 0.2*w.val + 0.4*x.val + 0.2*y.val + 0.1*z.val)
);
// take the derivative of the smoothed signal to calculate the slope
Q<VTF, VTFD> deriv = QL.sWindow2((x, y) -> VTFD.extend(y, y.fval - x.fval));
// find the peaks
Q<VTFD, VTFD> start = QL.search(x -> x.dval > hThred);
Q<VTFD, VTFD> take = QL.takeUntil(x -> x.dval < lThred);
Q<VTFD, VTFD> argmax = QL.reduce((x, y) -> (y.val > x.val) ? y : x);
Q<VTFD, VTFD> getOnePeak = QL.pipeline(QL.seq(start, take), argmax);
Q<VTFD, VTFD> detect = QL.iterate(getOnePeak);
// compose the 3-step pipeline
Q<VT, VTFD> findPeak = QL.pipeline(smooth, deriv, detect);
// execution
Algo<VT, VTFD> algo = findPeak.eval();
algo.init();
algo.connect(sink);
VT vt = src.getVT();
while (vt != null) {
    algo.next(vt);
    vt = src.getVT();
}
algo.end();
```
This tutorial used several examples to show how to program streaming computations with StreamQL's Java library. It does not cover all the streaming constructs in the library; more details can be found in the StreamQL API documentation.