package org.apache.spark.network.server;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.client.StreamInterceptor;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.protocol.Encodable;
import org.apache.spark.network.protocol.OneWayMessage;
import org.apache.spark.network.protocol.RequestMessage;
import org.apache.spark.network.protocol.RpcFailure;
import org.apache.spark.network.protocol.RpcRequest;
import org.apache.spark.network.protocol.RpcResponse;
import org.apache.spark.network.protocol.StreamFailure;
import org.apache.spark.network.protocol.StreamRequest;
import org.apache.spark.network.protocol.StreamResponse;
import org.apache.spark.network.protocol.UploadStream;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportFrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark_project.guava.base.Throwables;
import org.spark_project.io.netty.channel.Channel;
import org.spark_project.io.netty.channel.ChannelFuture;

/* loaded from: input_file:org/apache/spark/network/server/TransportRequestHandler.class */
public class TransportRequestHandler extends MessageHandler<RequestMessage> {
    private static final Logger logger;
    private final Channel channel;
    private final TransportClient reverseClient;
    private final RpcHandler rpcHandler;
    private final StreamManager streamManager;
    private final long maxChunksBeingTransferred;
    static final /* synthetic */ boolean $assertionsDisabled;

    public TransportRequestHandler(Channel channel, TransportClient transportClient, RpcHandler rpcHandler, Long l) {
        this.channel = channel;
        this.reverseClient = transportClient;
        this.rpcHandler = rpcHandler;
        this.streamManager = rpcHandler.getStreamManager();
        this.maxChunksBeingTransferred = l.longValue();
    }

    @Override // org.apache.spark.network.server.MessageHandler
    public void exceptionCaught(Throwable th) {
        this.rpcHandler.exceptionCaught(th, this.reverseClient);
    }

    @Override // org.apache.spark.network.server.MessageHandler
    public void channelActive() {
        this.rpcHandler.channelActive(this.reverseClient);
    }

    @Override // org.apache.spark.network.server.MessageHandler
    public void channelInactive() {
        if (this.streamManager != null) {
            try {
                this.streamManager.connectionTerminated(this.channel);
            } catch (RuntimeException e) {
                logger.error("StreamManager connectionTerminated() callback failed.", e);
            }
        }
        this.rpcHandler.channelInactive(this.reverseClient);
    }

    @Override // org.apache.spark.network.server.MessageHandler
    public void handle(RequestMessage requestMessage) {
        if (requestMessage instanceof RpcRequest) {
            processRpcRequest((RpcRequest) requestMessage);
            return;
        }
        if (requestMessage instanceof OneWayMessage) {
            processOneWayMessage((OneWayMessage) requestMessage);
        } else if (requestMessage instanceof StreamRequest) {
            processStreamRequest((StreamRequest) requestMessage);
        } else {
            if (!(requestMessage instanceof UploadStream)) {
                throw new IllegalArgumentException("Unknown request type: " + requestMessage);
            }
            processStreamUpload((UploadStream) requestMessage);
        }
    }

    private void processStreamRequest(StreamRequest streamRequest) {
        if (logger.isTraceEnabled()) {
            logger.trace("Received req from {} to fetch stream {}", NettyUtils.getRemoteAddress(this.channel), streamRequest.streamId);
        }
        long chunksBeingTransferred = this.streamManager.chunksBeingTransferred();
        if (chunksBeingTransferred >= this.maxChunksBeingTransferred) {
            logger.warn("The number of chunks being transferred {} is above {}, close the connection.", Long.valueOf(chunksBeingTransferred), Long.valueOf(this.maxChunksBeingTransferred));
            this.channel.close();
            return;
        }
        try {
            ManagedBuffer openStream = this.streamManager.openStream(streamRequest.streamId);
            if (openStream == null) {
                respond(new StreamFailure(streamRequest.streamId, String.format("Stream '%s' was not found.", streamRequest.streamId)));
            } else {
                this.streamManager.streamBeingSent(streamRequest.streamId);
                respond(new StreamResponse(streamRequest.streamId, openStream.size(), openStream)).addListener2(future -> {
                    this.streamManager.streamSent(streamRequest.streamId);
                });
            }
        } catch (Exception e) {
            logger.error(String.format("Error opening stream %s for request from %s", streamRequest.streamId, NettyUtils.getRemoteAddress(this.channel)), e);
            respond(new StreamFailure(streamRequest.streamId, Throwables.getStackTraceAsString(e)));
        }
    }

