package streamql.algo;

import java.util.LinkedHashMap;
import java.util.Map;
import streamql.query.Q;
import utils.lambda.Func1;
import utils.lambda.Func2;

/* loaded from: input_file:streamql/algo/AlgoGroupByKey.class */
/**
 * Group-by-key stage: routes every incoming item to a sub-algorithm dedicated to
 * the item's key and forwards each sub-algorithm output downstream after pairing
 * it with the key that produced it.
 *
 * <p>A sub-algorithm is created lazily from {@code query} the first time a key is
 * seen. All sub-algorithms share one intermediate sink; {@code currentKey} records
 * which key is being driven at the moment so that sink can label outputs correctly.
 *
 * @param <A> type of the input items
 * @param <B> type of the values produced by the per-key sub-algorithms
 * @param <C> type of the items emitted downstream
 * @param <K> type of the grouping key
 */
public class AlgoGroupByKey<A, B, C, K> extends Algo<A, C> {
    /** Extracts the grouping key from an input item. */
    private final Func1<A, K> getKey;
    /** Evaluated once per distinct key to obtain that key's sub-algorithm. */
    private final Q<A, B> query;
    /** Merges a key and a sub-algorithm output into a downstream item. */
    private final Func2<K, B, C> combine;
    /** Sub-algorithms by key; a LinkedHashMap, so iteration follows first-seen key order. */
    private Map<K, Algo<A, B>> map;
    /** Downstream sink supplied through {@link #connect}. */
    private Sink<C> sink;
    /** Sink shared by all sub-algorithms; built in {@link #connect}. */
    private Sink<B> intermediate;
    /** Key of the sub-algorithm currently being driven (by {@code next} or {@code end}). */
    private K currentKey;

    /**
     * @param func1 key extractor applied to every input item
     * @param q     query evaluated to create the sub-algorithm of each new key
     * @param func2 combiner applied to (key, sub-algorithm output) pairs
     */
    public AlgoGroupByKey(Func1<A, K> func1, Q<A, B> q, Func2<K, B, C> func2) {
        this.getKey = func1;
        this.query = q;
        this.combine = func2;
    }

    /**
     * Stores the downstream sink and builds the intermediate sink handed to every
     * sub-algorithm.
     *
     * @param sink receiver of the combined items
     */
    @Override
    public void connect(final Sink<C> sink) {
        this.sink = sink;
        this.intermediate = new Sink<B>() {
            @Override
            public void next(B groupOutput) {
                // Label the output with whichever key is being driven right now.
                K key = AlgoGroupByKey.this.currentKey;
                C combined = AlgoGroupByKey.this.combine.call(key, groupOutput);
                sink.next(combined);
            }

            @Override
            public void end() {
                // Deliberately empty: the downstream sink is ended once, by the
                // enclosing algorithm's end(), not by each sub-algorithm.
            }
        };
    }

    /** Resets the state: starts with an empty key table and no current key. */
    @Override
    public void init() {
        this.currentKey = null;
        this.map = new LinkedHashMap<>();
    }

    /**
     * Dispatches {@code a} to the sub-algorithm of its key, creating, registering,
     * connecting and initialising that sub-algorithm first if the key is new.
     *
     * @param a the incoming item
     */
    @Override
    public void next(A a) {
        final K key = this.getKey.call(a);
        // Must be set before any sub-algorithm runs, since its outputs read it.
        this.currentKey = key;

        final Algo<A, B> known = this.map.get(key);
        if (known != null) {
            known.next(a);
            return;
        }

        final Algo<A, B> fresh = this.query.eval();
        this.map.put(key, fresh);
        fresh.connect(this.intermediate);
        fresh.init();
        fresh.next(a);
    }

    /**
     * Ends every sub-algorithm in map iteration order, making its key current
     * beforehand so outputs emitted during the flush are labelled correctly, and
     * then ends the downstream sink.
     */
    @Override
    public void end() {
        for (Map.Entry<K, Algo<A, B>> group : this.map.entrySet()) {
            final Algo<A, B> groupAlgo = group.getValue();
            this.currentKey = group.getKey();
            groupAlgo.end();
        }
        this.sink.end();
    }
}
