package org.apache.flume.source.thriftLegacy;

import com.cloudera.flume.handlers.thrift.ThriftFlumeEvent;
import com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/source/thriftLegacy/ThriftLegacySource.class */
/**
 * Event-driven source that listens on a TCP socket for events sent through the
 * legacy {@link ThriftFlumeEventServer} Thrift interface and forwards each one
 * to this source's channel processor.
 *
 * <p>Every received {@link ThriftFlumeEvent} is converted into an {@link Event}
 * whose headers carry the legacy event's host, timestamp, priority, nanos and
 * custom fields, plus a {@value #OG_EVENT} marker header.
 *
 * <p>Configuration keys read by {@link #configure(Context)}: {@code port} and
 * {@value #HOST}.
 */
public class ThriftLegacySource extends AbstractSource implements EventDrivenSource, Configurable {
    static final Logger LOG = LoggerFactory.getLogger(ThriftLegacySource.class);

    /** Header / configuration key holding the host name. */
    static final String HOST = "host";
    /** Header key holding the legacy event timestamp, rendered as a decimal string. */
    static final String TIMESTAMP = "timestamp";
    /** Header key holding the legacy event priority. */
    static final String PRIORITY = "pri";
    /** Header key holding the legacy event nanos value, rendered as a decimal string. */
    static final String NANOS = "nanos";
    /** Marker header added to every event produced by this source. */
    static final String OG_EVENT = "FlumeOG";

    /** Charset used to decode the binary values of the legacy event's custom fields. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    private final CounterGroup counterGroup = new CounterGroup();

    private String host;
    private int port;
    private TServer server;
    private TServerTransport serverTransport;
    private Thread thriftHandlerThread;

    /**
     * Thrift service implementation that turns each incoming legacy event into an
     * {@link Event} and hands it to the enclosing source's channel processor.
     */
    private class ThriftFlumeEventServerImpl implements ThriftFlumeEventServer.Iface {
        private ThriftFlumeEventServerImpl() {
        }

        /**
         * Converts one legacy event and passes it to the channel processor.
         *
         * <p>A {@code null} event is ignored. A missing priority, a missing fields
         * map, or a field with a {@code null} value is skipped rather than failing
         * the whole call. If the channel processor rejects the event with a
         * {@link ChannelException}, the failure is logged and the event is dropped;
         * nothing is propagated to the Thrift caller.
         *
         * @param thriftFlumeEvent the legacy event received over Thrift, may be {@code null}
         */
        @Override // com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer.Iface
        public void append(ThriftFlumeEvent thriftFlumeEvent) {
            if (thriftFlumeEvent == null) {
                return;
            }

            Map<String, String> headers = new HashMap<String, String>();
            headers.put(HOST, thriftFlumeEvent.getHost());
            headers.put(TIMESTAMP, Long.toString(thriftFlumeEvent.getTimestamp()));
            // The priority may be unset on the wire; omit the header instead of
            // throwing a NullPointerException.
            Object priority = thriftFlumeEvent.getPriority();
            if (priority != null) {
                headers.put(PRIORITY, priority.toString());
            }
            headers.put(NANOS, Long.toString(thriftFlumeEvent.getNanos()));

            // Custom fields arrive as raw bytes and are decoded as UTF-8 text.
            Map<String, ByteBuffer> fields = thriftFlumeEvent.getFields();
            if (fields != null) {
                for (Map.Entry<String, ByteBuffer> field : fields.entrySet()) {
                    ByteBuffer rawValue = field.getValue();
                    if (rawValue != null) {
                        headers.put(field.getKey(), UTF_8.decode(rawValue).toString());
                    }
                }
            }
            // Written last so a custom field with the same key cannot override the marker.
            headers.put(OG_EVENT, "yes");

            Event event = EventBuilder.withBody(thriftFlumeEvent.getBody(), headers);
            counterGroup.incrementAndGet("rpc.events");
            try {
                getChannelProcessor().processEvent(event);
            } catch (ChannelException e) {
                LOG.warn("Failed to process event", e);
                return;
            }
            counterGroup.incrementAndGet("rpc.successful");
        }

        /** No-op: this implementation holds no per-connection state to release. */
        @Override // com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer.Iface
        public void close() {
        }
    }

    /** Runnable that runs {@link TServer#serve()} for the given server on its own thread. */
    public static class ThriftHandler implements Runnable {
        private final TServer server;

        /**
         * @param tServer the Thrift server whose {@code serve()} method {@link #run()} invokes
         */
        public ThriftHandler(TServer tServer) {
            this.server = tServer;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.server.serve();
        }
    }

    /**
     * Reads the listening address from the configuration.
     *
     * @param context configuration holding the {@code port} and {@code host} keys
     * @throws NumberFormatException if {@code port} is absent or not a valid integer
     */
    public void configure(Context context) {
        this.port = Integer.parseInt(context.getString("port"));
        this.host = context.getString(HOST);
    }

    /**
     * Opens the server socket on the configured host and port, starts the Thrift
     * server on a dedicated thread, and then marks the source as started.
     *
     * @throws IllegalStateException if the server socket cannot be created; in
     *         that case no thread is started and the source is not marked started
     */
    public void start() {
        try {
            this.serverTransport = new TServerSocket(new InetSocketAddress(this.host, this.port));
        } catch (TTransportException e) {
            // Fail loudly: silently continuing would leave a source that looks
            // configured but never listens, and would make stop() fail later.
            LOG.error("Failed to open Thrift server socket on " + this.host + ":" + this.port, e);
            throw new IllegalStateException(
                    "Failed starting Thrift legacy source on " + this.host + ":" + this.port, e);
        }
        ThriftFlumeEventServer.Processor processor =
                new ThriftFlumeEventServer.Processor(new ThriftFlumeEventServerImpl());
        this.server = new TThreadPoolServer(
                new TThreadPoolServer.Args(this.serverTransport).processor(processor));
        this.thriftHandlerThread = new Thread(new ThriftHandler(this.server));
        this.thriftHandlerThread.start();
        super.start();
    }

    /**
     * Stops the Thrift server, closes the server socket, waits for the serving
     * thread to finish, and then marks the source as stopped.
     *
     * <p>Safe to call when {@link #start()} failed or was never called. If the
     * calling thread is interrupted while waiting for the serving thread, the
     * interrupt status is restored and the method returns without marking the
     * source as stopped.
     */
    public void stop() {
        if (this.server != null) {
            this.server.stop();
        }
        if (this.serverTransport != null) {
            this.serverTransport.close();
        }
        if (this.thriftHandlerThread != null) {
            try {
                this.thriftHandlerThread.join();
            } catch (InterruptedException e) {
                LOG.warn("stop interrupted", e);
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
                return;
            }
        }
        super.stop();
    }
}
