/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.internal.TerminalNotification;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

public final class TestCollectingPublisherSubscriber<T>
implements PublisherSource.Subscriber<T> {
    private static final Object NULL_ON_NEXT = new Object();
    private final BlockingQueue<Object> items = new LinkedBlockingQueue<Object>();
    private final CountDownLatch onTerminalLatch = new CountDownLatch(1);
    private final CountDownLatch onSubscribeLatch = new CountDownLatch(1);
    @Nullable
    private TerminalNotification onTerminal;
    @Nullable
    private PublisherSource.Subscription subscription;

    public void onSubscribe(PublisherSource.Subscription subscription) {
        Objects.requireNonNull(subscription, "Null Subscription is not permitted https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2.13");
        this.verifyNoTerminal("onSubscribe", null, false);
        if (this.subscription != null) {
            throw new IllegalStateException("The Subscription has already been set to " + this.subscription + ". New Subscription " + subscription + " is not supported.");
        }
        this.subscription = subscription;
        this.onSubscribeLatch.countDown();
    }

    public void onNext(@Nullable T t) {
        this.verifyOnSubscribedAndNoTerminal("onNext", t, true);
        this.items.add(t == null ? NULL_ON_NEXT : t);
    }

    public void onError(Throwable t) {
        this.verifyOnSubscribedAndNoTerminal("onError", t, true);
        this.onTerminal = TerminalNotification.error((Throwable)t);
        this.onTerminalLatch.countDown();
    }

    public void onComplete() {
        this.verifyOnSubscribedAndNoTerminal("onComplete", null, false);
        this.onTerminal = TerminalNotification.complete();
        this.onTerminalLatch.countDown();
    }

    private void verifyNoTerminal(String method, @Nullable Object param, boolean useParam) {
        if (this.onTerminal != null) {
            throw new IllegalStateException("Subscriber has already terminated [" + this.onTerminal + "] " + method + (useParam ? " [ " + param + "]" : "") + " is not valid. See https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.7");
        }
    }

    private void verifyOnSubscribedAndNoTerminal(String method, @Nullable Object param, boolean useParam) {
        this.verifyNoTerminal(method, param, useParam);
        if (this.subscription == null) {
            throw new IllegalStateException("onSubscribe must be called before any other signals. https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.9");
        }
    }

    public PublisherSource.Subscription awaitSubscription() throws InterruptedException {
        this.onSubscribeLatch.await();
        assert (this.subscription != null);
        return this.subscription;
    }

    @Nullable
    public T takeOnNext() throws InterruptedException {
        Object item = this.items.take();
        return (T)(item == NULL_ON_NEXT ? null : item);
    }

    public List<T> pollAllOnNext() {
        ArrayList consumedItems = new ArrayList();
        this.items.drainTo(consumedItems);
        return consumedItems.stream().map(item -> item == NULL_ON_NEXT ? null : item).collect(Collectors.toList());
    }

    public Throwable awaitOnError() throws InterruptedException {
        return this.awaitOnError(true);
    }

    public Throwable awaitOnError(boolean verifyOnNextConsumed) throws InterruptedException {
        this.onTerminalLatch.await();
        assert (this.onTerminal != null);
        if (this.onTerminal == TerminalNotification.complete()) {
            throw new IllegalStateException("wanted onError but Subscriber terminated with onComplete");
        }
        assert (this.onTerminal.cause() != null);
        if (verifyOnNextConsumed) {
            this.verifyAllOnNextProcessed();
        }
        return this.onTerminal.cause();
    }

    public void awaitOnComplete() throws InterruptedException {
        this.awaitOnComplete(true);
    }

    public void awaitOnComplete(boolean verifyOnNextConsumed) throws InterruptedException {
        this.onTerminalLatch.await();
        assert (this.onTerminal != null);
        if (this.onTerminal != TerminalNotification.complete()) {
            throw new IllegalStateException("wanted onComplete but Subscriber terminated with onError", this.onTerminal.cause());
        }
        if (verifyOnNextConsumed) {
            this.verifyAllOnNextProcessed();
        }
    }

    public boolean pollTerminal(long timeout, TimeUnit unit) throws InterruptedException {
        return this.onTerminalLatch.await(timeout, unit);
    }

    private void verifyAllOnNextProcessed() {
        if (!this.items.isEmpty()) {
            Object item;
            StringBuilder b = new StringBuilder();
            int itemCount = 0;
            while ((item = this.items.poll()) != null) {
                ++itemCount;
                b.append("[").append((Object)(item == NULL_ON_NEXT ? null : item)).append("] ");
            }
            throw new IllegalStateException(itemCount + " onNext items were not processed: " + b.toString());
        }
    }
}

