package streamql.algo.join;

import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import streamql.algo.Algo;
import streamql.algo.Sink;
import utils.lambda.Func2;
import utils.structures.Or;
import utils.structures.Punct;
import utils.structures.TTLed;
import utils.structures.Timed;

/* loaded from: input_file:streamql/algo/join/AlgoJoin.class */
public class AlgoJoin<A, B, C> extends Algo<Timed<Or<A, B>>, Timed<C>> {
    private final Func2<A, B, C> op;
    private Hashtable<TTLed<A>, Integer> leftBuf;
    private Hashtable<TTLed<B>, Integer> rightBuf;
    private List<TTLed<A>> leftQueue;
    private List<TTLed<B>> rightQueue;
    private long currentTime;
    private Sink<Timed<C>> sink;
    static final /* synthetic */ boolean $assertionsDisabled;

    public AlgoJoin(Func2<A, B, C> func2) {
        this.op = func2;
    }

    @Override // streamql.algo.Algo
    public void connect(Sink<Timed<C>> sink) {
        this.sink = sink;
    }

    @Override // streamql.algo.Algo
    public void init() {
        this.leftBuf = new Hashtable<>();
        this.rightBuf = new Hashtable<>();
        this.leftQueue = new ArrayList();
        this.rightQueue = new ArrayList();
        this.currentTime = 0L;
    }

    @Override // streamql.algo.Sink
    public void next(Timed<Or<A, B>> timed) {
        if (timed instanceof Punct) {
            this.currentTime += ((Punct) timed).getDt();
            int size = this.leftQueue.size();
            int i = 0;
            while (i < size) {
                TTLed<A> tTLed = this.leftQueue.get(i);
                if (tTLed.getTtl() > this.currentTime) {
                    i++;
                } else {
                    if (!$assertionsDisabled && !this.leftBuf.containsKey(tTLed)) {
                        throw new AssertionError();
                    }
                    int intValue = this.leftBuf.get(tTLed).intValue();
                    if (intValue == 1) {
                        this.leftBuf.remove(tTLed);
                    } else {
                        this.leftBuf.put(tTLed, Integer.valueOf(intValue - 1));
                    }
                    this.leftQueue.remove(i);
                    size--;
                }
            }
            int size2 = this.rightQueue.size();
            int i2 = 0;
            while (i2 < size2) {
                TTLed<B> tTLed2 = this.rightQueue.get(i2);
                if (tTLed2.getTtl() > this.currentTime) {
                    i2++;
                } else {
                    if (!$assertionsDisabled && !this.rightBuf.containsKey(tTLed2)) {
                        throw new AssertionError();
                    }
                    int intValue2 = this.rightBuf.get(tTLed2).intValue();
                    if (intValue2 == 1) {
                        this.rightBuf.remove(tTLed2);
                    } else {
                        this.rightBuf.put(tTLed2, Integer.valueOf(intValue2 - 1));
                    }
                    this.rightQueue.remove(i2);
                    size2--;
                }
            }
            this.sink.next((Punct) timed);
            return;
        }
        if (!$assertionsDisabled && !(timed instanceof TTLed)) {
            throw new AssertionError();
        }
        long ttl = ((TTLed) timed).getTtl();
        if (ttl <= 0) {
            System.out.println("Warning Join: the ttl of item: " + timed + " is set to be <= 0: " + ttl);
            return;
        }
        Or or = (Or) ((TTLed) timed).getVal();
        if (or.isLeft()) {
            TTLed<A> tTLed3 = new TTLed<>(or.getLeft(), this.currentTime + ttl);
            this.leftQueue.add(tTLed3);
            if (this.leftBuf.containsKey(tTLed3)) {
                this.leftBuf.put(tTLed3, Integer.valueOf(this.leftBuf.get(tTLed3).intValue() + 1));
            } else {
                this.leftBuf.put(tTLed3, 1);
            }
            for (TTLed<B> tTLed4 : this.rightBuf.keySet()) {
                int intValue3 = this.rightBuf.get(tTLed4).intValue();
                if (!$assertionsDisabled && intValue3 <= 0) {
                    throw new AssertionError();
                }
                long min = Long.min(ttl, tTLed4.getTtl() - this.currentTime);
                C call = this.op.call(tTLed3.getVal(), tTLed4.getVal());
                for (int i3 = 0; i3 < intValue3; i3++) {
                    this.sink.next(new TTLed(call, min));
                }
            }
            return;
        }
        TTLed<B> tTLed5 = new TTLed<>(or.getRight(), this.currentTime + ttl);
        this.rightQueue.add(tTLed5);
        if (this.rightBuf.containsKey(tTLed5)) {
            this.rightBuf.put(tTLed5, Integer.valueOf(this.rightBuf.get(tTLed5).intValue() + 1));
        } else {
            this.rightBuf.put(tTLed5, 1);
        }
        for (TTLed<A> tTLed6 : this.leftBuf.keySet()) {
            int intValue4 = this.leftBuf.get(tTLed6).intValue();
            if (!$assertionsDisabled && intValue4 <= 0) {
                throw new AssertionError();
            }
            long min2 = Long.min(tTLed6.getTtl() - this.currentTime, ttl);
            C call2 = this.op.call(tTLed6.getVal(), tTLed5.getVal());
            for (int i4 = 0; i4 < intValue4; i4++) {
                this.sink.next(new TTLed(call2, min2));
            }
        }
    }

    @Override // streamql.algo.Sink
    public void end() {
        this.sink.end();
    }

    static {
        $assertionsDisabled = !AlgoJoin.class.desiredAssertionStatus();
    }
}