    private void processRpcRequest(final RpcRequest rpcRequest) {
        try {
            this.rpcHandler.receive(this.reverseClient, rpcRequest.body().nioByteBuffer(), new RpcResponseCallback() { // from class: org.apache.spark.network.server.TransportRequestHandler.1
                @Override // org.apache.spark.network.client.RpcResponseCallback
                public void onSuccess(ByteBuffer byteBuffer) {
                    TransportRequestHandler.this.respond(new RpcResponse(rpcRequest.requestId, new NioManagedBuffer(byteBuffer)));
                }

                @Override // org.apache.spark.network.client.RpcResponseCallback
                public void onFailure(Throwable th) {
                    TransportRequestHandler.this.respond(new RpcFailure(rpcRequest.requestId, Throwables.getStackTraceAsString(th)));
                }
            });
        } catch (Exception e) {
            logger.error("Error while invoking RpcHandler#receive() on RPC id " + rpcRequest.requestId, e);
            respond(new RpcFailure(rpcRequest.requestId, Throwables.getStackTraceAsString(e)));
        } finally {
            rpcRequest.body().release();
        }
    }

    private void processStreamUpload(final UploadStream uploadStream) {
        if (!$assertionsDisabled && uploadStream.body() != null) {
            throw new AssertionError();
        }
        try {
            try {
                final RpcResponseCallback rpcResponseCallback = new RpcResponseCallback() { // from class: org.apache.spark.network.server.TransportRequestHandler.2
                    @Override // org.apache.spark.network.client.RpcResponseCallback
                    public void onSuccess(ByteBuffer byteBuffer) {
                        TransportRequestHandler.this.respond(new RpcResponse(uploadStream.requestId, new NioManagedBuffer(byteBuffer)));
                    }

                    @Override // org.apache.spark.network.client.RpcResponseCallback
                    public void onFailure(Throwable th) {
                        TransportRequestHandler.this.respond(new RpcFailure(uploadStream.requestId, Throwables.getStackTraceAsString(th)));
                    }
                };
                TransportFrameDecoder transportFrameDecoder = (TransportFrameDecoder) this.channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);
                final StreamCallbackWithID receiveStream = this.rpcHandler.receiveStream(this.reverseClient, uploadStream.meta.nioByteBuffer(), rpcResponseCallback);
                if (receiveStream == null) {
                    throw new NullPointerException("rpcHandler returned a null streamHandler");
                }
                StreamCallbackWithID streamCallbackWithID = new StreamCallbackWithID() { // from class: org.apache.spark.network.server.TransportRequestHandler.3
                    @Override // org.apache.spark.network.client.StreamCallback
                    public void onData(String str, ByteBuffer byteBuffer) throws IOException {
                        receiveStream.onData(str, byteBuffer);
                    }

                    @Override // org.apache.spark.network.client.StreamCallback
                    public void onComplete(String str) throws IOException {
                        try {
                            receiveStream.onComplete(str);
                            rpcResponseCallback.onSuccess(ByteBuffer.allocate(0));
                        } catch (Exception e) {
                            IOException iOException = new IOException("Failure post-processing complete stream; failing this rpc and leaving channel active", e);
                            rpcResponseCallback.onFailure(iOException);
                            receiveStream.onFailure(str, iOException);
                        }
                    }

                    @Override // org.apache.spark.network.client.StreamCallback
                    public void onFailure(String str, Throwable th) throws IOException {
                        rpcResponseCallback.onFailure(new IOException("Destination failed while reading stream", th));
                        receiveStream.onFailure(str, th);
                    }

                    @Override // org.apache.spark.network.client.StreamCallbackWithID
                    public String getID() {
                        return receiveStream.getID();
                    }
                };
                if (uploadStream.bodyByteCount > 0) {
                    transportFrameDecoder.setInterceptor(new StreamInterceptor(this, streamCallbackWithID.getID(), uploadStream.bodyByteCount, streamCallbackWithID));
                } else {
                    streamCallbackWithID.onComplete(streamCallbackWithID.getID());
                }
                uploadStream.meta.release();
            } catch (Exception e) {
                logger.error("Error while invoking RpcHandler#receive() on RPC id " + uploadStream.requestId, e);
                respond(new RpcFailure(uploadStream.requestId, Throwables.getStackTraceAsString(e)));
                this.channel.pipeline().fireExceptionCaught((Throwable) e);
                uploadStream.meta.release();
            }
        } catch (Throwable th) {
            uploadStream.meta.release();
            throw th;
        }
    }

    private void processOneWayMessage(OneWayMessage oneWayMessage) {
        try {
            this.rpcHandler.receive(this.reverseClient, oneWayMessage.body().nioByteBuffer());
        } catch (Exception e) {
            logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);
        } finally {
            oneWayMessage.body().release();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Type inference failed for: r0v6, types: [org.spark_project.io.netty.channel.ChannelFuture] */
    public ChannelFuture respond(Encodable encodable) {
        SocketAddress remoteAddress = this.channel.remoteAddress();
        return this.channel.writeAndFlush(encodable).addListener2(future -> {
            if (future.isSuccess()) {
                logger.trace("Sent result {} to client {}", encodable, remoteAddress);
            } else {
                logger.error(String.format("Error sending result %s to %s; closing connection", encodable, remoteAddress), future.cause());
                this.channel.close();
            }
        });
    }

    static {
        $assertionsDisabled = !TransportRequestHandler.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(TransportRequestHandler.class);
    }
}
