package org.apache.hive.druid.io.druid.segment.realtime;

import com.google.inject.Inject;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hive.druid.com.google.common.annotations.VisibleForTesting;
import org.apache.hive.druid.com.google.common.base.Function;
import org.apache.hive.druid.com.google.common.base.Preconditions;
import org.apache.hive.druid.com.google.common.base.Supplier;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.Iterables;
import org.apache.hive.druid.com.google.common.collect.Maps;
import org.apache.hive.druid.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hive.druid.com.metamx.emitter.EmittingLogger;
import org.apache.hive.druid.io.druid.concurrent.Execs;
import org.apache.hive.druid.io.druid.data.input.Committer;
import org.apache.hive.druid.io.druid.data.input.Firehose;
import org.apache.hive.druid.io.druid.data.input.FirehoseV2;
import org.apache.hive.druid.io.druid.data.input.InputRow;
import org.apache.hive.druid.io.druid.java.util.common.ISE;
import org.apache.hive.druid.io.druid.java.util.common.io.Closer;
import org.apache.hive.druid.io.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.hive.druid.io.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.hive.druid.io.druid.query.FinalizeResultsQueryRunner;
import org.apache.hive.druid.io.druid.query.NoopQueryRunner;
import org.apache.hive.druid.io.druid.query.Query;
import org.apache.hive.druid.io.druid.query.QueryRunner;
import org.apache.hive.druid.io.druid.query.QueryRunnerFactory;
import org.apache.hive.druid.io.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.hive.druid.io.druid.query.QuerySegmentWalker;
import org.apache.hive.druid.io.druid.query.SegmentDescriptor;
import org.apache.hive.druid.io.druid.query.spec.SpecificSegmentSpec;
import org.apache.hive.druid.io.druid.segment.indexing.DataSchema;
import org.apache.hive.druid.io.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.hive.druid.io.druid.segment.realtime.plumber.Committers;
import org.apache.hive.druid.io.druid.segment.realtime.plumber.Plumber;
import org.apache.hive.druid.io.druid.segment.realtime.plumber.Plumbers;
import org.apache.hive.druid.io.druid.server.coordination.DataSegmentServerAnnouncer;
import org.joda.time.Interval;

/* loaded from: input_file:org/apache/hive/druid/io/druid/segment/realtime/RealtimeManager.class */
public class RealtimeManager implements QuerySegmentWalker {
    private static final EmittingLogger log = new EmittingLogger(RealtimeManager.class);

    /** Fire departments handed to the constructor; one {@link FireChief} is created per entry in {@link #start()}. */
    private final List<FireDepartment> fireDepartments;

    /** Used to look up the query runner factory (and its toolchest) for incoming queries. */
    private final QueryRunnerFactoryConglomerate conglomerate;

    /** Announced in {@link #start()} and unannounced in {@link #stop()}. */
    private final DataSegmentServerAnnouncer serverAnnouncer;

    /** Chiefs keyed by data source name, then by shard partition number. */
    private final Map<String, Map<Integer, FireChief>> chiefs;

    /** Executor running the chiefs; created in {@link #start()}, so it is {@code null} before that. */
    private ExecutorService fireChiefExecutor;

    /**
     * Set to {@code true} by {@link #stop()} and polled by the chief threads inside their ingestion loops.
     * Declared {@code volatile} because it is written on the lifecycle thread and read on the executor
     * threads; without it the chiefs are not guaranteed to ever observe the write.
     */
    private volatile boolean stopping;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hive/druid/io/druid/segment/realtime/RealtimeManager$FireChief.class */
    public class FireChief implements Runnable {
        // The department this chief drives: source of the firehose, the plumber, the schema and the config.
        private final FireDepartment fireDepartment;
        // Metrics object obtained from the fire department in the constructor; updated while rows are ingested.
        private final FireDepartmentMetrics metrics;
        // Tuning config obtained from the fire department in the constructor.
        private final RealtimeTuningConfig config;
        // Used by getQueryRunner() to find the factory whose toolchest finalizes results.
        private final QueryRunnerFactoryConglomerate conglomerate;
        // Assigned by initPlumber(), which run() calls first; remains null until then.
        private Plumber plumber;

