/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.http.utils;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.SingleOperator;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.api.TerminalSignalConsumer;
import io.servicetalk.concurrent.internal.CancelImmediatelySubscriber;
import io.servicetalk.http.api.StreamingHttpResponse;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.annotation.Nullable;

public final class BeforeFinallyHttpOperator
implements SingleOperator<StreamingHttpResponse, StreamingHttpResponse> {
    private final TerminalSignalConsumer beforeFinally;

    public BeforeFinallyHttpOperator(TerminalSignalConsumer beforeFinally) {
        this.beforeFinally = Objects.requireNonNull(beforeFinally);
    }

    public BeforeFinallyHttpOperator(Runnable beforeFinally) {
        this(TerminalSignalConsumer.from(beforeFinally));
    }

    @Override
    public SingleSource.Subscriber<? super StreamingHttpResponse> apply(SingleSource.Subscriber<? super StreamingHttpResponse> subscriber) {
        return new ResponseCompletionSubscriber(subscriber, this.beforeFinally);
    }

    private static final class ResponseCompletionSubscriber
    implements SingleSource.Subscriber<StreamingHttpResponse> {
        private static final int IDLE = 0;
        private static final int PROCESSING_PAYLOAD = 1;
        private static final int TERMINATED = 2;
        private static final AtomicIntegerFieldUpdater<ResponseCompletionSubscriber> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ResponseCompletionSubscriber.class, "state");
        private final SingleSource.Subscriber<? super StreamingHttpResponse> subscriber;
        private final TerminalSignalConsumer beforeFinally;
        private volatile int state;

        ResponseCompletionSubscriber(SingleSource.Subscriber<? super StreamingHttpResponse> sub, TerminalSignalConsumer beforeFinally) {
            this.subscriber = sub;
            this.beforeFinally = beforeFinally;
        }

        @Override
        public void onSubscribe(Cancellable cancellable) {
            this.subscriber.onSubscribe(() -> {
                try {
                    if (stateUpdater.compareAndSet(this, 0, 2)) {
                        this.beforeFinally.cancel();
                    }
                }
                finally {
                    cancellable.cancel();
                }
            });
        }

        @Override
        public void onSuccess(@Nullable StreamingHttpResponse response) {
            if (response == null) {
                this.sendNullResponse();
            } else if (stateUpdater.compareAndSet(this, 0, 1)) {
                this.subscriber.onSuccess(response.transformMessageBody(payload -> payload.liftSync(subscriber -> new PublisherSource.Subscriber<Object>(){

                    @Override
                    public void onSubscribe(final PublisherSource.Subscription subscription) {
                        subscriber.onSubscribe(new PublisherSource.Subscription(){

                            @Override
                            public void request(long n) {
                                subscription.request(n);
                            }

                            @Override
                            public void cancel() {
                                try {
                                    if (stateUpdater.compareAndSet(this, 1, 2)) {
                                        beforeFinally.cancel();
                                    }
                                }
                                finally {
                                    subscription.cancel();
                                }
                            }
                        });
                    }

                    @Override
                    public void onNext(@Nullable Object o) {
                        subscriber.onNext(o);
                    }

                    @Override
                    public void onError(Throwable t) {
                        try {
                            if (stateUpdater.compareAndSet(this, 1, 2)) {
                                beforeFinally.onError(t);
                            }
                        }
                        finally {
                            subscriber.onError(t);
                        }
                    }

                    @Override
                    public void onComplete() {
                        try {
                            if (stateUpdater.compareAndSet(this, 1, 2)) {
                                beforeFinally.onComplete();
                            }
                        }
                        finally {
                            subscriber.onComplete();
                        }
                    }
                })));
            } else {
                assert (this.state == 2);
                this.subscriber.onSuccess(response.transformMessageBody(payload -> {
                    SourceAdapters.toSource(payload).subscribe(CancelImmediatelySubscriber.INSTANCE);
                    return Publisher.failed(new CancellationException("Received response post cancel."));
                }));
            }
        }

        @Override
        public void onError(Throwable t) {
            try {
                if (stateUpdater.compareAndSet(this, 0, 2)) {
                    this.beforeFinally.onError(t);
                }
            }
            finally {
                this.subscriber.onError(t);
            }
        }

        private void sendNullResponse() {
            try {
                if (stateUpdater.compareAndSet(this, 0, 2)) {
                    this.beforeFinally.onComplete();
                }
            }
            catch (Throwable cause) {
                this.subscriber.onError(cause);
                return;
            }
            this.subscriber.onSuccess(null);
        }
    }
}

