package streamql.algo.join;

import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import streamql.algo.Algo;
import streamql.algo.Sink;
import utils.lambda.Func1;
import utils.lambda.Func2;
import utils.structures.Or;
import utils.structures.Punct;
import utils.structures.TTLed;
import utils.structures.Timed;

/* loaded from: input_file:streamql/algo/join/AlgoEquiJoin.class */
/**
 * Streaming equi-join over a single merged input of left ({@code A}) and right ({@code B}) items.
 *
 * <p>Each incoming {@link TTLed} item carries a relative time-to-live. On arrival the item is
 * buffered under its join key with an absolute expiry of {@code currentTime + ttl} and is joined
 * against every still-buffered item of the opposite side that has the same key. A {@link Punct}
 * advances {@code currentTime} by its {@code dt}, evicts every buffered item whose expiry is
 * {@code <= currentTime}, and is then forwarded downstream.
 *
 * <p>Buffers map {@code key -> (item -> multiplicity)}, so items that compare equal are stored once
 * with a count and produce one output per stored occurrence.
 *
 * @param <A> type of left-side values
 * @param <B> type of right-side values
 * @param <C> type of joined output values
 * @param <K> type of the join key
 */
public class AlgoEquiJoin<A, B, C, K> extends Algo<Timed<Or<A, B>>, Timed<C>> {
    private final Func1<A, K> getKey1;
    private final Func1<B, K> getKey2;
    private final Func2<A, B, C> op;

    // Hashtable is kept on purpose: its iteration order determines the order in which join
    // results are emitted, so swapping the map implementation would reorder the output.
    private Hashtable<K, Hashtable<TTLed<A>, Integer>> leftBuf;
    private Hashtable<K, Hashtable<TTLed<B>, Integer>> rightBuf;

    // Every buffered occurrence (duplicates included), scanned on each punctuation for expiry.
    private List<TTLed<A>> leftQueue;
    private List<TTLed<B>> rightQueue;

    // Sum of the dt of all punctuations seen since init().
    private long currentTime;
    private Sink<Timed<C>> sink;

    /**
     * @param func1  extracts the join key from a left value
     * @param func12 extracts the join key from a right value
     * @param func2  combines a matching (left, right) pair into an output value
     */
    public AlgoEquiJoin(Func1<A, K> func1, Func1<B, K> func12, Func2<A, B, C> func2) {
        this.getKey1 = func1;
        this.getKey2 = func12;
        this.op = func2;
    }

    /** Sets the downstream sink that receives join results and forwarded punctuations. */
    @Override
    public void connect(Sink<Timed<C>> sink) {
        this.sink = sink;
    }

    /** Resets all buffers and the clock; must be called before the first {@link #next}. */
    @Override
    public void init() {
        this.leftBuf = new Hashtable<>();
        this.rightBuf = new Hashtable<>();
        this.leftQueue = new ArrayList<>();
        this.rightQueue = new ArrayList<>();
        this.currentTime = 0L;
    }

    /**
     * Processes one input element.
     *
     * <p>A {@link Punct} advances time, evicts expired items and is forwarded to the sink. Any
     * other element is expected to be a {@link TTLed}; if its ttl is {@code <= 0} a warning is
     * printed and the element is dropped, otherwise it is buffered and joined against the
     * opposite side.
     *
     * @param timed the punctuation or TTL-carrying item to process
     */
    @Override
    public void next(Timed<Or<A, B>> timed) {
        if (timed instanceof Punct) {
            onPunctuation(timed);
            return;
        }
        assert timed instanceof TTLed;
        TTLed<Or<A, B>> item = (TTLed<Or<A, B>>) timed;
        long ttl = item.getTtl();
        if (ttl <= 0) {
            System.out.println("Warning, Join: the ttl of item: " + timed + " is set to be <= 0: " + ttl);
            return;
        }
        Or<A, B> value = item.getVal();
        if (value.isLeft()) {
            onLeft(value.getLeft(), ttl);
        } else {
            onRight(value.getRight(), ttl);
        }
    }

    /** Signals end of stream to the downstream sink. */
    @Override
    public void end() {
        this.sink.end();
    }