        /**
         * Creates a chief for one fire department.
         *
         * @param fireDepartment department to run; its tuning config and metrics are captured here
         * @param queryRunnerFactoryConglomerate conglomerate used later when building query runners
         */
        FireChief(FireDepartment fireDepartment, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate) {
            this.fireDepartment = fireDepartment;
            this.conglomerate = queryRunnerFactoryConglomerate;
            // Both lookups go through the field that was just assigned; it is the same object as the parameter.
            this.config = this.fireDepartment.getTuningConfig();
            this.metrics = this.fireDepartment.getMetrics();
        }

        /**
         * Asks the fire department for a {@link Firehose}.
         *
         * @return the connected firehose
         * @throws RuntimeException (via {@code Throwables.propagate}) when connecting fails with an {@link IOException}
         */
        private Firehose initFirehose() {
            log.info("Calling the FireDepartment and getting a Firehose.", new Object[0]);
            final Firehose connected;
            try {
                connected = fireDepartment.connect();
            } catch (IOException ioe) {
                throw Throwables.propagate(ioe);
            }
            return connected;
        }

        /**
         * Asks the fire department for a {@link FirehoseV2}.
         *
         * @param obj value handed through to {@code FireDepartment.connect(Object)}; run() passes the result of
         *            {@code plumber.startJob()}
         * @return the connected firehose
         * @throws RuntimeException (via {@code Throwables.propagate}) when connecting fails with an {@link IOException}
         */
        private FirehoseV2 initFirehoseV2(Object obj) {
            log.info("Calling the FireDepartment and getting a FirehoseV2.", new Object[0]);
            final FirehoseV2 connected;
            try {
                connected = fireDepartment.connect(obj);
            } catch (IOException ioe) {
                throw Throwables.propagate(ioe);
            }
            return connected;
        }

        /** Logs, then stores the plumber found by the fire department in {@code plumber}. */
        private void initPlumber() {
            log.info("Someone get us a plumber!", new Object[0]);
            final Plumber found = fireDepartment.findPlumber();
            plumber = found;
        }

        /**
         * Test hook exposing the current plumber.
         *
         * @return the plumber, or {@code null} if {@code initPlumber()} has not run yet
         */
        @VisibleForTesting
        Plumber getPlumber() {
            return plumber;
        }

        /**
         * @return the metrics object captured from the fire department at construction time
         */
        public FireDepartmentMetrics getMetrics() {
            return metrics;
        }

        /**
         * Runs this chief: obtains a plumber, starts its job, connects the appropriate firehose and feeds rows
         * to the plumber until the firehose is exhausted, the thread is interrupted, or the manager is stopping.
         *
         * <p>Resources are tracked in a {@link Closer}. Every failure is first handed to
         * {@code closer.rethrow(..)} and only then is the closer closed (in {@code finally}), so an exception
         * thrown while closing is attached to the primary failure instead of replacing it. The plumber's
         * {@code finishJob()} is registered with the closer only when ingestion finished normally.
         *
         * @throws RuntimeException wrapping the failure for anything other than an unchecked exception or error
         */
        @Override // java.lang.Runnable
        public void run() {
            initPlumber();
            try {
                final Closer closer = Closer.create();
                try {
                    final Object metadata = this.plumber.startJob();
                    final boolean success;
                    if (this.fireDepartment.checkFirehoseV2()) {
                        final FirehoseV2 firehoseV2 = initFirehoseV2(metadata);
                        closer.register(firehoseV2);
                        success = runFirehoseV2(firehoseV2);
                    } else {
                        final Firehose firehose = initFirehose();
                        closer.register(firehose);
                        success = runFirehose(firehose);
                    }
                    if (success) {
                        // Only a fully processed firehose finishes the job; an aborted run just closes the firehose.
                        closer.register(() -> this.plumber.finishJob());
                    }
                } catch (InterruptedException e) {
                    RealtimeManager.log.warn("Interrupted while running a firehose", new Object[0]);
                    throw closer.rethrow(e);
                } catch (Exception e) {
                    RealtimeManager.log.makeAlert(e, "[%s] aborted realtime processing[%s]", e.getClass().getSimpleName(), this.fireDepartment.getDataSchema().getDataSource()).emit();
                    throw closer.rethrow(e);
                } catch (Error e) {
                    RealtimeManager.log.makeAlert(e, "Error aborted realtime processing[%s]", this.fireDepartment.getDataSchema().getDataSource()).emit();
                    throw closer.rethrow(e);
                } finally {
                    // Must run after rethrow() has recorded the primary failure (if any).
                    closer.close();
                }
            } catch (IOException e) {
                throw Throwables.propagate(e);
            }
        }

