/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.internal;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.Executor;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.concurrent.internal.QueueFullException;
import io.servicetalk.concurrent.internal.SignalOffloader;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.utils.internal.PlatformDependent;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ThreadBasedSignalOffloader
implements SignalOffloader,
Runnable {
    // Index value meaning that no entity has been added yet; equal to the initial value of lastEntityIndex below.
    private static final int INDEX_INIT = -1;
    // Index value meaning the offloader has finished: run() stores -2 into lastEntityIndex before returning and
    // addOffloadedEntity(...) rejects new entities once it observes -2.
    private static final int INDEX_OFFLOADER_TERMINATED = -2;
    private static final Logger LOGGER = LoggerFactory.getLogger(ThreadBasedSignalOffloader.class);
    // Name reported by executorThreadName() while no executor thread has been recorded.
    private static final String UNKNOWN_EXECUTOR_THREAD_NAME = "signal-offloader-<unknown>";
    // Atomic access to lastEntityIndex; addOffloadedEntity(...) uses a CAS on it after storing a new entity.
    private static final AtomicIntegerFieldUpdater<ThreadBasedSignalOffloader> lastEntityIndexUpdater = AtomicIntegerFieldUpdater.newUpdater(ThreadBasedSignalOffloader.class, "lastEntityIndex");
    // Executor this offloader submits itself to (as a Runnable) when the first entity is added.
    private final Executor executor;
    // Initial capacity of the signal queue created by each OffloadedSubscriber.
    private final int publisherSignalQueueInitialCapacity;
    // Registered entities; replaced by a larger array when the next index would fall outside the current one.
    private OffloadedEntity[] offloadedEntities;
    // Index of the most recently added entity, or one of the sentinel values above.
    private volatile int lastEntityIndex = -1;
    // Thread executing run(); only assigned inside run(), so it is null before run() starts.
    @Nullable
    private volatile Thread executorThread;

    /**
     * Creates an offloader by delegating to the three-argument constructor with an expected entity count of 2
     * and an initial publisher signal queue capacity of 2.
     *
     * @param executor executor on which this offloader will run
     */
    ThreadBasedSignalOffloader(Executor executor) {
        this(executor, 2, 2);
    }

    /**
     * Creates an offloader.
     *
     * @param executor executor on which this offloader will run; must not be {@code null}
     * @param expectedOffloadingEntities initial length of the array that holds registered entities
     * @param publisherSignalQueueInitialCapacity initial capacity of the signal queue of each offloaded
     *        {@link PublisherSource.Subscriber}
     * @throws NullPointerException if {@code executor} is {@code null}
     */
    ThreadBasedSignalOffloader(Executor executor, int expectedOffloadingEntities, int publisherSignalQueueInitialCapacity) {
        // Validate first so that a null executor is reported before anything is allocated.
        Objects.requireNonNull(executor);
        this.executor = executor;
        this.publisherSignalQueueInitialCapacity = publisherSignalQueueInitialCapacity;
        offloadedEntities = new OffloadedEntity[expectedOffloadingEntities];
    }

    /**
     * Wraps {@code subscriber} in an {@code OffloadedSubscriber} and registers the wrapper with this offloader.
     *
     * @param subscriber subscriber that should receive its signals via this offloader
     * @return the registered wrapper
     */
    @Override
    public <T> PublisherSource.Subscriber<? super T> offloadSubscriber(PublisherSource.Subscriber<? super T> subscriber) {
        final OffloadedSubscriber<T> offloaded = new OffloadedSubscriber<T>(this, subscriber);
        return addOffloadedEntity(offloaded);
    }

    /**
     * Wraps {@code subscriber} in an {@code OffloadedSingleSubscriber} and registers the wrapper with this
     * offloader.
     *
     * @param subscriber subscriber that should receive its signals via this offloader
     * @return the registered wrapper
     */
    @Override
    public <T> SingleSource.Subscriber<? super T> offloadSubscriber(SingleSource.Subscriber<? super T> subscriber) {
        final OffloadedSingleSubscriber<T> offloaded = new OffloadedSingleSubscriber<T>(this, subscriber);
        return addOffloadedEntity(offloaded);
    }

    /**
     * Wraps {@code subscriber} in an {@code OffloadedCompletableSubscriber} and registers the wrapper with this
     * offloader.
     *
     * @param subscriber subscriber that should receive its signals via this offloader
     * @return the registered wrapper
     */
    @Override
    public CompletableSource.Subscriber offloadSubscriber(CompletableSource.Subscriber subscriber) {
        final OffloadedCompletableSubscriber offloaded = new OffloadedCompletableSubscriber(this, subscriber);
        return addOffloadedEntity(offloaded);
    }

    /**
     * Wraps {@code subscriber} in an {@code OffloadedSubscription} and registers the wrapper with this offloader.
     *
     * @param subscriber subscriber whose {@link PublisherSource.Subscription} calls should go via this offloader
     * @return the registered wrapper
     */
    @Override
    public <T> PublisherSource.Subscriber<? super T> offloadSubscription(PublisherSource.Subscriber<? super T> subscriber) {
        final OffloadedSubscription<T> offloaded = new OffloadedSubscription<T>(this, subscriber);
        return addOffloadedEntity(offloaded);
    }

    /**
     * Wraps {@code subscriber} in an {@code OffloadedSingleCancellable} and registers the wrapper with this
     * offloader.
     *
     * @param subscriber subscriber whose {@link Cancellable} calls should go via this offloader
     * @return the registered wrapper
     */
    @Override
    public <T> SingleSource.Subscriber<? super T> offloadCancellable(SingleSource.Subscriber<? super T> subscriber) {
        // Parameterize the wrapper explicitly: a raw type here would compile only through an unchecked conversion.
        final OffloadedSingleCancellable<T> offloaded = new OffloadedSingleCancellable<T>(this, subscriber);
        return addOffloadedEntity(offloaded);
    }

    /**
     * Wraps {@code subscriber} in an {@code OffloadedCompletableCancellable} and registers the wrapper with this
     * offloader.
     *
     * @param subscriber subscriber whose {@link Cancellable} calls should go via this offloader
     * @return the registered wrapper
     */
    @Override
    public CompletableSource.Subscriber offloadCancellable(CompletableSource.Subscriber subscriber) {
        final OffloadedCompletableCancellable offloaded = new OffloadedCompletableCancellable(this, subscriber);
        return addOffloadedEntity(offloaded);
    }

    /**
     * Registers a one-shot entity that hands {@code subscriber} to {@code handleSubscribe}. If the entity cannot
     * be enqueued, the cause of the failure is delivered to {@code subscriber} through
     * {@code SubscriberUtils.deliverErrorFromSource}.
     *
     * @param subscriber subscriber to pass to {@code handleSubscribe}
     * @param handleSubscribe consumer to invoke with {@code subscriber}
     */
    @Override
    public <T> void offloadSubscribe(PublisherSource.Subscriber<? super T> subscriber, Consumer<PublisherSource.Subscriber<? super T>> handleSubscribe) {
        final OffloadedSignalEntity<PublisherSource.Subscriber<? super T>> subscribeTask =
                new OffloadedSignalEntity<PublisherSource.Subscriber<? super T>>(handleSubscribe, subscriber);
        try {
            addOffloadedEntity(subscribeTask, true);
        } catch (EnqueueForOffloadingFailed failure) {
            SubscriberUtils.deliverErrorFromSource(subscriber, failure.getCause());
        }
    }

    /**
     * Registers a one-shot entity that hands {@code subscriber} to {@code handleSubscribe}. If the entity cannot
     * be enqueued, the cause of the failure is delivered to {@code subscriber} through
     * {@code SubscriberUtils.deliverErrorFromSource}.
     *
     * @param subscriber subscriber to pass to {@code handleSubscribe}
     * @param handleSubscribe consumer to invoke with {@code subscriber}
     */
    @Override
    public <T> void offloadSubscribe(SingleSource.Subscriber<? super T> subscriber, Consumer<SingleSource.Subscriber<? super T>> handleSubscribe) {
        final OffloadedSignalEntity<SingleSource.Subscriber<? super T>> subscribeTask =
                new OffloadedSignalEntity<SingleSource.Subscriber<? super T>>(handleSubscribe, subscriber);
        try {
            addOffloadedEntity(subscribeTask, true);
        } catch (EnqueueForOffloadingFailed failure) {
            SubscriberUtils.deliverErrorFromSource(subscriber, failure.getCause());
        }
    }

    /**
     * Registers a one-shot entity that hands {@code subscriber} to {@code handleSubscribe}. If the entity cannot
     * be enqueued, the cause of the failure is delivered to {@code subscriber} through
     * {@code SubscriberUtils.deliverErrorFromSource}.
     *
     * @param subscriber subscriber to pass to {@code handleSubscribe}
     * @param handleSubscribe consumer to invoke with {@code subscriber}
     */
    @Override
    public void offloadSubscribe(CompletableSource.Subscriber subscriber, Consumer<CompletableSource.Subscriber> handleSubscribe) {
        final OffloadedSignalEntity<CompletableSource.Subscriber> subscribeTask =
                new OffloadedSignalEntity<CompletableSource.Subscriber>(handleSubscribe, subscriber);
        try {
            addOffloadedEntity(subscribeTask, true);
        } catch (EnqueueForOffloadingFailed failure) {
            SubscriberUtils.deliverErrorFromSource(subscriber, failure.getCause());
        }
    }

    /**
     * Registers a one-shot entity that passes {@code signal} to {@code signalConsumer}. Failures to register
     * propagate to the caller unwrapped.
     *
     * @param signal value to hand to the consumer
     * @param signalConsumer consumer to invoke with {@code signal}
     */
    @Override
    public <T> void offloadSignal(T signal, Consumer<T> signalConsumer) {
        final OffloadedSignalEntity<T> signalTask = new OffloadedSignalEntity<T>(signalConsumer, signal);
        addOffloadedEntity(signalTask);
    }

    /**
     * Records the current thread as the executor thread and then loops: each pass asks every registered,
     * non-terminated entity to send its signals and counts how many entities are terminated. The loop exits,
     * marking the offloader terminated, once that count covers every registered entity. Otherwise the thread
     * parks, unless an entity was added while the pass was running.
     */
    @Override
    public void run() {
        assert executorThread == null;
        final Thread current = Thread.currentThread();
        executorThread = current;
        for (;;) {
            final int snapshotIndex = lastEntityIndex;
            assert snapshotIndex >= 0;
            final OffloadedEntity[] snapshot = offloadedEntities;
            int terminatedCount = 0;
            for (int i = 0; i <= snapshotIndex; ++i) {
                final OffloadedEntity entity = snapshot[i];
                if (!entity.isTerminated()) {
                    entity.sendSignals();
                }
                // Counts entities that were already terminated as well as those terminated by the call above.
                if (entity.isTerminated()) {
                    ++terminatedCount;
                }
            }
            // Compared against a fresh read of the index, so an entity added during this pass prevents termination.
            if (terminatedCount == lastEntityIndex + 1) {
                lastEntityIndex = INDEX_OFFLOADER_TERMINATED;
                return;
            }
            // Only park when nothing was added during this pass; otherwise loop again to pick up the new entity.
            if (snapshotIndex == lastEntityIndex) {
                LockSupport.park(current);
            }
        }
    }

    /**
     * Unparks the thread currently recorded as the executor thread, if any.
     */
    void notifyExecutor() {
        final Thread recorded = executorThread;
        LockSupport.unpark(recorded);
    }

    /**
     * Unparks the given thread.
     *
     * @param executorThread thread to unpark; {@code null} results in no action
     */
    void notifyExecutor(@Nullable Thread executorThread) {
        // LockSupport.unpark documents a null thread as having no effect, so skipping the call is equivalent.
        if (executorThread != null) {
            LockSupport.unpark(executorThread);
        }
    }

    /**
     * Registers {@code offloadedEntity} without wrapping enqueue failures.
     *
     * @param offloadedEntity entity to register
     * @return {@code offloadedEntity}
     */
    private <T extends OffloadedEntity> T addOffloadedEntity(T offloadedEntity) {
        final boolean wrapEnqueueFailure = false;
        return addOffloadedEntity(offloadedEntity, wrapEnqueueFailure);
    }

    /**
     * Stores {@code offloadedEntity} in the next free slot and publishes it by advancing {@code lastEntityIndex}
     * with a CAS. The first entity added submits this offloader to the executor; later additions unpark the
     * executor thread unless the caller is that thread.
     *
     * @param offloadedEntity entity to register
     * @param wrapEnqueueFailure if {@code true}, failures are thrown wrapped in {@code EnqueueForOffloadingFailed};
     *        otherwise they are thrown as-is
     * @return {@code offloadedEntity}
     * @throws IllegalStateException if the offloader has already terminated (wrapped when requested)
     * @throws IllegalArgumentException if the index changed while adding, i.e. another entity was added
     *         concurrently (wrapped when requested)
     * @throws RejectedExecutionException if the executor rejects this offloader (wrapped when requested)
     */
    private <T extends OffloadedEntity> T addOffloadedEntity(T offloadedEntity, boolean wrapEnqueueFailure) {
        final int lastIndex = lastEntityIndex;
        if (lastIndex == INDEX_OFFLOADER_TERMINATED) {
            throw enqueueFailure(newTerminatedException(), wrapEnqueueFailure);
        }
        final int nextIndex = lastIndex + 1;
        if (nextIndex == offloadedEntities.length) {
            // Doubling a zero-length array leaves no room for the entity, so always grow to at least one slot.
            final int grownLength = Math.max(1, offloadedEntities.length * 2);
            final OffloadedEntity[] grown = new OffloadedEntity[grownLength];
            System.arraycopy(offloadedEntities, 0, grown, 0, offloadedEntities.length);
            offloadedEntities = grown;
        }
        // Store before the CAS so the slot is populated by the time run() observes the new index.
        offloadedEntities[nextIndex] = offloadedEntity;
        if (!lastEntityIndexUpdater.compareAndSet(this, lastIndex, nextIndex)) {
            final RuntimeException failure;
            if (lastEntityIndex == INDEX_OFFLOADER_TERMINATED) {
                failure = newTerminatedException();
            } else {
                failure = new IllegalArgumentException("Entity " + offloadedEntity + " added concurrently for offloading signals.");
            }
            throw enqueueFailure(failure, wrapEnqueueFailure);
        }
        if (nextIndex == 0) {
            try {
                executor.execute(this);
            } catch (RejectedExecutionException rejected) {
                // Without wrapping this rethrows the very same exception instance.
                throw enqueueFailure(rejected, wrapEnqueueFailure);
            }
        } else {
            final Thread executorThread = this.executorThread;
            if (Thread.currentThread() != executorThread) {
                notifyExecutor(executorThread);
            }
        }
        return offloadedEntity;
    }

    /** Builds the exception reported when an entity is added after the offloader terminated. */
    private IllegalStateException newTerminatedException() {
        return new IllegalStateException("Signal offloader: " + executorThreadName() + " has already terminated.");
    }

    /** Returns {@code cause} itself, or wrapped in {@code EnqueueForOffloadingFailed} when wrapping is requested. */
    private static RuntimeException enqueueFailure(RuntimeException cause, boolean wrapEnqueueFailure) {
        return wrapEnqueueFailure ? new EnqueueForOffloadingFailed(cause) : cause;
    }

    /**
     * Returns the name of the recorded executor thread, or {@code UNKNOWN_EXECUTOR_THREAD_NAME} when no thread
     * has been recorded yet.
     */
    private String executorThreadName() {
        final Thread recorded = executorThread;
        if (recorded != null) {
            return recorded.getName();
        }
        return UNKNOWN_EXECUTOR_THREAD_NAME;
    }

    /**
     * Casts {@code signal} to the type expected by the caller. The cast is unchecked: the caller is responsible
     * for {@code signal} actually being of the inferred type.
     *
     * @param signal value to cast, may be {@code null}
     * @return {@code signal}, typed as {@code T}
     */
    @Nullable
    @SuppressWarnings("unchecked")
    private static <T> T uncheckedCast(@Nullable Object signal) {
        return (T) signal;
    }

    /**
     * Internal wrapper around the exception that prevented an entity from being enqueued. It records no stack
     * trace of its own; callers read the wrapped exception through {@link #getCause()}.
     */
    private static final class EnqueueForOffloadingFailed extends RuntimeException {
        private static final long serialVersionUID = 7000860459929007810L;

        /**
         * @param wrapped the exception that caused the enqueue to fail
         */
        EnqueueForOffloadingFailed(Exception wrapped) {
            super(wrapped);
        }

        /**
         * Does nothing, so that no stack trace is filled in for this exception.
         */
        @Override
        public synchronized Throwable fillInStackTrace() {
            return this;
        }
    }

    /**
     * {@link CompletableSource.Subscriber} that hands itself to the wrapped subscriber as the {@link Cancellable},
     * so that {@code cancel()} is routed through {@code AbstractOffloadedCancellable}. Terminal signals are marked
     * as done and then forwarded to the wrapped subscriber directly.
     */
    private static final class OffloadedCompletableCancellable extends AbstractOffloadedCancellable
            implements CompletableSource.Subscriber {
        private final CompletableSource.Subscriber original;

        OffloadedCompletableCancellable(ThreadBasedSignalOffloader offloader, CompletableSource.Subscriber original) {
            super(offloader);
            this.original = original;
        }

        @Override
        public void onSubscribe(Cancellable cancellable) {
            // Remember the real cancellable, then expose this object in its place.
            cancellable(cancellable);
            original.onSubscribe(this);
        }

        @Override
        public void onComplete() {
            setDone();
            original.onComplete();
        }

        @Override
        public void onError(Throwable t) {
            setDone();
            original.onError(t);
        }
    }

    /**
     * {@link SingleSource.Subscriber} that hands itself to the wrapped subscriber as the {@link Cancellable},
     * so that {@code cancel()} is routed through {@code AbstractOffloadedCancellable}. Terminal signals are marked
     * as done and then forwarded to the wrapped subscriber directly.
     *
     * @param <T> type of the result
     */
    private static final class OffloadedSingleCancellable<T> extends AbstractOffloadedCancellable
            implements SingleSource.Subscriber<T> {
        private final SingleSource.Subscriber<? super T> original;

        private OffloadedSingleCancellable(ThreadBasedSignalOffloader offloader, SingleSource.Subscriber<? super T> original) {
            super(offloader);
            this.original = original;
        }

        @Override
        public void onSubscribe(Cancellable cancellable) {
            // Remember the real cancellable, then expose this object in its place.
            cancellable(cancellable);
            original.onSubscribe(this);
        }

        @Override
        public void onSuccess(@Nullable T result) {
            setDone();
            original.onSuccess(result);
        }

        @Override
        public void onError(Throwable t) {
            setDone();
            original.onError(t);
        }
    }

    /**
     * Base class for entities that offload {@link Cancellable#cancel()}. {@code cancel()} only records the
     * request and notifies the executor; the real cancellable is cancelled from {@code sendSignals0()}.
     */
    private abstract static class AbstractOffloadedCancellable extends AbstractOffloadedEntity
            implements Cancellable {
        // Neither cancel() nor setDone() has been called yet.
        private static final byte UNAVAILABLE = 0;
        // cancel() was the most recent state change.
        private static final byte CANCELLED = 1;
        // setDone() was the most recent state change.
        private static final byte DONE = 2;
        private byte result;
        @Nullable
        private Cancellable cancellable;

        AbstractOffloadedCancellable(ThreadBasedSignalOffloader offloader) {
            super(offloader);
        }

        /** Stores the cancellable that {@code sendSignals0()} cancels when cancellation was requested. */
        final void cancellable(Cancellable cancellable) {
            this.cancellable = cancellable;
        }

        @Override
        public final void cancel() {
            final byte previous = result;
            result = CANCELLED;
            // Only the first state change away from UNAVAILABLE notifies the executor.
            if (previous == UNAVAILABLE) {
                notifyExecutor();
            }
        }

        @Override
        public final void sendSignals0() {
            final Cancellable target = cancellable;
            // Act only once a state has been recorded and the cancellable is known.
            if (result != UNAVAILABLE && target != null) {
                setTerminated();
                if (result == CANCELLED) {
                    SubscriberUtils.safeCancel(target);
                }
            }
        }

        /** Records that the source has terminated, notifying the executor on the first state change. */
        final void setDone() {
            final byte previous = result;
            result = DONE;
            if (previous == UNAVAILABLE) {
                notifyExecutor();
            }
        }
    }

    /**
     * {@link PublisherSource.Subscriber} that offloads the {@link PublisherSource.Subscription} methods of the
     * wrapped subscriber. {@code request(n)} and {@code cancel()} only update the {@code requested} counter and
     * notify the executor; the real subscription is invoked from {@code sendSignals0()}. Signals travelling
     * towards the wrapped subscriber are forwarded directly.
     *
     * @param <T> type of items
     */
    private static final class OffloadedSubscription<T> extends AbstractOffloadedEntity
            implements PublisherSource.Subscriber<T> {
        @SuppressWarnings("rawtypes")
        private static final AtomicLongFieldUpdater<OffloadedSubscription> requestedUpdater = AtomicLongFieldUpdater.newUpdater(OffloadedSubscription.class, "requested");
        // Value of requested after cancel() was called on the subscription given to the wrapped subscriber.
        public static final int CANCELLED = -1;
        // Value of requested after a terminal signal (onError/onComplete) was received.
        public static final int TERMINATED = -2;
        private final PublisherSource.Subscriber<? super T> original;
        @Nullable
        private PublisherSource.Subscription subscription;
        // Positive: demand not yet forwarded. CANCELLED/TERMINATED: see above. Any other negative value: an
        // invalid request amount that sendSignals0() forwards to the real subscription as-is.
        private volatile long requested;

        OffloadedSubscription(ThreadBasedSignalOffloader offloader, PublisherSource.Subscriber<? super T> original) {
            super(offloader);
            this.original = original;
        }

        @Override
        public void onSubscribe(PublisherSource.Subscription s) {
            subscription = s;
            // Inside the anonymous class `this` is the anonymous Subscription, so every access to the enclosing
            // entity must be qualified with OffloadedSubscription.this.
            original.onSubscribe(new PublisherSource.Subscription() {

                @Override
                public void request(long n) {
                    final long previous;
                    if (SubscriberUtils.isRequestNValid(n)) {
                        previous = requestedUpdater.getAndAccumulate(OffloadedSubscription.this, n, FlowControlUtils::addWithOverflowProtectionIfNotNegative);
                    } else {
                        // Keep the invalid amount unless it collides with 0, CANCELLED or TERMINATED, in which
                        // case Long.MIN_VALUE is stored instead.
                        previous = requestedUpdater.getAndSet(OffloadedSubscription.this, n < TERMINATED ? n : Long.MIN_VALUE);
                    }
                    // Notify only if the counter was not already in a negative state.
                    if (previous >= 0L) {
                        OffloadedSubscription.this.notifyExecutor();
                    }
                }

                @Override
                public void cancel() {
                    OffloadedSubscription.this.requested(CANCELLED);
                }
            });
        }

        @Override
        public void onNext(T t) {
            original.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            requested(TERMINATED);
            original.onError(t);
        }

        @Override
        public void onComplete() {
            requested(TERMINATED);
            original.onComplete();
        }

        @Override
        public void sendSignals0() {
            final PublisherSource.Subscription s = subscription;
            if (s == null) {
                return;
            }
            long toRequest;
            // Drain accumulated demand; the loop ends when the counter is 0 or in a negative state.
            while ((toRequest = requestedUpdater.getAndSet(this, 0L)) > 0L) {
                s.request(toRequest);
            }
            if (toRequest == 0L) {
                return;
            }
            // Every negative state is final for this entity.
            setTerminated();
            if (toRequest == TERMINATED) {
                return;
            }
            if (toRequest == CANCELLED) {
                s.cancel();
                return;
            }
            // Invalid request amount: forward it to the real subscription.
            s.request(toRequest);
        }

        /** Overwrites the counter with {@code newValue} and notifies the executor. */
        private void requested(long newValue) {
            requested = newValue;
            notifyExecutor();
        }
    }

    private static final class OffloadedCompletableSubscriber
    extends AbstractOffloadedSingleValueSubscriber
    implements CompletableSource.Subscriber {
        private static final Object COMPLETE_SIGNAL = new Object();
        private final CompletableSource.Subscriber original;

        OffloadedCompletableSubscriber(ThreadBasedSignalOffloader offloader, CompletableSource.Subscriber original) {
            super(offloader);
            this.original = original;
        }

        @Override
        public void onComplete() {
            this.result(COMPLETE_SIGNAL);
        }

        @Override
        public void onError(Throwable t) {
            this.result(TerminalNotification.error(t));
        }

        @Override
        void sendOnSubscribe(Cancellable c) {
            try {
                this.original.onSubscribe(c);
            }
            catch (Throwable throwable) {
                this.sendError(throwable);
                SubscriberUtils.safeCancel(c);
            }
        }

        @Override
        void sendSuccess(@Nullable Object signal) {
            SubscriberUtils.safeOnComplete(this.original);
        }

        @Override
        void sendError(Throwable t) {
            SubscriberUtils.safeOnError(this.original, t);
        }
    }

    private static final class OffloadedSingleSubscriber<T>
    extends AbstractOffloadedSingleValueSubscriber
    implements SingleSource.Subscriber<T> {
        private final SingleSource.Subscriber<? super T> original;

        OffloadedSingleSubscriber(ThreadBasedSignalOffloader offloader, SingleSource.Subscriber<? super T> original) {
            super(offloader);
            this.original = original;
        }

        @Override
        public void onSuccess(@Nullable T t) {
            this.result(t);
        }

        @Override
        public void onError(Throwable t) {
            this.result(TerminalNotification.error(t));
        }

        @Override
        void sendOnSubscribe(Cancellable c) {
            try {
                this.original.onSubscribe(c);
            }
            catch (Throwable throwable) {
                this.sendError(throwable);
                SubscriberUtils.safeCancel(c);
            }
        }

        @Override
        void sendSuccess(@Nullable Object result) {
            SubscriberUtils.safeOnSuccess(this.original, ThreadBasedSignalOffloader.uncheckedCast(result));
        }

        @Override
        void sendError(Throwable t) {
            SubscriberUtils.safeOnError(this.original, t);
        }
    }

    /**
     * Base class for subscribers that receive {@code onSubscribe} followed by at most one terminal result.
     * Incoming signals are stored and the executor is notified; {@code sendSignals0()} then delivers them through
     * the abstract {@code send*} methods.
     */
    private abstract static class AbstractOffloadedSingleValueSubscriber extends AbstractOffloadedEntity {
        // Value of result while no terminal signal and no cancel has been recorded.
        private static final Object INITIAL_VALUE = new Object();
        // Value of result after the cancellable handed to the subscriber was cancelled first.
        private static final Object CANCELLED = new Object();
        // Replaces the stored cancellable once it has been delivered through sendOnSubscribe.
        private static final Cancellable CANCELLABLE_SENT = () -> {};
        private static final AtomicReferenceFieldUpdater<AbstractOffloadedSingleValueSubscriber, Object> resultUpdater = AtomicReferenceFieldUpdater.newUpdater(AbstractOffloadedSingleValueSubscriber.class, Object.class, "result");
        @Nullable
        private Cancellable cancellable;
        @Nullable
        private volatile Object result = INITIAL_VALUE;

        AbstractOffloadedSingleValueSubscriber(ThreadBasedSignalOffloader offloader) {
            super(offloader);
        }

        /**
         * Stores a cancellable that wraps {@code cancellable} and notifies the executor. If a cancellable has
         * already been stored, the new one is cancelled and otherwise ignored.
         */
        public void onSubscribe(Cancellable cancellable) {
            Objects.requireNonNull(cancellable);
            if (this.cancellable == null) {
                this.cancellable = () -> {
                    // Record the cancel only if no result has been recorded yet, then always cancel upstream.
                    if (resultUpdater.compareAndSet(this, INITIAL_VALUE, CANCELLED)) {
                        notifyExecutor();
                    }
                    cancellable.cancel();
                };
                notifyExecutor();
            } else {
                cancellable.cancel();
            }
        }

        @Override
        public void sendSignals0() {
            final Cancellable pending = cancellable;
            if (pending != null && pending != CANCELLABLE_SENT) {
                // Mark as sent before delivering so that onSubscribe is delivered at most once.
                cancellable = CANCELLABLE_SENT;
                sendOnSubscribe(pending);
            }
            final Object outcome = result;
            if (outcome == INITIAL_VALUE) {
                return;
            }
            setTerminated();
            if (outcome == CANCELLED) {
                // Cancelled before any result arrived: nothing to deliver.
                return;
            }
            if (outcome instanceof TerminalNotification) {
                final Throwable cause = ((TerminalNotification) outcome).cause();
                assert cause != null;
                sendError(cause);
            } else {
                sendSuccess(outcome);
            }
        }

        /** Delivers {@code onSubscribe} to the wrapped subscriber. */
        abstract void sendOnSubscribe(Cancellable cancellable);

        /** Delivers the successful result to the wrapped subscriber. */
        abstract void sendSuccess(@Nullable Object signal);

        /** Delivers the error to the wrapped subscriber. */
        abstract void sendError(Throwable cause);

        /** Records the terminal result and notifies the executor. */
        final void result(@Nullable Object result) {
            this.result = result;
            notifyExecutor();
        }
    }

    private static final class OffloadedSubscriber<T>
    extends AbstractOffloadedEntity
    implements PublisherSource.Subscriber<T> {
        private static final Object NULL_ON_NEXT = new Object();
        private final ThreadBasedSignalOffloader offloader;
        private final PublisherSource.Subscriber<? super T> original;
        private final Queue<Object> signals;
        @Nullable
        private PublisherSource.Subscription subscription;
        private boolean cancelled;

        OffloadedSubscriber(ThreadBasedSignalOffloader offloader, PublisherSource.Subscriber<? super T> original) {
            super(offloader);
            this.offloader = offloader;
            this.original = original;
            this.signals = PlatformDependent.newUnboundedSpscQueue(offloader.publisherSignalQueueInitialCapacity);
        }

        @Override
        public void onSubscribe(final PublisherSource.Subscription s) {
            this.subscription = s;
            PublisherSource.Subscription cancelInterceptingSubscription = new PublisherSource.Subscription(){

                @Override
                public void request(long n) {
                    s.request(n);
                }

                @Override
                public void cancel() {
                    cancelled = true;
                    this.notifyExecutor();
                    s.cancel();
                }
            };
            this.offerSignal(cancelInterceptingSubscription);
        }

        @Override
        public void onNext(T t) {
            this.offerSignal(t == null ? NULL_ON_NEXT : t);
        }

        @Override
        public void onError(Throwable t) {
            this.offerSignal(TerminalNotification.error(t));
        }

        @Override
        public void onComplete() {
            this.offerSignal(TerminalNotification.complete());
        }

        @Override
        public void sendSignals0() {
            while (true) {
                Object signal;
                if ((signal = this.signals.poll()) == null) {
                    if (this.cancelled) {
                        this.setTerminated();
                    }
                    return;
                }
                if (signal instanceof PublisherSource.Subscription) {
                    PublisherSource.Subscription subscription = (PublisherSource.Subscription)signal;
                    try {
                        this.original.onSubscribe(subscription);
                    }
                    catch (Throwable throwable) {
                        this.setTerminated();
                        SubscriberUtils.safeOnError(this.original, throwable);
                        SubscriberUtils.safeCancel(subscription);
                    }
                    continue;
                }
                if (signal instanceof TerminalNotification) {
                    this.setTerminated();
                    TerminalNotification terminalNotification = (TerminalNotification)signal;
                    if (terminalNotification.cause() != null) {
                        SubscriberUtils.safeOnError(this.original, terminalNotification.cause());
                        continue;
                    }
                    SubscriberUtils.safeOnComplete(this.original);
                    continue;
                }
                try {
                    this.original.onNext(signal == NULL_ON_NEXT ? null : ThreadBasedSignalOffloader.uncheckedCast(signal));
                    continue;
                }
                catch (Throwable throwable) {
                    this.setTerminated();
                    SubscriberUtils.safeOnError(this.original, throwable);
                    assert (this.subscription != null);
                    SubscriberUtils.safeCancel(this.subscription);
                    continue;
                }
                break;
            }
        }

        private void offerSignal(Object signal) {
            if (!this.signals.offer(signal)) {
                throw new QueueFullException(this.offloader.executorThreadName() + "-" + this.original.getClass().getName());
            }
            this.notifyExecutor();
        }
    }

    /**
     * Base {@code OffloadedEntity} that tracks a "notified" flag: {@code notifyExecutor()} sets the flag and
     * wakes the offloader, and {@code sendSignals()} only calls {@code sendSignals0()} when the flag was set,
     * clearing it in the process.
     */
    private abstract static class AbstractOffloadedEntity implements OffloadedEntity {
        private static final AtomicIntegerFieldUpdater<AbstractOffloadedEntity> notifyUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractOffloadedEntity.class, "notify");
        private boolean terminated;
        private final ThreadBasedSignalOffloader offloader;
        // 1 while a notification is pending, 0 otherwise.
        private volatile int notify;

        AbstractOffloadedEntity(ThreadBasedSignalOffloader offloader) {
            this.offloader = offloader;
        }

        @Override
        public final void sendSignals() {
            // Nothing to do unless a notification is pending; consuming it resets the flag.
            if (!notifyUpdater.compareAndSet(this, 1, 0)) {
                return;
            }
            sendSignals0();
        }

        @Override
        public final boolean isTerminated() {
            return terminated;
        }

        /** Delivers whatever signals are pending for this entity. */
        abstract void sendSignals0();

        /** Flags a pending notification and wakes the offloader, unless a notification is already pending. */
        final void notifyExecutor() {
            if (!notifyUpdater.compareAndSet(this, 0, 1)) {
                return;
            }
            offloader.notifyExecutor();
        }

        /** Marks this entity as terminated. */
        final void setTerminated() {
            terminated = true;
        }
    }

    private static final class OffloadedSignalEntity<T>
    implements OffloadedEntity {
        private final Consumer<T> signalConsumer;
        private final T signal;
        private boolean terminated;

        OffloadedSignalEntity(Consumer<T> signalConsumer, T signal) {
            this.signalConsumer = signalConsumer;
            this.signal = signal;
        }

        @Override
        public void sendSignals() {
            this.terminated = true;
            try {
                this.signalConsumer.accept(this.signal);
            }
            catch (Throwable throwable) {
                LOGGER.error("Ignored unexpected exception offloading signal: {} to consumer: {}", this.signal, this.signalConsumer, throwable);
            }
        }

        @Override
        public boolean isTerminated() {
            return this.terminated;
        }
    }

    private static interface OffloadedEntity {
        public void sendSignals();

        public boolean isTerminated();
    }
}

