package org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer;

import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper;
import org.apache.pig.backend.hadoop.executionengine.util.ParallelConstantVisitor;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.tez.dag.api.EdgeProperty;

/* loaded from: input_file:org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.class */
public class ParallelismSetter extends TezOpPlanVisitor {
    private static final Log LOG = LogFactory.getLog(ParallelismSetter.class);
    private Configuration conf;
    private PigContext pc;
    private TezParallelismEstimator estimator;
    private boolean autoParallelismEnabled;
    private int estimatedTotalParallelism;

    public ParallelismSetter(TezOperPlan tezOperPlan, PigContext pigContext) {
        super(tezOperPlan, new DependencyOrderWalker(tezOperPlan));
        this.estimatedTotalParallelism = 0;
        this.pc = pigContext;
        this.conf = ConfigurationUtil.toConfiguration(this.pc.getProperties());
        this.autoParallelismEnabled = this.conf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM, true);
        try {
            this.estimator = this.conf.get(PigConfiguration.PIG_EXEC_REDUCER_ESTIMATOR) == null ? new TezOperDependencyParallelismEstimator() : (TezParallelismEstimator) PigContext.instantiateObjectFromParams(this.conf, PigConfiguration.PIG_EXEC_REDUCER_ESTIMATOR, PigConfiguration.PIG_EXEC_REDUCER_ESTIMATOR_CONSTRUCTOR_ARG_KEY, TezParallelismEstimator.class);
            this.estimator.setPigContext(this.pc);
        } catch (ExecException e) {
            throw new RuntimeException("Error instantiating TezParallelismEstimator", e);
        }
    }

    public int getEstimatedTotalParallelism() {
        return this.estimatedTotalParallelism;
    }

    @Override // org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor
    public void visitTezOp(TezOperator tezOperator) throws VisitorException {
        if (tezOperator instanceof NativeTezOper) {
            return;
        }
        try {
            int i = -1;
            if (tezOperator.getLoaderInfo().getLoads() != null && tezOperator.getLoaderInfo().getLoads().size() > 0) {
                tezOperator.setVertexParallelism(tezOperator.getRequestedParallelism());
                return;
            }
            int i2 = -1;
            boolean z = false;
            for (Map.Entry<OperatorKey, TezEdgeDescriptor> entry : tezOperator.inEdges.entrySet()) {
                if (entry.getValue().dataMovementType == EdgeProperty.DataMovementType.ONE_TO_ONE) {
                    TezOperator operator = ((TezOperPlan) this.mPlan).getOperator(entry.getKey());
                    int effectiveParallelism = operator.getEffectiveParallelism(this.pc.defaultParallel);
                    if (i2 == -1) {
                        i2 = effectiveParallelism;
                    } else if (i2 != effectiveParallelism) {
                        throw new VisitorException("one to one sources parallelism for vertex " + tezOperator.getOperatorKey().toString() + " are not equal");
                    }
                    tezOperator.setRequestedParallelism(operator.getRequestedParallelism());
                    if (tezOperator.getEstimatedParallelism() == -1) {
                        tezOperator.setEstimatedParallelism(operator.getEstimatedParallelism());
                    }
                    z = true;
                    incrementTotalParallelism(tezOperator, effectiveParallelism);
                    i = -1;
                }
            }
            if (!z) {
                if (tezOperator.getRequestedParallelism() != -1) {
                    i = tezOperator.getRequestedParallelism();
                } else if (this.pc.defaultParallel != -1) {
                    i = this.pc.defaultParallel;
                }
                boolean z2 = false;
                if (i != -1 && this.autoParallelismEnabled && tezOperator.isIntermediateReducer().booleanValue() && !tezOperator.isDontEstimateParallelism() && tezOperator.isOverrideIntermediateParallelism()) {
                    z2 = true;
                }
                if (i == -1 || z2) {
                    if (tezOperator.getEstimatedParallelism() == -1) {
                        i = this.estimator.estimateParallelism((TezOperPlan) this.mPlan, tezOperator, this.conf);
                        if (z2) {
                            if (tezOperator.getRequestedParallelism() != i) {
                                LOG.info("Increased requested parallelism of " + tezOperator.getOperatorKey() + " to " + i);
                            }
                            tezOperator.setRequestedParallelism(i);
                        } else {
                            tezOperator.setEstimatedParallelism(i);
                        }
                    } else {
                        i = tezOperator.getEstimatedParallelism();
                    }
                    if (tezOperator.isGlobalSort() || tezOperator.isSkewedJoin()) {
                        boolean z3 = false;
                        if ((tezOperator.isGlobalSort() && getPlan().getPredecessors(tezOperator).size() != 1) || (tezOperator.isSkewedJoin() && getPlan().getPredecessors(tezOperator).size() != 2)) {
                            z3 = true;
                        }
                        if (z2 || z3) {
                            Iterator<TezOperator> it = ((TezOperPlan) this.mPlan).getPredecessors(tezOperator).iterator();
                            while (true) {
                                if (!it.hasNext()) {
                                    break;
                                }
                                TezOperator next = it.next();
                                if (next.isSampleBasedPartitioner()) {
                                    Iterator<TezOperator> it2 = ((TezOperPlan) this.mPlan).getPredecessors(next).iterator();
                                    while (true) {
                                        if (!it2.hasNext()) {
                                            break;
                                        }
                                        TezOperator next2 = it2.next();
                                        if (next2.isSampleAggregation() && next2.plan != null) {
                                            LOG.debug("Updating parallelism constant value to " + i + " in " + next2.plan);
                                            new ParallelConstantVisitor(next2.plan, i).visit();
                                            next2.setNeedEstimatedQuantile(false);
                                            break;
                                        }
                                    }
                                }
                            }
                        } else {
                            incrementTotalParallelism(tezOperator, i);
                            i = -1;
                        }
                    }
                }
            }
            incrementTotalParallelism(tezOperator, i);
            tezOperator.setVertexParallelism(i);
            if (tezOperator.getCrossKeys() != null) {
                Iterator<String> it3 = tezOperator.getCrossKeys().iterator();
                while (it3.hasNext()) {
                    this.pc.getProperties().put("pig.cross.parallelism." + it3.next(), Integer.toString(tezOperator.getVertexParallelism()));
                }
            }
        } catch (Exception e) {
            throw new VisitorException(e);
        }
    }

    private void incrementTotalParallelism(TezOperator tezOperator, int i) {
        if (tezOperator.isVertexGroup() || i == -1) {
            return;
        }
        this.estimatedTotalParallelism += i;
    }
}