    /** Advances the clock by the punctuation's dt, evicts expired items, forwards the punctuation. */
    // The punctuation carries no payload, so it is forwarded through a raw Punct reference exactly
    // as the input delivered it; the suppression is limited to this helper.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void onPunctuation(Timed<Or<A, B>> timed) {
        Punct punct = (Punct) timed;
        this.currentTime += punct.getDt();
        this.leftQueue = expire(this.leftQueue, this.leftBuf, this.getKey1);
        this.rightQueue = expire(this.rightQueue, this.rightBuf, this.getKey2);
        this.sink.next(punct);
    }

    /** Buffers a left value and emits one result per buffered right occurrence with the same key. */
    private void onLeft(A value, long ttl) {
        TTLed<A> entry = new TTLed<>(value, this.currentTime + ttl);
        this.leftQueue.add(entry);
        K key = this.getKey1.call(value);
        store(this.leftBuf, key, entry);

        Hashtable<TTLed<B>, Integer> partners = this.rightBuf.get(key);
        if (partners == null) {
            return;
        }
        for (TTLed<B> partner : partners.keySet()) {
            int copies = partners.get(partner);
            assert copies > 0;
            // The joined item lives only as long as the shorter-lived of its two inputs.
            long remaining = Long.min(ttl, partner.getTtl() - this.currentTime);
            C joined = this.op.call(entry.getVal(), partner.getVal());
            emit(joined, remaining, copies);
        }
    }

    /** Buffers a right value and emits one result per buffered left occurrence with the same key. */
    private void onRight(B value, long ttl) {
        TTLed<B> entry = new TTLed<>(value, this.currentTime + ttl);
        this.rightQueue.add(entry);
        K key = this.getKey2.call(value);
        store(this.rightBuf, key, entry);

        Hashtable<TTLed<A>, Integer> partners = this.leftBuf.get(key);
        if (partners == null) {
            return;
        }
        for (TTLed<A> partner : partners.keySet()) {
            int copies = partners.get(partner);
            assert copies > 0;
            // The joined item lives only as long as the shorter-lived of its two inputs.
            long remaining = Long.min(partner.getTtl() - this.currentTime, ttl);
            C joined = this.op.call(partner.getVal(), entry.getVal());
            emit(joined, remaining, copies);
        }
    }

    /** Sends {@code copies} fresh TTLed items wrapping {@code joined} to the sink. */
    private void emit(C joined, long remaining, int copies) {
        for (int i = 0; i < copies; i++) {
            this.sink.next(new TTLed<>(joined, remaining));
        }
    }

    /** Records one more occurrence of {@code entry} under {@code key}, creating the bucket if needed. */
    private <T> void store(Hashtable<K, Hashtable<TTLed<T>, Integer>> buf, K key, TTLed<T> entry) {
        buf.computeIfAbsent(key, k -> new Hashtable<>()).merge(entry, 1, Integer::sum);
    }

    /**
     * Evicts every queued item whose absolute expiry is {@code <= currentTime} from {@code buf}
     * and returns the list of surviving items in their original order.
     */
    private <T> List<TTLed<T>> expire(
            List<TTLed<T>> queue,
            Hashtable<K, Hashtable<TTLed<T>, Integer>> buf,
            Func1<T, K> keyOf) {
        // Rebuilding the list in one pass avoids the quadratic cost of ArrayList.remove(index).
        List<TTLed<T>> survivors = new ArrayList<>(queue.size());
        for (TTLed<T> entry : queue) {
            if (entry.getTtl() > this.currentTime) {
                survivors.add(entry);
            } else {
                release(buf, keyOf.call(entry.getVal()), entry);
            }
        }
        return survivors;
    }

    /** Removes one occurrence of {@code entry} from the bucket of {@code key}. */
    private <T> void release(Hashtable<K, Hashtable<TTLed<T>, Integer>> buf, K key, TTLed<T> entry) {
        Hashtable<TTLed<T>, Integer> bucket = buf.get(key);
        assert bucket != null;
        Integer copies = bucket.get(entry);
        assert copies != null;
        if (copies == 1) {
            bucket.remove(entry);
            // Drop the bucket once it is empty; otherwise every key ever seen would stay in the
            // outer table for the lifetime of the stream.
            if (bucket.isEmpty()) {
                buf.remove(key);
            }
        } else {
            bucket.put(entry, copies - 1);
        }
    }
}
