/*
 * Decompiled with CFR 0.152.
 */
package reactor.ipc.stream;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Operators;
import reactor.ipc.connector.Inbound;
import reactor.ipc.stream.OnStream;
import reactor.ipc.stream.StreamOperations;
import reactor.ipc.stream.StreamOutbound;
import reactor.util.Logger;
import reactor.util.Loggers;

final class StreamOperationsImpl<IN, OUT>
extends AtomicLong
implements StreamOperations {
    static Logger log = Loggers.getLogger(StreamOperationsImpl.class);
    final ConcurrentMap<Long, Subscriber<OUT>> subscribers;
    final ConcurrentMap<Long, Subscription> subscriptions;
    final StreamOutbound remote;
    final String name;
    final OnStream onNew;
    final Runnable onTerminate;
    final AtomicBoolean terminateOnce;
    final Inbound<? extends IN> channel;

    StreamOperationsImpl(String name, OnStream onNew, StreamOutbound remote, Inbound<? extends IN> channel, Runnable onTerminate) {
        super(1L);
        this.name = name;
        this.channel = channel;
        this.remote = remote;
        this.onNew = onNew;
        this.onTerminate = onTerminate;
        this.terminateOnce = new AtomicBoolean();
        this.subscribers = new ConcurrentHashMap<Long, Subscriber<OUT>>();
        this.subscriptions = new ConcurrentHashMap<Long, Subscription>();
    }

    long newStreamId() {
        return this.getAndIncrement();
    }

    void registerSubscription(long streamId, Subscription s) {
        if (this.subscriptions.putIfAbsent(streamId, s) != null) {
            throw new IllegalStateException("StreamID " + streamId + " already registered");
        }
    }

    void registerSubscriber(long streamId, Subscriber<OUT> s) {
        if (this.subscribers.putIfAbsent(streamId, s) != null) {
            throw new IllegalStateException("StreamID " + streamId + " already registered");
        }
    }

    boolean deregister(long streamId) {
        this.subscribers.remove(streamId);
        return this.subscriptions.remove(streamId) != null;
    }

    @Override
    public void onNew(long streamId, String function) {
        if (log.isDebugEnabled()) {
            log.debug("{}/onStream/{}/{}", new Object[]{this.name, streamId, function});
        }
        if (!this.onNew.onStream(streamId, function, this)) {
            if (log.isDebugEnabled()) {
                log.debug("{}/onStream/{} {}", new Object[]{this.name, streamId, "New stream(" + function + ") rejected"});
            }
            this.sendCancel(streamId, "New stream(" + function + ") rejected");
        }
    }

    @Override
    public void onNext(long streamId, Object o) {
        Subscriber local;
        if (log.isDebugEnabled()) {
            log.debug("{}/onNext/{}/value={}", new Object[]{this.name, streamId, o});
        }
        if ((local = (Subscriber)this.subscribers.get(streamId)) != null) {
            try {
                local.onNext(o);
            }
            catch (Throwable ex) {
                if (log.isDebugEnabled()) {
                    log.debug("{}/onNextError/{}/value={}", new Object[]{this.name, streamId, o, ex});
                }
                this.sendCancel(streamId, ex.toString());
                local.onError(ex);
            }
        }
    }

    @Override
    public void onError(long streamId, String reason) {
        this.onError(streamId, new Exception(reason));
    }

    @Override
    public void onError(long streamId, Throwable e) {
        if (log.isDebugEnabled()) {
            log.debug("{}/onError/{}", new Object[]{this.name, streamId, e});
        }
        if (streamId > 0L) {
            Subscriber local = (Subscriber)this.subscribers.get(streamId);
            if (local != null) {
                local.onError(e);
                return;
            }
        } else if (streamId < 0L) {
            if (this.terminateOnce.compareAndSet(false, true)) {
                this.onTerminate.run();
            }
            if (this.isClosed()) {
                return;
            }
        }
        Operators.onErrorDropped((Throwable)e);
    }

    @Override
    public void onComplete(long streamId) {
        Subscriber local = (Subscriber)this.subscribers.get(streamId);
        if (local != null) {
            local.onComplete();
        }
    }

    @Override
    public void onCancel(long streamId, String reason) {
        Subscription remove;
        if (log.isDebugEnabled()) {
            log.debug("{}/onCancel/{} {}", new Object[]{this.name, streamId, reason});
        }
        if ((remove = (Subscription)this.subscriptions.get(streamId)) != null) {
            remove.cancel();
        }
    }

    @Override
    public void onRequested(long streamId, long n) {
        Subscription remote;
        if (log.isDebugEnabled()) {
            log.debug("{}/onRequested/{}/{}", new Object[]{this.name, streamId, n});
        }
        if ((remote = (Subscription)this.subscriptions.get(streamId)) != null) {
            remote.request(n);
        }
    }

    @Override
    public void sendNew(long streamId, String function) {
        if (log.isDebugEnabled()) {
            log.debug("{}/sendNew/{}/{}", new Object[]{this.name, streamId, function});
        }
        this.remote.sendNew(streamId, function);
    }

    @Override
    public void sendCancel(long streamId, String reason) {
        if (log.isDebugEnabled()) {
            log.debug("{}/sendCancel/{} {}", new Object[]{this.name, streamId, reason});
        }
        this.remote.sendCancel(streamId, reason);
    }

    @Override
    public void sendNext(long streamId, Object o) throws IOException {
        if (log.isDebugEnabled()) {
            log.debug("{}/sendNext/{}/value={}", new Object[]{this.name, streamId, o});
        }
        this.remote.sendNext(streamId, o);
    }

    @Override
    public void sendError(long streamId, Throwable e) {
        if (log.isDebugEnabled()) {
            log.debug("{}/sendError/{}", new Object[]{this.name, streamId, e});
        }
        this.remote.sendError(streamId, e);
    }

    @Override
    public void sendComplete(long streamId) {
        if (log.isDebugEnabled()) {
            log.debug("{}/sendComplete/{}", new Object[]{this.name, streamId});
        }
        this.remote.sendComplete(streamId);
    }

    @Override
    public void sendRequested(long streamId, long n) {
        if (log.isDebugEnabled()) {
            log.debug("{}/sendRequested/{}/{}", new Object[]{this.name, streamId, n});
        }
        this.remote.sendRequested(streamId, n);
    }

    @Override
    public boolean isClosed() {
        return this.remote.isClosed();
    }
}

