package org.apache.flume.source;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.thrift.Status;
import org.apache.flume.thrift.ThriftFlumeEvent;
import org.apache.flume.thrift.ThriftSourceProtocol;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/source/ThriftSource.class */
/**
 * Event-driven Flume source that receives events over Thrift RPC.
 *
 * <p>The source binds a Thrift server to the configured address and port, speaking the compact
 * protocol over fast-framed transports. {@code TThreadedSelectorServer} is used when it is present
 * on the classpath; otherwise the source falls back to {@code TThreadPoolServer}. Both server
 * classes are looked up reflectively so that either Thrift flavour can be used at runtime.
 *
 * <p>Configuration keys: {@value #CONFIG_BIND} (required), {@value #CONFIG_PORT} (required) and
 * {@value #CONFIG_THREADS} (optional; a value {@code <= 0} means "no explicit limit").
 */
public class ThriftSource extends AbstractSource implements Configurable, EventDrivenSource {
    public static final Logger logger = LoggerFactory.getLogger(ThriftSource.class);
    public static final String CONFIG_THREADS = "threads";
    public static final String CONFIG_BIND = "bind";
    public static final String CONFIG_PORT = "port";

    /** Maximum time {@link #start()} waits for the Thrift server to report that it is serving. */
    private static final long START_TIMEOUT_MILLIS = 10000L;
    /** Interval between checks of the server's serving flag during startup. */
    private static final long START_POLL_INTERVAL_MILLIS = 1000L;
    /** Time {@link #stop()} waits for the serving thread to finish before forcing shutdown. */
    private static final long STOP_TIMEOUT_SECONDS = 5L;

    private Integer port;
    private String bindAddress;
    /** Configured worker-thread limit; {@code <= 0} means no explicit limit. */
    private int maxThreads = 0;
    private SourceCounter sourceCounter;
    private TServer server;
    private TServerTransport serverTransport;
    /** Single-thread executor that runs the blocking {@code TServer.serve()} call. */
    private ExecutorService servingExecutor;

    /**
     * Thrift service implementation that converts incoming Thrift events to Flume events and hands
     * them to the channel processor of the enclosing source.
     */
    private class ThriftSourceHandler implements ThriftSourceProtocol.Iface {
        private ThriftSourceHandler() {
        }

        /**
         * Delivers a single event to the channel processor.
         *
         * @param thriftFlumeEvent the event received from the client
         * @return {@link Status#OK} on success, {@link Status#FAILED} if the channel rejected it
         */
        @Override // org.apache.flume.thrift.ThriftSourceProtocol.Iface
        public Status append(ThriftFlumeEvent thriftFlumeEvent) throws TException {
            Event event = EventBuilder.withBody(thriftFlumeEvent.getBody(), thriftFlumeEvent.getHeaders());
            sourceCounter.incrementAppendReceivedCount();
            sourceCounter.incrementEventReceivedCount();
            try {
                getChannelProcessor().processEvent(event);
            } catch (ChannelException e) {
                logger.warn("Thrift source " + getName() + " could not append events to the channel.", e);
                return Status.FAILED;
            }
            sourceCounter.incrementAppendAcceptedCount();
            sourceCounter.incrementEventAcceptedCount();
            return Status.OK;
        }

        /**
         * Delivers a batch of events to the channel processor in one call.
         *
         * @param list the events received from the client
         * @return {@link Status#OK} on success, {@link Status#FAILED} if the channel rejected them
         */
        @Override // org.apache.flume.thrift.ThriftSourceProtocol.Iface
        public Status appendBatch(List<ThriftFlumeEvent> list) throws TException {
            sourceCounter.incrementAppendBatchReceivedCount();
            sourceCounter.addToEventReceivedCount(list.size());
            List<Event> events = Lists.newArrayListWithCapacity(list.size());
            for (ThriftFlumeEvent thriftFlumeEvent : list) {
                events.add(EventBuilder.withBody(thriftFlumeEvent.getBody(), thriftFlumeEvent.getHeaders()));
            }
            try {
                getChannelProcessor().processEventBatch(events);
            } catch (ChannelException e) {
                // SLF4J does not understand "%s"; build the message the same way append() does
                // and pass the exception so the failure cause is not lost.
                logger.warn("Thrift source " + getName() + " could not append events to the channel.", e);
                return Status.FAILED;
            }
            sourceCounter.incrementAppendBatchAcceptedCount();
            sourceCounter.addToEventAcceptedCount(list.size());
            return Status.OK;
        }
    }

    /**
     * Reads the port, bind address and optional thread limit from the context and creates the
     * source counter if it does not exist yet.
     *
     * @param context the source configuration
     * @throws NullPointerException if the port or the bind address is missing
     */
    @Override // org.apache.flume.conf.Configurable
    public void configure(Context context) {
        logger.info("Configuring thrift source.");
        port = context.getInteger(CONFIG_PORT);
        Preconditions.checkNotNull(port, "Port must be specified for Thrift Source.");
        bindAddress = context.getString(CONFIG_BIND);
        Preconditions.checkNotNull(bindAddress, "Bind address must be specified for Thrift Source.");
        try {
            maxThreads = context.getInteger(CONFIG_THREADS, 0);
        } catch (NumberFormatException e) {
            // Keep the previous value; a malformed thread count is not fatal.
            logger.warn("Thrift source's \"threads\" property must specify an integer value: "
                    + context.getString(CONFIG_THREADS), e);
        }
        if (sourceCounter == null) {
            sourceCounter = new SourceCounter(getName());
        }
    }

