/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.http.router.jersey;

import io.servicetalk.buffer.netty.BufferAllocators;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.ExecutorExtension;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.http.api.DefaultHttpHeadersFactory;
import io.servicetalk.http.api.DefaultStreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.HttpExecutionContext;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpHeaderNames;
import io.servicetalk.http.api.HttpHeaderValues;
import io.servicetalk.http.api.HttpProtocolVersion;
import io.servicetalk.http.api.HttpResponseStatus;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.api.StreamingHttpService;
import io.servicetalk.http.router.jersey.ExecutionStrategyTest;
import io.servicetalk.http.router.jersey.HttpJerseyRouterBuilder;
import io.servicetalk.http.router.jersey.TestUtils;
import io.servicetalk.http.router.jersey.resources.CancellableResources;
import io.servicetalk.transport.api.IoExecutor;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.annotation.Nullable;
import javax.ws.rs.core.Application;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

/**
 * Exercises cancellation of responses produced by a Jersey router that serves {@link CancellableResources}.
 *
 * <p>Each test builds a request for a {@code /cancel/...} path, hands it to the router together with a mocked
 * {@link HttpServiceContext}, cancels either the response {@link Single} or the response payload, and finally
 * fails if an unexpected error was captured along the way.
 */
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class CancellationTest {
    /** Factory used both to create the test requests and as the response factory passed to the router. */
    private static final StreamingHttpRequestResponseFactory HTTP_REQ_RES_FACTORY =
            new DefaultStreamingHttpRequestResponseFactory(BufferAllocators.DEFAULT_ALLOCATOR,
                    DefaultHttpHeadersFactory.INSTANCE, HttpProtocolVersion.HTTP_1_1);

    /** Payload sent with every {@code POST} request. */
    private static final CharSequence TEST_DATA = TestUtils.newLargePayload();

    /** Real executor used to call the router off the test thread and to back the mocked executor in SSE tests. */
    @RegisterExtension
    static final ExecutorExtension<Executor> execRule = ExecutorExtension.withCachedExecutor().setClassLevel(true);

    @Mock
    private HttpServiceContext ctx;

    @Mock
    private Executor execMock;

    private CancellableResources cancellableResources;
    private StreamingHttpService jerseyRouter;

    /**
     * Wires the mocked service context to a mocked execution context and builds a fresh router that exposes a
     * new {@link CancellableResources} instance as its only singleton.
     */
    @BeforeEach
    void setup() {
        HttpExecutionContext execCtx = Mockito.mock(HttpExecutionContext.class);
        Mockito.when(ctx.executionContext()).thenReturn(execCtx);
        Mockito.when(ctx.localAddress()).thenReturn(InetSocketAddress.createUnresolved("localhost", 8080));
        Mockito.when(execCtx.bufferAllocator()).thenReturn(BufferAllocators.DEFAULT_ALLOCATOR);
        Mockito.when(execCtx.executor()).thenReturn(execMock);
        Mockito.when(execCtx.ioExecutor()).thenReturn(Mockito.mock(IoExecutor.class));
        Mockito.when(execCtx.executionStrategy()).thenReturn(HttpExecutionStrategies.defaultStrategy());

        cancellableResources = new CancellableResources();
        jerseyRouter = new HttpJerseyRouterBuilder()
                .routeExecutionStrategyFactory(ExecutionStrategyTest.asFactory(
                        Collections.singletonMap("test", HttpExecutionStrategies.defaultStrategy())))
                .buildStreaming(new Application() {
                    @Override
                    public Set<Object> getSingletons() {
                        return Collections.singleton(cancellableResources);
                    }
                });
    }

    /**
     * Stubs the mocked executor so that a 7-day schedule returns a {@link TestCancellable}, cancels the
     * {@code /suspended} response without offloading, and verifies that the scheduled task was cancelled.
     */
    @Test
    void cancelSuspended() throws Exception {
        TestCancellable cancellable = new TestCancellable();
        Mockito.when(execMock.schedule(ArgumentMatchers.any(Runnable.class), ArgumentMatchers.eq(7L),
                ArgumentMatchers.eq(TimeUnit.DAYS))).thenReturn(cancellable);

        testCancelResponseSingle(get("/suspended"), false);

        MatcherAssert.assertThat(cancellable.cancelled, Matchers.is(true));
    }

    @Test
    void cancelSingle() throws Exception {
        testCancelResponseSingle(get("/single"));
    }

    @Test
    void cancelOffload() throws Exception {
        testCancelResponseSingle(get("/offload"));
    }

    @Test
    void cancelOioStreams() throws Exception {
        testCancelResponsePayload(post("/oio-streams"));
        testCancelResponseSingle(post("/offload-oio-streams"));
        testCancelResponseSingle(post("/no-offloads-oio-streams"));
    }

    @Test
    void cancelRsStreams() throws Exception {
        testCancelResponsePayload(post("/rs-streams"));
        testCancelResponsePayload(post("/rs-streams?subscribe=true"));
        testCancelResponseSingle(post("/offload-rs-streams"));
        testCancelResponseSingle(post("/offload-rs-streams?subscribe=true"));
        testCancelResponseSingle(post("/no-offloads-rs-streams"));
        testCancelResponseSingle(post("/no-offloads-rs-streams?subscribe=true"));
    }

    /**
     * Delegates every {@code schedule} call on the mocked executor to the real executor, cancels the payload of
     * the {@code /sse} response and then waits on the resource's {@code sseSinkClosedLatch}.
     */
    @Test
    void cancelSse() throws Exception {
        Mockito.doAnswer(invocation -> {
            Object[] args = invocation.getArguments();
            return execRule.executor().schedule((Runnable) args[0], (Long) args[1], (TimeUnit) args[2]);
        }).when(execMock).schedule(ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(),
                ArgumentMatchers.any(TimeUnit.class));

        testCancelResponsePayload(get("/sse"));

        // NOTE(review): presumably released by the resource once the SSE sink is closed - confirm in
        // CancellableResources.
        cancellableResources.sseSinkClosedLatch.await();
    }

    /**
     * Obtains the response for {@code req}, asserts that its status is {@code OK}, then subscribes to the
     * payload body and cancels it right away.
     *
     * @param req the request to route
     * @throws Exception if waiting for the response or for the cancellation fails
     * @throws AssertionError if the payload body signalled an error
     */
    private void testCancelResponsePayload(StreamingHttpRequest req) throws Exception {
        Future<StreamingHttpResponse> respFuture = handleOffloaded(req).toFuture();
        StreamingHttpResponse res = respFuture.get();
        // get() has already returned, so by the Future contract this cancel attempt should have no effect;
        // it is retained to keep the original call sequence.
        respFuture.cancel(true);
        MatcherAssert.assertThat(res.status(), Matchers.is(HttpResponseStatus.OK));

        AtomicReference<Throwable> errorRef = new AtomicReference<>();
        CountDownLatch cancelledLatch = new CountDownLatch(1);
        res.payloadBody()
                .beforeOnError(errorRef::set)
                .beforeCancel(cancelledLatch::countDown)
                .ignoreElements()
                .subscribe()
                .cancel();

        cancelledLatch.await();
        rethrowIfFailed(errorRef);
    }

    /**
     * Same as {@link #testCancelResponseSingle(StreamingHttpRequest, boolean)} with offloading enabled.
     *
     * @param req the request to route
     */
    private void testCancelResponseSingle(StreamingHttpRequest req) throws Exception {
        testCancelResponseSingle(req, true);
    }

    /**
     * Routes {@code req} and cancels the response {@link Single} as soon as it is subscribed to.
     *
     * @param req the request to route
     * @param enableOffload when {@code true} the router is invoked on the real executor and the subscriber
     *         cancels from {@code onSubscribe}; when {@code false} the router is invoked and cancelled directly
     *         on the calling thread
     * @throws Exception if waiting for the cancellation is interrupted
     * @throws AssertionError if an error that is not ignored was captured
     */
    private void testCancelResponseSingle(StreamingHttpRequest req, boolean enableOffload) throws Exception {
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        final CountDownLatch cancelledLatch = new CountDownLatch(1);

        if (enableOffload) {
            Single<StreamingHttpResponse> respSingle = handleOffloaded(req)
                    .beforeOnError(err -> {
                        if (!isIgnoredError(err)) {
                            errorRef.compareAndSet(null, err);
                        }
                    })
                    .afterCancel(cancelledLatch::countDown);

            SourceAdapters.toSource(respSingle).subscribe(new SingleSource.Subscriber<StreamingHttpResponse>() {
                @Override
                public void onSubscribe(Cancellable cancellable) {
                    cancellable.cancel();
                }

                @Override
                public void onSuccess(@Nullable StreamingHttpResponse result) {
                    if (result == null) {
                        errorRef.compareAndSet(null, new NullPointerException("result == null not expected."));
                        cancelledLatch.countDown();
                    } else {
                        // A response still arrived: drain it and release the latch once the body terminates.
                        result.messageBody().ignoreElements().afterFinally(cancelledLatch::countDown).subscribe();
                    }
                }

                @Override
                public void onError(Throwable t) {
                    if (!isIgnoredError(t)) {
                        errorRef.compareAndSet(null, t);
                    }
                    cancelledLatch.countDown();
                }
            });
        } else {
            jerseyRouter.handle(ctx, req, HTTP_REQ_RES_FACTORY)
                    .beforeOnError(errorRef::set)
                    .afterCancel(cancelledLatch::countDown)
                    .ignoreElement()
                    .subscribe()
                    .cancel();
        }

        cancelledLatch.await();
        rethrowIfFailed(errorRef);
    }

    /**
     * Invokes the router on the real executor and flattens the nested result into a single response
     * {@link Single}.
     */
    private Single<StreamingHttpResponse> handleOffloaded(StreamingHttpRequest req) {
        return execRule.executor()
                .submit(() -> jerseyRouter.handle(ctx, req, HTTP_REQ_RES_FACTORY))
                .flatMap(Function.identity());
    }

    /**
     * Tells whether {@code t} is one of the error types the offloaded flow does not record.
     * NOTE(review): presumably these are by-products of the cancellation race itself - confirm.
     */
    private static boolean isIgnoredError(Throwable t) {
        return t instanceof IllegalStateException || t instanceof InterruptedException;
    }

    /** Throws an {@link AssertionError} built from the captured error, if there is one. */
    private static void rethrowIfFailed(AtomicReference<Throwable> errorRef) {
        Throwable error = errorRef.get();
        if (error != null) {
            throw new AssertionError(error);
        }
    }

    /** Creates a {@code GET} request for {@code "/cancel" + resourcePath}. */
    private static StreamingHttpRequest get(String resourcePath) {
        return HTTP_REQ_RES_FACTORY.get("/cancel" + resourcePath);
    }

    /** Creates a {@code text/plain} {@code POST} request for {@code "/cancel" + resourcePath} carrying the test payload. */
    private static StreamingHttpRequest post(String resourcePath) {
        StreamingHttpRequest req = HTTP_REQ_RES_FACTORY.post("/cancel" + resourcePath)
                .payloadBody(Publisher.from(BufferAllocators.DEFAULT_ALLOCATOR.fromAscii(TEST_DATA)));
        req.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN);
        return req;
    }

    /** {@link Cancellable} that only records whether {@link #cancel()} has been called. */
    private static class TestCancellable implements Cancellable {
        // volatile: the flag is read by the test thread, while this file does not pin down which thread
        // ends up calling cancel().
        private volatile boolean cancelled;

        @Override
        public void cancel() {
            cancelled = true;
        }
    }
}

