package streamql.algo.temporal;

import streamql.algo.Algo;
import streamql.algo.Sink;
import utils.lambda.Func1;

/* loaded from: input_file:streamql/algo/temporal/AlgoSkipByTime.class */
/**
 * Stream operator that drops the leading items of a stream until a given amount of
 * time has elapsed, then forwards every subsequent item unchanged.
 *
 * <p>Time is not wall-clock time: it is read from each item through the
 * {@code getTime} extractor. The timestamp of the first item seen after
 * {@link #init()} is the reference point; the first item whose timestamp is at
 * least {@code period} past that reference is forwarded, and so is every item
 * after it (without inspecting their timestamps any further).
 *
 * @param <A> type of the items flowing through the operator
 */
public class AlgoSkipByTime<A> extends Algo<A, A> {
    /** Length of the skipped prefix, in the same unit as the values returned by {@link #getTime}. */
    private final long period;
    /** Extracts the timestamp of an item. */
    private final Func1<A, Long> getTime;
    /** Downstream consumer that receives the non-skipped items. */
    private Sink<A> sink;
    /** Timestamp of the first item seen since the last {@link #init()}. */
    private long startT;
    /** Timestamp of the most recently inspected item. */
    private long currentT;
    /** True until the first item after {@link #init()} has been seen. */
    private boolean isFirst;
    /** True once the skip period has elapsed; from then on items pass straight through. */
    private boolean allSkipped;

    /**
     * Creates the operator.
     *
     * @param period  how much time to skip, measured from the first item's timestamp
     * @param getTime function returning the timestamp of an item
     */
    public AlgoSkipByTime(long period, Func1<A, Long> getTime) {
        this.period = period;
        this.getTime = getTime;
    }

    /**
     * Sets the downstream sink that will receive the forwarded items.
     *
     * @param sink the downstream consumer
     */
    @Override
    public void connect(Sink<A> sink) {
        this.sink = sink;
    }

    /** Resets the operator so that the next item becomes the new time reference. */
    @Override
    public void init() {
        this.isFirst = true;
        this.allSkipped = false;
    }

    /**
     * Processes one item: drops it while the skip period has not yet elapsed,
     * forwards it to the sink otherwise.
     *
     * <p>When assertions are enabled, timestamps of the inspected items are checked
     * to be non-decreasing.
     *
     * @param item the incoming item
     */
    @Override
    public void next(A item) {
        if (this.allSkipped) {
            // Skip period is over: pass-through, timestamps are no longer inspected.
            this.sink.next(item);
            return;
        }

        // Extract the timestamp once per item.
        final long time = this.getTime.call(item).longValue();
        if (this.isFirst) {
            this.startT = time;
            this.isFirst = false;
        } else {
            assert time >= this.currentT
                    : "timestamps must be non-decreasing: got " + time + " after " + this.currentT;
        }
        this.currentT = time;

        // The first item is checked as well, so that a non-positive period skips
        // nothing; for a positive period its elapsed time is 0 and it is dropped.
        if (this.currentT - this.startT >= this.period) {
            this.allSkipped = true;
            this.sink.next(item);
        }
    }

    /**
     * Signals the end of the input stream.
     *
     * <p>The end signal is forwarded downstream only if at least one item has been
     * emitted, i.e. the skip period elapsed before the input ended.
     */
    @Override
    public void end() {
        // NOTE(review): when the input ends before the skip period has elapsed, the
        // sink never receives end(). Kept as-is because downstream operators may rely
        // on it -- confirm whether end() should be propagated unconditionally.
        if (this.allSkipped) {
            this.sink.end();
        }
    }
}
