package org.apache.pig.backend.hadoop.executionengine.tez.runtime;

import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.pig.JVMReuseImpl;
import org.apache.pig.PigConstants;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.PigStatusReporter;
import org.apache.tez.common.TezUtils;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.api.KeyValueReader;

/* loaded from: input_file:org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.class */
public class PigProcessor extends AbstractLogicalIOProcessor {
    private static final Log LOG = LogFactory.getLog(PigProcessor.class);
    public static final String PLAN = "pig.exec.tez.plan";
    public static final String COMBINE_PLAN = "pig.exec.tez.combine.plan";
    public static final String ESTIMATE_PARALLELISM = "pig.exec.estimate.parallelism";
    public static final String ESTIMATED_NUM_PARALLELISM = "pig.exec.estimated.num.parallelism";
    public static final String SAMPLE_VERTEX = "pig.sampleVertex";
    public static final String SORT_VERTEX = "pig.sortVertex";
    private PhysicalPlan execPlan;
    private Set<MROutput> fileOutputs;
    private PhysicalOperator leaf;
    private Configuration conf;
    private PigHadoopLogger pigHadoopLogger;
    public static String sampleVertex;
    public static Map<String, Object> sampleMap;

    public PigProcessor(ProcessorContext processorContext) {
        super(processorContext);
        this.fileOutputs = new HashSet();
        ObjectCache.getInstance().setObjectRegistry(processorContext.getObjectRegistry());
    }

    public void initialize() throws Exception {
        sampleVertex = null;
        sampleMap = null;
        new JVMReuseImpl().cleanupStaticData();
        this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
        PigContext.setPackageImportList((ArrayList) ObjectSerializer.deserialize(this.conf.get("udf.import.list")));
        PigContext pigContext = (PigContext) ObjectSerializer.deserialize(this.conf.get("pig.pigContext"));
        this.conf.set("mapreduce.job.application.attempt.id", getContext().getUniqueIdentifier());
        this.conf.set(PigConstants.TASK_INDEX, Integer.toString(getContext().getTaskIndex()));
        UDFContext.getUDFContext().addJobConf(this.conf);
        UDFContext.getUDFContext().deserialize();
        this.execPlan = (PhysicalPlan) ObjectSerializer.deserialize(this.conf.get(PLAN));
        SchemaTupleBackend.initialize(this.conf, pigContext);
        PigMapReduce.sJobContext = HadoopShims.createJobContext(this.conf, new JobID());
        PigMapReduce.sJobConfInternal.set(this.conf);
        Utils.setDefaultTimeZone(this.conf);
        boolean equalsIgnoreCase = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
        PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
        pigStatusReporter.setContext(new TezTaskContext(getContext()));
        this.pigHadoopLogger = PigHadoopLogger.getInstance();
        this.pigHadoopLogger.setReporter(pigStatusReporter);
        this.pigHadoopLogger.setAggregate(equalsIgnoreCase);
        PhysicalOperator.setPigLogger(this.pigHadoopLogger);
        Iterator it = PlanHelper.getPhysicalOperators(this.execPlan, TezTaskConfigurable.class).iterator();
        while (it.hasNext()) {
            ((TezTaskConfigurable) it.next()).initialize(getContext());
        }
    }

    public void handleEvents(List<Event> list) {
    }

    public void close() throws Exception {
        this.execPlan = null;
        this.fileOutputs = null;
        this.leaf = null;
        this.conf = null;
        sampleMap = null;
        sampleVertex = null;
        this.pigHadoopLogger = null;
        new JVMReuseImpl().cleanupStaticData();
    }

