package streamql.algo;

/* loaded from: input_file:streamql/algo/AlgoTumblingWindow.class */
/**
 * Tumbling-window combinator: cuts the input stream into consecutive, non-overlapping
 * windows of {@code size} items and runs the wrapped sub-query afresh on each window.
 *
 * <p>On the first item of every window the sub-query is (re)connected to an internal
 * forwarding sink and re-initialised; right after the last item of the window the
 * sub-query's {@code end()} is invoked. Every value the sub-query emits is forwarded to
 * the downstream sink, while the sub-query's own {@code end()} signal is swallowed so
 * that only the end of the outer stream terminates the downstream sink.
 *
 * <p>A window that is still incomplete when {@link #end()} is called is not closed:
 * the sub-query never receives {@code end()} for it.
 *
 * @param <A> type of the items consumed by this algorithm and by the sub-query
 * @param <B> type of the items emitted by the sub-query and forwarded downstream
 */
public class AlgoTumblingWindow<A, B> extends Algo<A, B> {
    /** Number of input items per window. */
    private final int size;
    /** Query evaluated independently on each window. */
    private final Algo<A, B> subQuery;
    /** Downstream sink; {@code null} until {@link #connect(Sink)} has been called. */
    private Sink<B> sink;
    /** Forwarding sink handed to the sub-query; {@code null} until {@link #connect(Sink)}. */
    private Sink<B> intermediate;
    /** Number of items already consumed in the current window (0 = at a window boundary). */
    private int w;

    /**
     * Creates a tumbling window of the given size around a sub-query.
     *
     * <p>NOTE: negative sizes are not rejected, to stay compatible with existing callers.
     * Because the window counter is never negative, the remainder operation in
     * {@link #next(Object)} makes them behave like their magnitude.
     *
     * @param i    number of items per window; must not be zero
     * @param algo sub-query to run on every window
     * @throws IllegalArgumentException if {@code i} is zero
     */
    public AlgoTumblingWindow(int i, Algo<A, B> algo) {
        // Fail fast: a zero size would otherwise surface as "ArithmeticException: / by zero"
        // inside next(), after the first item has already been pushed into the sub-query.
        if (i == 0) {
            throw new IllegalArgumentException("window size must not be zero");
        }
        this.size = i;
        this.subQuery = algo;
    }

    /**
     * Registers the downstream sink and builds the internal sink that relays the
     * sub-query's output to it.
     *
     * @param sink downstream sink that receives every value emitted by the sub-query
     */
    @Override
    public void connect(final Sink<B> sink) {
        this.sink = sink;
        this.intermediate = new Sink<B>() {
            @Override
            public void next(B b) {
                sink.next(b);
            }

            @Override
            public void end() {
                // Intentionally empty: the end of a single window must not end the
                // downstream sink; that happens only in AlgoTumblingWindow.end().
            }
        };
    }

    /** Resets the position inside the current window to the window boundary. */
    @Override
    public void init() {
        this.w = 0;
    }

    /**
     * Feeds one item into the current window, opening the window first if the item is
     * its first element and closing it afterwards if the item is its last element.
     *
     * @param a the next input item
     */
    @Override
    public void next(A a) {
        if (this.w == 0) {
            // Window boundary: start the sub-query from scratch for the new window.
            this.subQuery.connect(this.intermediate);
            this.subQuery.init();
        }
        this.subQuery.next(a);
        this.w = (this.w + 1) % this.size;
        if (this.w == 0) {
            // Counter wrapped around: the window is full, let the sub-query finish it.
            this.subQuery.end();
        }
    }

    /**
     * Signals the end of the input stream to the downstream sink. The sub-query is not
     * notified, so an incomplete trailing window is left unfinished. Requires that
     * {@link #connect(Sink)} has been called beforehand.
     */
    @Override
    public void end() {
        this.sink.end();
    }
}
