/*
 * Decompiled with CFR 0.152.
 */
package reactor.aeron;

import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.aeron.AbortedException;
import reactor.aeron.AeronEventLoop;
import reactor.aeron.MessagePublication;
import reactor.aeron.MessageType;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Operators;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.Queues;

final class AeronWriteSequencer {
    static final AtomicIntegerFieldUpdater<AeronWriteSequencer> WIP = AtomicIntegerFieldUpdater.newUpdater(AeronWriteSequencer.class, "wip");
    private static final Logger logger = Loggers.getLogger(AeronWriteSequencer.class);
    private final AeronEventLoop eventLoop;
    private final PublisherSender inner;
    private final Consumer<Throwable> errorHandler;
    private final BiPredicate<MonoSink<?>, Object> pendingWriteOffer;
    private final Queue<?> pendingWrites;
    private final Consumer<Object> discardedHandler;
    private volatile int wip;

    AeronWriteSequencer(long sessionId, MessagePublication publication, AeronEventLoop eventLoop) {
        this.eventLoop = eventLoop;
        this.discardedHandler = o -> {};
        this.pendingWrites = (Queue)Queues.unbounded().get();
        this.pendingWriteOffer = (BiPredicate)((Object)this.pendingWrites);
        this.errorHandler = throwable -> logger.error("Unexpected exception", throwable);
        this.inner = new PublisherSender(this, publication, sessionId);
    }

    Consumer<Throwable> getErrorHandler() {
        return this.errorHandler;
    }

    public Mono<Void> write(Publisher<?> publisher) {
        return Mono.defer(() -> this.eventLoop.execute(sink -> {
            boolean result = this.pendingWriteOffer.test((MonoSink<?>)sink, publisher);
            if (!result) {
                sink.error((Throwable)new Exception("Failed to enqueue publisher"));
            }
            this.drain();
        }));
    }

    void drain() {
        if (WIP.getAndIncrement(this) == 0) {
            while (true) {
                boolean empty;
                MonoSink promise;
                if (this.inner.isCancelled) {
                    this.discard();
                    this.inner.isCancelled = false;
                    if (WIP.decrementAndGet(this) != 0) continue;
                    break;
                }
                if (this.inner.isActive) {
                    if (WIP.decrementAndGet(this) != 0) continue;
                    break;
                }
                Object v = this.pendingWrites.poll();
                try {
                    promise = (MonoSink)v;
                }
                catch (Throwable e) {
                    this.getErrorHandler().accept(e);
                    return;
                }
                boolean bl = empty = promise == null;
                if (empty) {
                    if (WIP.decrementAndGet(this) != 0) continue;
                    break;
                }
                v = this.pendingWrites.poll();
                Publisher p = (Publisher)v;
                if (p instanceof Callable) {
                    Object vr;
                    Callable supplier = (Callable)p;
                    try {
                        vr = supplier.call();
                    }
                    catch (Throwable e) {
                        promise.error(e);
                        continue;
                    }
                    if (vr == null) {
                        promise.success();
                        continue;
                    }
                    this.inner.isActive = true;
                    this.inner.promise = promise;
                    this.inner.onSubscribe(Operators.scalarSubscription((CoreSubscriber)this.inner, (Object)((ByteBuffer)vr)));
                    continue;
                }
                this.inner.isActive = true;
                this.inner.promise = promise;
                p.subscribe((Subscriber)this.inner);
            }
        }
    }

    void discard() {
        while (!this.pendingWrites.isEmpty()) {
            MonoSink promise;
            Object v = this.pendingWrites.poll();
            try {
                promise = (MonoSink)v;
            }
            catch (Throwable e) {
                this.getErrorHandler().accept(e);
                return;
            }
            v = this.pendingWrites.poll();
            if (logger.isDebugEnabled()) {
                logger.debug("Terminated. Dropping: {}", new Object[]{v});
            }
            this.discardedHandler.accept(v);
            promise.error((Throwable)new AbortedException());
        }
    }

