/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.api.ClosableConcurrentStack;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.concurrent.internal.TerminalNotification;

final class CompletableProcessor
extends Completable
implements CompletableSource.Processor {
    private final ClosableConcurrentStack<CompletableSource.Subscriber> stack = new ClosableConcurrentStack();

    CompletableProcessor() {
    }

    @Override
    protected void handleSubscribe(CompletableSource.Subscriber subscriber) {
        DelayedCancellable delayedCancellable = new DelayedCancellable();
        subscriber.onSubscribe(delayedCancellable);
        if (this.stack.push(subscriber)) {
            delayedCancellable.delayedCancellable(() -> this.stack.relaxedRemove(subscriber));
        }
    }

    @Override
    public void onSubscribe(Cancellable cancellable) {
    }

    @Override
    public void onComplete() {
        this.terminate(TerminalNotification.complete());
    }

    @Override
    public void onError(Throwable t) {
        this.terminate(TerminalNotification.error(t));
    }

    private void terminate(TerminalNotification terminalSignal) {
        this.stack.close(terminalSignal::terminate);
    }

    @Override
    public void subscribe(CompletableSource.Subscriber subscriber) {
        this.subscribeInternal(subscriber);
    }
}

