package org.apache.spark.network.server;

import java.net.SocketAddress;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.protocol.ChunkFetchFailure;
import org.apache.spark.network.protocol.ChunkFetchRequest;
import org.apache.spark.network.protocol.ChunkFetchSuccess;
import org.apache.spark.network.protocol.Encodable;
import org.apache.spark.network.util.NettyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark_project.guava.base.Throwables;
import org.spark_project.io.netty.channel.Channel;
import org.spark_project.io.netty.channel.ChannelFuture;
import org.spark_project.io.netty.channel.ChannelHandlerContext;
import org.spark_project.io.netty.channel.SimpleChannelInboundHandler;
import org.spark_project.io.netty.util.concurrent.Future;
import org.spark_project.io.netty.util.concurrent.GenericFutureListener;

/* loaded from: input_file:org/apache/spark/network/server/ChunkFetchRequestHandler.class */
public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> {
    private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class);
    private final TransportClient client;
    private final StreamManager streamManager;
    private final long maxChunksBeingTransferred;

    public ChunkFetchRequestHandler(TransportClient transportClient, StreamManager streamManager, Long l) {
        this.client = transportClient;
        this.streamManager = streamManager;
        this.maxChunksBeingTransferred = l.longValue();
    }

    @Override // org.spark_project.io.netty.channel.ChannelInboundHandlerAdapter, org.spark_project.io.netty.channel.ChannelHandlerAdapter, org.spark_project.io.netty.channel.ChannelHandler, org.spark_project.io.netty.channel.ChannelInboundHandler
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        logger.warn("Exception in connection from " + NettyUtils.getRemoteAddress(channelHandlerContext.channel()), th);
        channelHandlerContext.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.spark_project.io.netty.channel.SimpleChannelInboundHandler
    public void channelRead0(ChannelHandlerContext channelHandlerContext, ChunkFetchRequest chunkFetchRequest) throws Exception {
        Channel channel = channelHandlerContext.channel();
        if (logger.isTraceEnabled()) {
            logger.trace("Received req from {} to fetch block {}", NettyUtils.getRemoteAddress(channel), chunkFetchRequest.streamChunkId);
        }
        long chunksBeingTransferred = this.streamManager.chunksBeingTransferred();
        if (chunksBeingTransferred >= this.maxChunksBeingTransferred) {
            logger.warn("The number of chunks being transferred {} is above {}, close the connection.", Long.valueOf(chunksBeingTransferred), Long.valueOf(this.maxChunksBeingTransferred));
            channel.close();
            return;
        }
        try {
            this.streamManager.checkAuthorization(this.client, chunkFetchRequest.streamChunkId.streamId);
            this.streamManager.registerChannel(channel, chunkFetchRequest.streamChunkId.streamId);
            ManagedBuffer chunk = this.streamManager.getChunk(chunkFetchRequest.streamChunkId.streamId, chunkFetchRequest.streamChunkId.chunkIndex);
            this.streamManager.chunkBeingSent(chunkFetchRequest.streamChunkId.streamId);
            respond(channel, new ChunkFetchSuccess(chunkFetchRequest.streamChunkId, chunk)).addListener2((GenericFutureListener<? extends Future<? super Void>>) channelFuture -> {
                this.streamManager.chunkSent(chunkFetchRequest.streamChunkId.streamId);
            });
        } catch (Exception e) {
            logger.error(String.format("Error opening block %s for request from %s", chunkFetchRequest.streamChunkId, NettyUtils.getRemoteAddress(channel)), e);
            respond(channel, new ChunkFetchFailure(chunkFetchRequest.streamChunkId, Throwables.getStackTraceAsString(e)));
        }
    }

    /* JADX WARN: Type inference failed for: r0v5, types: [org.spark_project.io.netty.channel.ChannelFuture] */
    private ChannelFuture respond(Channel channel, Encodable encodable) throws InterruptedException {
        SocketAddress remoteAddress = channel.remoteAddress();
        return channel.writeAndFlush(encodable).await2().addListener2((GenericFutureListener<? extends Future<? super Void>>) channelFuture -> {
            if (channelFuture.isSuccess()) {
                logger.trace("Sent result {} to client {}", encodable, remoteAddress);
            } else {
                logger.error(String.format("Error sending result %s to %s; closing connection", encodable, remoteAddress), channelFuture.cause());
                channel.close();
            }
        });
    }
}
