/*
 * Decompiled with CFR 0.152.
 */
package com.a.eye.datacarrier.consumer;

import com.a.eye.datacarrier.buffer.Buffer;
import com.a.eye.datacarrier.buffer.Channels;
import com.a.eye.datacarrier.consumer.ConsumerThread;
import com.a.eye.datacarrier.consumer.IConsumer;
import java.util.ArrayList;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Owns a fixed-size array of {@link ConsumerThread}s and, when {@link #begin()}
 * is called, distributes the buffers of a {@link Channels} instance among them
 * before starting every thread.
 *
 * <p>{@link #begin()} and {@link #close()} are serialized through an internal
 * {@link ReentrantLock}.
 *
 * @param <T> type parameter shared by the channels and the consumers
 */
public class ConsumerPool<T> {
    /** Set to {@code true} once {@link #begin()} has completed; only accessed while holding {@link #lock}. */
    private boolean running = false;
    private final ConsumerThread[] consumerThreads;
    private final Channels<T> channels;
    private final ReentrantLock lock;

    /**
     * Creates {@code num} consumer threads. The threads are only constructed here;
     * they are started by {@link #begin()}.
     *
     * @param channels           source of the buffers that will be handed to the threads
     * @param prototype          consumer used by the threads, either directly or as a template
     * @param num                number of consumer threads to create
     * @param usePrototypeCopies when {@code true}, each thread gets its own consumer created
     *                           reflectively from the prototype's class (falling back to the
     *                           prototype itself if that fails); when {@code false}, every
     *                           thread shares {@code prototype}
     */
    public ConsumerPool(Channels<T> channels, IConsumer<T> prototype, int num, boolean usePrototypeCopies) {
        this.channels = channels;
        this.lock = new ReentrantLock();
        this.consumerThreads = new ConsumerThread[num];
        for (int i = 0; i < num; ++i) {
            IConsumer<T> consumer = usePrototypeCopies ? this.getNewConsumerInstance(prototype) : prototype;
            this.consumerThreads[i] = new ConsumerThread<T>("DataCarrier.Consumser." + i + ".Thread", consumer);
        }
    }

    /**
     * Creates a new consumer through the no-argument {@code newInstance()} of the
     * prototype's runtime class.
     *
     * @param prototype consumer whose class is instantiated
     * @return a new instance, or {@code prototype} itself when the class cannot be
     *         instantiated or accessed
     */
    @SuppressWarnings("unchecked")
    private IConsumer<T> getNewConsumerInstance(IConsumer<T> prototype) {
        try {
            // Unchecked: the new object has the prototype's own class, so it is an IConsumer<T> too.
            return (IConsumer<T>) prototype.getClass().newInstance();
        }
        catch (InstantiationException e) {
            // Best effort: fall back to sharing the prototype.
            return prototype;
        }
        catch (IllegalAccessException e) {
            // Best effort: fall back to sharing the prototype.
            return prototype;
        }
    }

    /**
     * Allocates the channel buffers to the consumer threads and starts every thread.
     * Calls made after a successful start return without doing anything.
     *
     * <p>The {@code running} check is performed while holding the lock so that
     * concurrent callers cannot both allocate buffers and start the same threads.
     */
    public void begin() {
        // Acquire before the try block so the finally clause only unlocks a lock that is held.
        this.lock.lock();
        try {
            if (this.running) {
                return;
            }
            this.allocateBuffer2Thread();
            for (ConsumerThread consumerThread : this.consumerThreads) {
                consumerThread.start();
            }
            this.running = true;
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Hands the buffers of {@link #channels} to the consumer threads.
     *
     * <p>When there are fewer channels than threads, thread {@code t} is assigned to
     * channel {@code t % channelSize}; the index range {@code 0..bufferSize} of each
     * channel is then cut into equally sized segments, one per assigned thread, and
     * the last segment is extended to {@code bufferSize} to absorb the remainder.
     * Otherwise each channel is handed as a whole to thread
     * {@code channelIndex % threadCount}.
     *
     * <p>NOTE: the modulo operations throw {@link ArithmeticException} when there are
     * no channels but at least one thread, or no threads but at least one channel.
     */
    @SuppressWarnings("unchecked")
    private void allocateBuffer2Thread() {
        int channelSize = this.channels.getChannelSize();
        int threadCount = this.consumerThreads.length;
        if (channelSize < threadCount) {
            // threadAllocation[c] holds the indexes of the threads that share channel c.
            ArrayList<Integer>[] threadAllocation = new ArrayList[channelSize];
            for (int threadIndex = 0; threadIndex < threadCount; ++threadIndex) {
                int index = threadIndex % channelSize;
                if (threadAllocation[index] == null) {
                    threadAllocation[index] = new ArrayList<Integer>();
                }
                threadAllocation[index].add(threadIndex);
            }
            // Every slot is populated here because there are more threads than channels.
            for (int channelIndex = 0; channelIndex < channelSize; ++channelIndex) {
                ArrayList<Integer> threadAllocationPerChannel = threadAllocation[channelIndex];
                Buffer<T> channel = this.channels.getBuffer(channelIndex);
                int bufferSize = channel.getBufferSize();
                int sharers = threadAllocationPerChannel.size();
                int step = bufferSize / sharers;
                for (int i = 0; i < sharers; ++i) {
                    int threadIndex = threadAllocationPerChannel.get(i);
                    int start = i * step;
                    // The last sharer takes whatever the integer division left over.
                    int end = i == sharers - 1 ? bufferSize : (i + 1) * step;
                    this.consumerThreads[threadIndex].addDataSource(channel, start, end);
                }
            }
        } else {
            for (int channelIndex = 0; channelIndex < channelSize; ++channelIndex) {
                int consumerIndex = channelIndex % threadCount;
                this.consumerThreads[consumerIndex].addDataSource(this.channels.getBuffer(channelIndex));
            }
        }
    }

    /**
     * Calls {@code shutdown()} on every consumer thread while holding the lock.
     *
     * <p>The {@code running} flag is left untouched, so a later {@link #begin()}
     * still returns without restarting anything.
     */
    public void close() {
        // Acquire before the try block so the finally clause only unlocks a lock that is held.
        this.lock.lock();
        try {
            for (ConsumerThread consumerThread : this.consumerThreads) {
                consumerThread.shutdown();
            }
        }
        finally {
            this.lock.unlock();
        }
    }
}

