package org.apache.pig.backend.hadoop.executionengine.tez.runtime;

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigReducerEstimator;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.TezEstimatedParallelismClearer;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;

/* loaded from: input_file:org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.class */
public class PigGraceShuffleVertexManager extends ShuffleVertexManager {
    private TezOperPlan tezPlan;
    private List<String> grandParents;
    private List<String> finishedGrandParents;
    private long bytesPerTask;
    private Configuration conf;
    private PigContext pc;
    private int thisParallelism;
    private boolean parallelismSet;
    private static final Log LOG = LogFactory.getLog(PigGraceShuffleVertexManager.class);

    public PigGraceShuffleVertexManager(VertexManagerPluginContext vertexManagerPluginContext) {
        super(vertexManagerPluginContext);
        this.grandParents = new ArrayList();
        this.finishedGrandParents = new ArrayList();
        this.thisParallelism = -1;
        this.parallelismSet = false;
    }

    public synchronized void initialize() {
        try {
            this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
            this.bytesPerTask = this.conf.getLong(PigReducerEstimator.BYTES_PER_REDUCER_PARAM, 1000000000L);
            this.pc = (PigContext) ObjectSerializer.deserialize(this.conf.get("pig.pigContext"));
            this.tezPlan = (TezOperPlan) ObjectSerializer.deserialize(this.conf.get("pig.tez.plan"));
            try {
                new TezEstimatedParallelismClearer(this.tezPlan).visit();
                TezOperator operator = this.tezPlan.getOperator(OperatorKey.fromString(getContext().getVertexName()));
                this.grandParents = Lists.transform(TezOperPlan.getGrandParentsForGraceParallelism(this.tezPlan, operator), new Function<TezOperator, String>() { // from class: org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigGraceShuffleVertexManager.1
                    @Override // com.google.common.base.Function
                    public String apply(TezOperator tezOperator) {
                        return tezOperator.getOperatorKey().toString();
                    }
                });
                Iterator<String> it = this.grandParents.iterator();
                while (it.hasNext()) {
                    getContext().registerForVertexStateUpdates(it.next(), EnumSet.of(VertexState.SUCCEEDED));
                }
                super.initialize();
            } catch (VisitorException e) {
                throw new TezUncheckedException(e);
            }
        } catch (IOException e2) {
            throw new TezUncheckedException(e2);
        }
    }

    public synchronized void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) {
        List<TezOperator> predecessors;
        super.onVertexStateUpdated(vertexStateUpdate);
        if (this.parallelismSet) {
            return;
        }
        String vertexName = vertexStateUpdate.getVertexName();
        if (this.grandParents.contains(vertexName) && !this.finishedGrandParents.contains(vertexName)) {
            this.finishedGrandParents.add(vertexName);
        }
        TezOperator operator = this.tezPlan.getOperator(OperatorKey.fromString(getContext().getVertexName()));
        List<TezOperator> predecessors2 = this.tezPlan.getPredecessors(operator);
        boolean z = false;
        Iterator<TezOperator> it = predecessors2.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            TezOperator next = it.next();
            boolean z2 = true;
            Iterator<TezOperator> it2 = this.tezPlan.getPredecessors(next).iterator();
            while (true) {
                if (it2.hasNext()) {
                    if (!this.finishedGrandParents.contains(it2.next().getOperatorKey().toString())) {
                        z2 = false;
                        break;
                    }
                } else {
                    break;
                }
            }
            if (z2) {
                LOG.info("All predecessors for " + next.getOperatorKey().toString() + " are finished, time to set parallelism for " + getContext().getVertexName());
                z = true;
                break;
            }
        }
        if (z) {
            for (TezOperator tezOperator : predecessors2) {
                if (tezOperator.getRequestedParallelism() == -1 && (predecessors = this.tezPlan.getPredecessors(tezOperator)) != null) {
                    for (TezOperator tezOperator2 : predecessors) {
                        String operatorKey = tezOperator2.getOperatorKey().toString();
                        if (this.finishedGrandParents.contains(operatorKey)) {
                            long dataSize = getContext().getVertexStatistics(operatorKey).getOutputStatistics(tezOperator.getOperatorKey().toString()).getDataSize();
                            int ceil = (int) Math.ceil(dataSize / this.bytesPerTask);
                            tezOperator2.setEstimatedParallelism(ceil);
                            LOG.info(getContext().getVertexName() + ": Grandparent " + tezOperator2.getOperatorKey().toString() + " finished with actual output " + dataSize + " (desired parallelism " + ceil + ")");
                        }
                    }
                }
            }
            try {
                new ParallelismSetter(this.tezPlan, this.pc).visit();
                this.thisParallelism = operator.getEstimatedParallelism();
                HashMap hashMap = new HashMap();
                for (Map.Entry entry : getContext().getInputVertexEdgeProperties().entrySet()) {
                    EdgeProperty edgeProperty = (EdgeProperty) entry.getValue();
                    hashMap.put(entry.getKey(), EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER, edgeProperty.getDataSourceType(), edgeProperty.getSchedulingType(), edgeProperty.getEdgeSource(), edgeProperty.getEdgeDestination()));
                }
                getContext().reconfigureVertex(this.thisParallelism, (VertexLocationHint) null, hashMap);
                this.parallelismSet = true;
                LOG.info("Initialize parallelism for " + getContext().getVertexName() + " to " + this.thisParallelism);
            } catch (IOException e) {
                throw new TezUncheckedException(e);
            }
        }
    }
}