    public void run(Map<String, LogicalInput> map, Map<String, LogicalOutput> map2) throws Exception {
        try {
            initializeInputs(map);
            initializeOutputs(map2);
            if (!this.execPlan.isEmpty()) {
                this.leaf = this.execPlan.getLeaves().get(0);
            }
            runPipeline(this.leaf);
            if (Boolean.valueOf(this.conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false")).booleanValue() && !this.execPlan.endOfAllInput) {
                this.execPlan.endOfAllInput = true;
                runPipeline(this.leaf);
            }
            try {
                new UDFFinishVisitor(this.execPlan, new DependencyOrderWalker(this.execPlan)).visit();
                if (!this.fileOutputs.isEmpty()) {
                    while (!getContext().canCommit()) {
                        Thread.sleep(100L);
                    }
                    for (MROutput mROutput : this.fileOutputs) {
                        mROutput.flush();
                        if (mROutput.isCommitRequired()) {
                            mROutput.commit();
                        }
                    }
                }
                if (this.conf.getBoolean(ESTIMATE_PARALLELISM, false)) {
                    int i = 1;
                    if (sampleMap != null && sampleMap.containsKey(ESTIMATED_NUM_PARALLELISM)) {
                        i = ((Integer) sampleMap.get(ESTIMATED_NUM_PARALLELISM)).intValue();
                    }
                    String str = this.conf.get(SORT_VERTEX);
                    LOG.info("Sending numParallelism " + i + " to " + str);
                    VertexManagerEvent create = VertexManagerEvent.create(str, ByteBuffer.wrap(Ints.toByteArray(i)));
                    ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(1);
                    newArrayListWithCapacity.add(create);
                    getContext().sendEvents(newArrayListWithCapacity);
                }
            } catch (VisitorException e) {
                throw new VisitorException("Error while calling finish method on UDFs.", 2121, (byte) 4, e);
            }
        } catch (Exception e2) {
            LOG.error("Encountered exception while processing: ", e2);
            abortOutput();
            throw e2;
        }
    }

    private void abortOutput() {
        Iterator<MROutput> it = this.fileOutputs.iterator();
        while (it.hasNext()) {
            try {
                it.next().abort();
            } catch (Exception e) {
                LOG.error("Encountered exception while aborting output", e);
            }
        }
    }

    private void initializeInputs(Map<String, LogicalInput> map) throws Exception {
        HashSet hashSet = new HashSet();
        sampleVertex = this.conf.get(SAMPLE_VERTEX);
        if (sampleVertex != null) {
            collectSample(sampleVertex, map.get(sampleVertex));
            hashSet.add(sampleVertex);
        }
        LinkedList physicalOperators = PlanHelper.getPhysicalOperators(this.execPlan, TezInput.class);
        Iterator it = physicalOperators.iterator();
        while (it.hasNext()) {
            ((TezInput) it.next()).addInputsToSkip(hashSet);
        }
        LinkedList linkedList = new LinkedList();
        Iterator it2 = PlanHelper.getPhysicalOperators(this.execPlan, POUserFunc.class).iterator();
        while (it2.hasNext()) {
            POUserFunc pOUserFunc = (POUserFunc) it2.next();
            if (pOUserFunc.getFunc() instanceof ReadScalarsTez) {
                linkedList.add((ReadScalarsTez) pOUserFunc.getFunc());
            }
        }
        Iterator it3 = linkedList.iterator();
        while (it3.hasNext()) {
            ((ReadScalarsTez) it3.next()).addInputsToSkip(hashSet);
        }
        for (Map.Entry<String, LogicalInput> entry : map.entrySet()) {
            if (hashSet.contains(entry.getKey())) {
                LOG.info("Skipping fetch of input " + entry.getValue() + " from vertex " + entry.getKey());
            } else {
                LOG.info("Starting fetch of input " + entry.getValue() + " from vertex " + entry.getKey());
                entry.getValue().start();
            }
        }
        Iterator it4 = physicalOperators.iterator();
        while (it4.hasNext()) {
            ((TezInput) it4.next()).attachInputs(map, this.conf);
        }
        Iterator it5 = linkedList.iterator();
        while (it5.hasNext()) {
            ((ReadScalarsTez) it5.next()).attachInputs(map, this.conf);
        }
    }

    private void initializeOutputs(Map<String, LogicalOutput> map) throws Exception {
        for (Map.Entry<String, LogicalOutput> entry : map.entrySet()) {
            MROutput mROutput = (LogicalOutput) entry.getValue();
            LOG.info("Starting output " + mROutput + " to vertex " + entry.getKey());
            mROutput.start();
            if (mROutput instanceof MROutput) {
                this.fileOutputs.add(mROutput);
            }
        }
        Iterator it = PlanHelper.getPhysicalOperators(this.execPlan, TezOutput.class).iterator();
        while (it.hasNext()) {
            ((TezOutput) it.next()).attachOutputs(map, this.conf);
        }
    }

    protected void runPipeline(PhysicalOperator physicalOperator) throws IOException, InterruptedException {
        while (true) {
            Result nextTuple = physicalOperator.getNextTuple();
            if (nextTuple.returnStatus != 0) {
                if (nextTuple.returnStatus == 3) {
                    return;
                }
                if (nextTuple.returnStatus != 1 && nextTuple.returnStatus == 2) {
                    throw new ExecException(nextTuple.result != null ? "Received Error while processing the map plan: " + nextTuple.result : "Received Error while processing the map plan.", 2055, (byte) 4);
                }
            }
        }
    }

    private void collectSample(String str, LogicalInput logicalInput) throws Exception {
        String str2 = "sample-" + str + ".quantileMap";
        sampleMap = (Map) ObjectCache.getInstance().retrieve(str2);
        if (sampleMap != null) {
            return;
        }
        LOG.info("Starting fetch of input " + logicalInput + " from vertex " + str);
        logicalInput.start();
        KeyValueReader reader = logicalInput.getReader();
        reader.next();
        Object currentValue = reader.getCurrentValue();
        if (currentValue == null) {
            LOG.warn("Cannot fetch sample from " + str);
        } else {
            sampleMap = (Map) ((Tuple) currentValue).get(0);
            ObjectCache.getInstance().cache(str2, sampleMap);
        }
    }
}