    /**
     * Binds the server socket, builds the Thrift server and blocks until it reports that it is
     * serving (at most {@value #START_TIMEOUT_MILLIS} ms).
     *
     * <p>If startup fails after the socket has been bound, the transport and the serving executor
     * are released again so that a later retry does not find the port still in use.
     *
     * @throws FlumeException if no supported Thrift server class is available or startup fails
     */
    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void start() {
        logger.info("Starting thrift source");
        // Decide on the configured value without overwriting it, so "no limit" stays
        // distinguishable from an explicit limit.
        final boolean unboundedThreads = maxThreads <= 0;
        final int workerThreadLimit = unboundedThreads ? Integer.MAX_VALUE : maxThreads;

        Class<?> serverClass;
        Class<?> argsClass;
        TServer.AbstractServerArgs args;
        try {
            serverClass = Class.forName("org.apache.thrift.server.TThreadedSelectorServer");
            argsClass = Class.forName("org.apache.thrift.server.TThreadedSelectorServer$Args");
            ThreadFactory threadFactory =
                    new ThreadFactoryBuilder().setNameFormat("Flume Thrift IPC Thread %d").build();
            // A cached pool reuses idle threads; a fixed pool of Integer.MAX_VALUE would instead
            // start a fresh thread for every task.
            ExecutorService workerPool = unboundedThreads
                    ? Executors.newCachedThreadPool(threadFactory)
                    : Executors.newFixedThreadPool(maxThreads, threadFactory);
            serverTransport = new TNonblockingServerSocket(new InetSocketAddress(bindAddress, port));
            args = (TNonblockingServer.AbstractNonblockingServerArgs) argsClass
                    .getConstructor(TNonblockingServerTransport.class)
                    .newInstance(serverTransport);
            argsClass.getDeclaredMethod("executorService", ExecutorService.class).invoke(args, workerPool);
        } catch (ClassNotFoundException e) {
            logger.info("TThreadedSelectorServer not found, using TThreadPoolServer");
            try {
                serverTransport = new TServerSocket(new InetSocketAddress(bindAddress, port));
                serverClass = Class.forName("org.apache.thrift.server.TThreadPoolServer");
                argsClass = Class.forName("org.apache.thrift.server.TThreadPoolServer$Args");
                args = (TServer.AbstractServerArgs) argsClass
                        .getConstructor(TServerTransport.class)
                        .newInstance(serverTransport);
                argsClass.getDeclaredMethod("maxWorkerThreads", int.class).invoke(args, workerThreadLimit);
            } catch (ClassNotFoundException e2) {
                releaseAfterFailedStart();
                throw new FlumeException("Cannot find TThreadSelectorServer or TThreadPoolServer. Please install a compatible version of thrift in the classpath", e2);
            } catch (Throwable th) {
                releaseAfterFailedStart();
                throw new FlumeException("Cannot start Thrift source.", th);
            }
        } catch (Throwable th2) {
            releaseAfterFailedStart();
            throw new FlumeException("Cannot start Thrift source.", th2);
        }

        try {
            args.protocolFactory(new TCompactProtocol.Factory());
            args.inputTransportFactory(new TFastFramedTransport.Factory());
            args.outputTransportFactory(new TFastFramedTransport.Factory());
            args.processor(new ThriftSourceProtocol.Processor(new ThriftSourceHandler()));
            server = (TServer) serverClass.getConstructor(argsClass).newInstance(args);
            servingExecutor = Executors.newSingleThreadExecutor(
                    new ThreadFactoryBuilder().setNameFormat("Flume Thrift Source I/O Boss").build());
            servingExecutor.submit(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    server.serve();
                }
            });
            awaitServing();
            sourceCounter.start();
            logger.info("Started Thrift source.");
            super.start();
        } catch (Throwable th3) {
            releaseAfterFailedStart();
            throw new FlumeException("Cannot start Thrift Source.", th3);
        }
    }

    /**
     * Polls the server until it reports that it is serving.
     *
     * @throws FlumeException if the server is not serving within the startup timeout or the
     *     calling thread is interrupted while waiting
     */
    private void awaitServing() {
        final long waitStarted = System.currentTimeMillis();
        while (!server.isServing()) {
            if (System.currentTimeMillis() - waitStarted >= START_TIMEOUT_MILLIS) {
                throw new FlumeException("Thrift server failed to start!");
            }
            try {
                TimeUnit.MILLISECONDS.sleep(START_POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new FlumeException("Interrupted while waiting for Thrift server to start.", e);
            }
        }
    }

    /**
     * Best-effort release of whatever {@link #start()} managed to acquire before it failed.
     * Failures during cleanup are logged and never mask the original startup error.
     */
    private void releaseAfterFailedStart() {
        try {
            if (server != null && server.isServing()) {
                server.stop();
            }
            if (servingExecutor != null) {
                servingExecutor.shutdownNow();
            }
            if (serverTransport != null) {
                serverTransport.close();
            }
        } catch (RuntimeException e) {
            logger.warn("Could not release Thrift source resources after a failed start.", e);
        }
    }

    /**
     * Stops the Thrift server, waits up to {@value #STOP_TIMEOUT_SECONDS} seconds for the serving
     * thread to finish, then stops the counter and marks the source as stopped.
     *
     * @throws FlumeException if the calling thread is interrupted while waiting; the interrupt
     *     flag is restored and the counter and lifecycle state are still updated
     */
    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void stop() {
        try {
            if (server != null && server.isServing()) {
                server.stop();
            }
            if (servingExecutor != null) {
                servingExecutor.shutdown();
                try {
                    if (!servingExecutor.awaitTermination(STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                        servingExecutor.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    servingExecutor.shutdownNow();
                    Thread.currentThread().interrupt();
                    throw new FlumeException("Interrupted while waiting for server to be shutdown.", e);
                }
            }
        } finally {
            // sourceCounter is null when configure() failed before creating it.
            if (sourceCounter != null) {
                sourceCounter.stop();
            }
            super.stop();
        }
    }
}