        /**
         * Starts the given firehose and pushes its rows into the plumber until {@code advance()} reports that
         * there is nothing more to read.
         *
         * <p>Per row: a {@code null} row is counted as unparseable; a negative result from
         * {@code plumber.add} is counted as thrown away; anything else is counted as processed. An exception
         * while handling a row raises an alert (with the offending row attached) and ingestion continues. An
         * exception from {@code advance()} is counted as an unparseable row and the loop continues.
         *
         * @param firehoseV2 firehose to drain
         * @return {@code true} if the firehose was drained; {@code false} if the loop stopped early because the
         *         thread was interrupted or the manager is stopping
         * @throws Exception if starting the firehose or creating the committer supplier fails
         */
        private boolean runFirehoseV2(FirehoseV2 firehoseV2) throws Exception {
            firehoseV2.start();
            RealtimeManager.log.info("FirehoseV2 started", new Object[0]);
            final Supplier<Committer> committerSupplier = Committers.supplierFromFirehoseV2(firehoseV2);
            boolean haveRow = true;
            while (haveRow) {
                // Thread.interrupted() also clears the flag; the caller only sees the 'false' result.
                if (Thread.interrupted() || RealtimeManager.this.stopping) {
                    return false;
                }
                InputRow inputRow = null;
                try {
                    inputRow = firehoseV2.currRow();
                    if (inputRow == null) {
                        RealtimeManager.log.debug("thrown away null input row, considering unparseable", new Object[0]);
                        this.metrics.incrementUnparseable();
                    } else if (this.plumber.add(inputRow, committerSupplier) < 0) {
                        this.metrics.incrementThrownAway();
                        RealtimeManager.log.debug("Throwing away event[%s]", inputRow);
                    } else {
                        this.metrics.incrementProcessed();
                    }
                } catch (Exception e) {
                    // The builder does nothing until emit() is called; without it the alert was silently dropped.
                    RealtimeManager.log.makeAlert(e, "Unknown exception, Ignoring and continuing.", new Object[0])
                            .addData("inputRow", inputRow)
                            .emit();
                }
                try {
                    haveRow = firehoseV2.advance();
                } catch (Exception e) {
                    RealtimeManager.log.debug(e, "exception in firehose.advance(), considering unparseable row", new Object[0]);
                    this.metrics.incrementUnparseable();
                }
            }
            return true;
        }

        /**
         * Feeds rows from the given firehose to the plumber via {@code Plumbers.addNextRow} for as long as the
         * firehose reports more data.
         *
         * @param firehose firehose to drain
         * @return {@code true} once the firehose has no more rows; {@code false} if the loop stopped early
         *         because the thread was interrupted or the manager is stopping
         */
        private boolean runFirehose(Firehose firehose) {
            final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);
            for (;;) {
                if (!firehose.hasMore()) {
                    return true;
                }
                final boolean interrupted = Thread.interrupted();
                if (interrupted || RealtimeManager.this.stopping) {
                    return false;
                }
                Plumbers.addNextRow(committerSupplier, firehose, plumber, config.isReportParseExceptions(), metrics);
            }
        }