    static class PublisherSender
    implements CoreSubscriber<ByteBuffer>,
    Subscription {
        static final AtomicReferenceFieldUpdater<PublisherSender, Subscription> MISSED_SUBSCRIPTION = AtomicReferenceFieldUpdater.newUpdater(PublisherSender.class, Subscription.class, "missedSubscription");
        static final AtomicLongFieldUpdater<PublisherSender> MISSED_REQUESTED = AtomicLongFieldUpdater.newUpdater(PublisherSender.class, "missedRequested");
        static final AtomicLongFieldUpdater<PublisherSender> MISSED_PRODUCED = AtomicLongFieldUpdater.newUpdater(PublisherSender.class, "missedProduced");
        static final AtomicIntegerFieldUpdater<PublisherSender> WIP = AtomicIntegerFieldUpdater.newUpdater(PublisherSender.class, "wip");
        private final int batchSize;
        private final AeronWriteSequencer parent;
        private final long sessionId;
        private final MessagePublication publication;
        private volatile Subscription missedSubscription;
        private volatile long missedRequested;
        private volatile long missedProduced;
        private volatile int wip;
        private volatile boolean isCancelled;
        private volatile boolean isActive;
        private long requested;
        private boolean unbounded;
        private Subscription actual;
        private long produced;
        private MonoSink<?> promise;
        private long upstreamRequested;

        PublisherSender(AeronWriteSequencer parent, MessagePublication publication, long sessionId) {
            this.batchSize = 16;
            this.parent = parent;
            this.sessionId = sessionId;
            this.publication = publication;
        }

        public final void cancel() {
            if (!this.isCancelled) {
                this.isCancelled = true;
                this.drain();
            }
        }

        public void onComplete() {
            long p = this.produced;
            this.isActive = false;
            this.produced = 0L;
            this.produced(p);
            this.promise.success();
            this.parent.drain();
        }

        public void onError(Throwable t) {
            long p = this.produced;
            this.isActive = false;
            this.produced = 0L;
            this.produced(p);
            this.promise.error(t);
            this.parent.drain();
        }

        public void onNext(ByteBuffer t) {
            AeronEventLoop eventLoop = this.parent.eventLoop;
            if (eventLoop.inEventLoop()) {
                this.onNextInternal(t, null);
            } else {
                eventLoop.execute(sink -> this.onNextInternal(t, (MonoSink<Void>)sink)).subscribe(null, this::disposeCurrentDataStream);
            }
        }

        private void onNextInternal(ByteBuffer byteBuffer, MonoSink<Void> sink) {
            ++this.produced;
            this.publication.enqueue(MessageType.NEXT, byteBuffer, this.sessionId).doFinally(s -> {
                if (this.upstreamRequested - this.produced == 0L && this.requested - this.produced > 0L) {
                    this.requestFromUpstream(this.actual);
                }
            }).subscribe(null, this::disposeCurrentDataStream);
        }

        private void disposeCurrentDataStream(Throwable th) {
            this.cancel();
            this.promise.error((Throwable)new Exception("Failed to publish signal into session: " + this.sessionId, th));
            this.parent.drain();
        }

        public void onSubscribe(Subscription s) {
            Objects.requireNonNull(s);
            if (this.isCancelled) {
                s.cancel();
                return;
            }
            if (this.wip == 0 && WIP.compareAndSet(this, 0, 1)) {
                this.actual = s;
                this.upstreamRequested = 0L;
                this.request(Long.MAX_VALUE);
                long r = this.requested;
                if (WIP.decrementAndGet(this) != 0) {
                    this.drainLoop();
                }
                if (r != 0L) {
                    this.requestFromUpstream(s);
                }
                return;
            }
            MISSED_SUBSCRIPTION.set(this, s);
            this.drain();
        }

        public final void request(long n) {
            if (Operators.validate((long)n)) {
                if (this.unbounded) {
                    return;
                }
                if (this.wip == 0 && WIP.compareAndSet(this, 0, 1)) {
                    long r = this.requested;
                    if (r != Long.MAX_VALUE) {
                        this.requested = r = Operators.addCap((long)r, (long)n);
                        if (r == Long.MAX_VALUE) {
                            this.unbounded = true;
                        }
                    }
                    Subscription a = this.actual;
                    if (WIP.decrementAndGet(this) != 0) {
                        this.drainLoop();
                    }
                    if (a != null) {
                        this.requestFromUpstream(a);
                    }
                    return;
                }
                Operators.addCap(MISSED_REQUESTED, (Object)this, (long)n);
                this.drain();
            }
        }

        final void requestFromUpstream(Subscription s) {
            if (this.upstreamRequested < this.requested) {
                this.upstreamRequested += (long)this.batchSize;
                s.request((long)this.batchSize);
            }
        }

        final void drain() {
            if (WIP.getAndIncrement(this) != 0) {
                return;
            }
            this.drainLoop();
        }

        final void drainLoop() {
            int missed = 1;
            long requestAmount = 0L;
            Subscription requestTarget = null;
            do {
                long mp;
                long mr;
                Subscription ms;
                if ((ms = this.missedSubscription) != null) {
                    ms = MISSED_SUBSCRIPTION.getAndSet(this, null);
                }
                if ((mr = this.missedRequested) != 0L) {
                    mr = MISSED_REQUESTED.getAndSet(this, 0L);
                }
                if ((mp = this.missedProduced) != 0L) {
                    mp = MISSED_PRODUCED.getAndSet(this, 0L);
                }
                Subscription a = this.actual;
                if (this.isCancelled) {
                    if (a != null) {
                        a.cancel();
                        this.actual = null;
                        this.upstreamRequested = 0L;
                    }
                    if (ms != null) {
                        ms.cancel();
                    }
                    this.isActive = false;
                    continue;
                }
                long r = this.requested;
                if (r != Long.MAX_VALUE) {
                    long u = Operators.addCap((long)r, (long)mr);
                    if (u != Long.MAX_VALUE) {
                        long v = u - mp;
                        if (v < 0L) {
                            Operators.reportMoreProduced();
                            v = 0L;
                        }
                        r = v;
                    } else {
                        r = u;
                    }
                    this.requested = r;
                }
                if (ms != null) {
                    this.actual = ms;
                    this.upstreamRequested = 0L;
                    if (r == 0L) continue;
                    requestAmount = Operators.addCap((long)requestAmount, (long)r);
                    requestTarget = ms;
                    continue;
                }
                if (mr == 0L || a == null) continue;
                requestAmount = Operators.addCap((long)requestAmount, (long)mr);
                requestTarget = a;
            } while ((missed = WIP.addAndGet(this, -missed)) != 0);
            if (requestAmount != 0L) {
                this.requestFromUpstream(requestTarget);
            }
        }

        final void produced(long n) {
            if (this.unbounded) {
                return;
            }
            if (this.wip == 0 && WIP.compareAndSet(this, 0, 1)) {
                long r = this.requested;
                if (r != Long.MAX_VALUE) {
                    long u = r - n;
                    if (u < 0L) {
                        Operators.reportMoreProduced();
                        u = 0L;
                    }
                    this.requested = u;
                } else {
                    this.unbounded = true;
                }
                if (WIP.decrementAndGet(this) == 0) {
                    return;
                }
                this.drainLoop();
                return;
            }
            Operators.addCap(MISSED_PRODUCED, (Object)this, (long)n);
            this.drain();
        }
    }
}