        /**
         * Builds a runner for the query against this chief's plumber, wrapped so results are finalized with
         * the toolchest of the factory the conglomerate finds for the query.
         *
         * @param query query to run
         * @param <T> result type of the query
         * @return a finalizing runner around the plumber's runner
         */
        public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
            // The plumber's runner is obtained before the factory lookup, as in the single-expression form.
            final QueryRunner<T> plumberRunner = plumber.getQueryRunner(query);
            final QueryRunnerFactory factory = conglomerate.findFactory(query);
            return new FinalizeResultsQueryRunner(plumberRunner, factory.getToolchest());
        }
    }

    /**
     * Injection constructor; starts with an empty chief registry.
     *
     * @param list fire departments to run
     * @param queryRunnerFactoryConglomerate conglomerate used to resolve query runner factories
     * @param dataSegmentServerAnnouncer announcer toggled on start and stop
     */
    @Inject
    public RealtimeManager(List<FireDepartment> list, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, DataSegmentServerAnnouncer dataSegmentServerAnnouncer) {
        this(
            list,
            queryRunnerFactoryConglomerate,
            dataSegmentServerAnnouncer,
            Maps.<String, Map<Integer, FireChief>>newHashMap()
        );
    }

    /**
     * Constructor that additionally accepts a pre-populated chief registry.
     *
     * @param list fire departments to run
     * @param queryRunnerFactoryConglomerate conglomerate used to resolve query runner factories
     * @param dataSegmentServerAnnouncer announcer toggled on start and stop
     * @param map initial chiefs by data source and partition number; copied into a new map, and {@code null}
     *            is treated as empty
     */
    @VisibleForTesting
    RealtimeManager(List<FireDepartment> list, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, DataSegmentServerAnnouncer dataSegmentServerAnnouncer, Map<String, Map<Integer, FireChief>> map) {
        this.fireDepartments = list;
        this.conglomerate = queryRunnerFactoryConglomerate;
        this.serverAnnouncer = dataSegmentServerAnnouncer;
        if (map == null) {
            this.chiefs = Maps.newHashMap();
        } else {
            this.chiefs = Maps.newHashMap(map);
        }
    }

    /**
     * Test hook returning the registered chiefs of one data source.
     *
     * @param str data source name
     * @return chiefs keyed by partition number, or {@code null} when the data source is not registered
     */
    @VisibleForTesting
    Map<Integer, FireChief> getFireChiefs(String str) {
        final Map<Integer, FireChief> byPartition = chiefs.get(str);
        return byPartition;
    }

    /**
     * Announces the server, creates the chief executor sized to the number of fire departments, then for
     * each department creates a {@link FireChief}, registers it under its data source and shard partition
     * number, and submits it to the executor.
     *
     * @throws IOException declared for the announcer call
     */
    @LifecycleStart
    public void start() throws IOException {
        serverAnnouncer.announce();
        fireChiefExecutor = Execs.multiThreaded(fireDepartments.size(), "chief-%d");
        for (FireDepartment department : fireDepartments) {
            final DataSchema schema = department.getDataSchema();
            final FireChief chief = new FireChief(department, conglomerate);
            final Map<Integer, FireChief> byPartition =
                chiefs.computeIfAbsent(schema.getDataSource(), dataSource -> new HashMap<>());
            final int partitionNum = department.getTuningConfig().getShardSpec().getPartitionNum();
            byPartition.put(partitionNum, chief);
            fireChiefExecutor.submit(chief);
        }
    }

    /**
     * Signals the chiefs to stop, interrupts them via {@code shutdownNow()}, waits up to ten seconds for the
     * executor to terminate and then unannounces the server.
     *
     * @throws IllegalStateException if the executor does not terminate within the timeout
     * @throws ISE if the calling thread is interrupted while waiting; the interrupt flag is restored first
     */
    @LifecycleStop
    public void stop() {
        this.stopping = true;
        try {
            if (this.fireChiefExecutor != null) {
                this.fireChiefExecutor.shutdownNow();
                // The message names the executor actually being awaited (it previously said "persistExecutor").
                Preconditions.checkState(this.fireChiefExecutor.awaitTermination(10L, TimeUnit.SECONDS), "fireChiefExecutor not terminated");
            }
            this.serverAnnouncer.unannounce();
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new ISE(e, "Failed to shutdown fireChiefExecutor during stop()", new Object[0]);
        }
    }

    /**
     * Combines the metrics of all chiefs registered for a data source.
     *
     * <p>The result starts as a snapshot of the first chief's metrics; the metrics of every further chief
     * are merged into that snapshot.
     *
     * @param str data source name
     * @return the combined metrics, or {@code null} when the data source is unknown or has no chiefs
     */
    public FireDepartmentMetrics getMetrics(String str) {
        final Map<Integer, FireChief> byPartition = chiefs.get(str);
        if (byPartition == null) {
            return null;
        }
        FireDepartmentMetrics combined = null;
        for (FireChief chief : byPartition.values()) {
            if (combined != null) {
                combined.merge(chief.getMetrics());
                continue;
            }
            combined = chief.getMetrics().snapshot();
        }
        return combined;
    }

    /**
     * Builds a runner that merges the results of every chief registered for the query's single data source.
     * The interval argument is not consulted.
     *
     * @param query query to run; its data source must resolve to exactly one name
     * @param iterable requested intervals (unused here)
     * @param <T> result type of the query
     * @return a merging runner over all chiefs of the data source, or a no-op runner when none are registered
     */
    @Override // org.apache.hive.druid.io.druid.query.QuerySegmentWalker
    public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, Iterable<Interval> iterable) {
        final QueryRunnerFactory factory = conglomerate.findFactory(query);
        final Map<Integer, FireChief> byPartition = chiefs.get(Iterables.getOnlyElement(query.getDataSource().getNames()));
        if (byPartition == null) {
            return new NoopQueryRunner();
        }
        // Each chief contributes its own runner for the unmodified query.
        final Function<FireChief, QueryRunner<T>> toRunner = chief -> chief.getQueryRunner(query);
        return factory.getToolchest().mergeResults(
            factory.mergeRunners(
                MoreExecutors.sameThreadExecutor(),
                Iterables.transform(byPartition.values(), toRunner)
            )
        );
    }

    /**
     * Builds a runner that merges per-segment results for the query's single data source. Each descriptor
     * is routed to the chief registered under the descriptor's partition number, with the query narrowed to
     * that specific segment; descriptors without a matching chief get a no-op runner.
     *
     * @param query query to run; its data source must resolve to exactly one name
     * @param iterable descriptors of the segments to query
     * @param <T> result type of the query
     * @return a merging runner over the per-segment runners, or a no-op runner when the data source has no
     *         registered chiefs
     */
    @Override // org.apache.hive.druid.io.druid.query.QuerySegmentWalker
    public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, Iterable<SegmentDescriptor> iterable) {
        final QueryRunnerFactory factory = conglomerate.findFactory(query);
        final Map<Integer, FireChief> byPartition = chiefs.get(Iterables.getOnlyElement(query.getDataSource().getNames()));
        if (byPartition == null) {
            return new NoopQueryRunner();
        }
        final Function<SegmentDescriptor, QueryRunner<T>> toRunner = descriptor -> {
            final FireChief chief = byPartition.get(Integer.valueOf(descriptor.getPartitionNumber()));
            if (chief == null) {
                return new NoopQueryRunner();
            }
            return chief.getQueryRunner(query.withQuerySegmentSpec(new SpecificSegmentSpec(descriptor)));
        };
        return factory.getToolchest().mergeResults(
            factory.mergeRunners(
                MoreExecutors.sameThreadExecutor(),
                Iterables.transform(iterable, toRunner)
            )
        );
    }
}
