/*
 * Decompiled with CFR 0.152.
 */
package io.aleph0.yap.messaging.jetty.worker;

import io.aleph0.yap.core.Sink;
import io.aleph0.yap.messaging.core.FirehoseMetrics;
import io.aleph0.yap.messaging.core.Message;
import io.aleph0.yap.messaging.core.worker.FirehoseProducerWorker;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.websocket.api.Callback;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketOpen;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Firehose producer that opens a single WebSocket connection to a fixed URI, converts every
 * incoming text or binary frame into zero or more {@link Message}s via a {@link MessageFactory},
 * and pushes them into the {@link Sink} handed to {@link #produce(Sink)}.
 *
 * <p>The number of messages successfully put into the sink is tracked in an {@link AtomicLong}
 * and exposed through {@link #checkMetrics()} and {@link #flushMetrics()}.
 *
 * @param <T> payload type of the produced messages
 */
public class WebSocketFirehoseProducerWorker<T>
implements FirehoseProducerWorker<Message<T>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketFirehoseProducerWorker.class);

    /** WebSocket close status for a normal closure (RFC 6455). */
    private static final int STATUS_NORMAL = 1000;

    /** WebSocket close status sent when message conversion fails (RFC 6455 "internal error"). */
    private static final int STATUS_INTERNAL_ERROR = 1011;

    /** Default stop timeout applied to the WebSocket client, in milliseconds. */
    private static final long DEFAULT_STOP_TIMEOUT_MILLIS = 5000L;

    /** Default idle timeout applied to the WebSocket client. */
    private static final Duration DEFAULT_IDLE_TIMEOUT = Duration.ofSeconds(30L);

    /** Count of messages put into the sink since the last flush. */
    private final AtomicLong receivedMetric = new AtomicLong(0L);
    private final URI uri;
    private final Configurator configurator;
    private final MessageFactory<T> messageFactory;

    /**
     * Returns a {@link Configurator} whose hooks all use the interface's no-op defaults.
     *
     * @return a new no-op configurator
     */
    public static Configurator defaultConfigurator() {
        return new Configurator(){};
    }

    /**
     * Creates a worker that uses {@link #defaultConfigurator()}.
     *
     * @param uri the WebSocket endpoint to connect to; must not be null
     * @param messageFactory converts received frames into messages; must not be null
     */
    public WebSocketFirehoseProducerWorker(URI uri, MessageFactory<T> messageFactory) {
        this(uri, WebSocketFirehoseProducerWorker.defaultConfigurator(), messageFactory);
    }

    /**
     * Creates a worker with an explicit configurator.
     *
     * @param uri the WebSocket endpoint to connect to; must not be null
     * @param configurator customization hooks for the Jetty client objects; must not be null
     * @param messageFactory converts received frames into messages; must not be null
     * @throws NullPointerException if any argument is null
     */
    public WebSocketFirehoseProducerWorker(URI uri, Configurator configurator, MessageFactory<T> messageFactory) {
        this.uri = Objects.requireNonNull(uri, "uri");
        this.configurator = Objects.requireNonNull(configurator, "configurator");
        this.messageFactory = Objects.requireNonNull(messageFactory, "messageFactory");
    }

    /**
     * Connects to the configured URI and blocks until the WebSocket session ends, forwarding every
     * received message to {@code sink} in the meantime.
     *
     * <p>Returns normally only when the session is closed with status code 1000. Any other outcome
     * is reported as an exception.
     *
     * @param sink destination for the produced messages
     * @throws IOException if the session fails or closes with a non-normal status; failures that
     *         are neither {@code IOException}, {@code RuntimeException} nor {@code Error} are
     *         wrapped in an {@code IOException}
     * @throws InterruptedException if the calling thread is interrupted while waiting, or if a
     *         listener callback was interrupted while putting into the sink; the calling thread's
     *         interrupt flag is also set before this is thrown
     */
    public void produce(Sink<Message<T>> sink) throws IOException, InterruptedException {
        try {
            final Throwable failureCause = this.runSession(sink);
            if (failureCause instanceof NormalCloseException) {
                LOGGER.atInfo().log("Websocket session closed normally");
                return;
            }
            if (failureCause instanceof Error) {
                throw (Error)failureCause;
            }
            if (failureCause instanceof InterruptedException) {
                throw new InterruptedException();
            }
            if (failureCause instanceof Exception) {
                throw new ExecutionException("Websocket failed", failureCause);
            }
            throw new AssertionError("Unexpected error", failureCause);
        }
        catch (InterruptedException e) {
            LOGGER.atError().setCause(e).log("Websocket session interrupted. Propagating...");
            Thread.currentThread().interrupt();
            throw new InterruptedException();
        }
        catch (ExecutionException e) {
            final Throwable cause = e.getCause();
            LOGGER.atError().setCause(cause).log("Websocket session failed. Failing task...");
            if (cause instanceof IOException) {
                throw (IOException)cause;
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException)cause;
            }
            if (cause instanceof Error) {
                throw (Error)cause;
            }
            throw new IOException("Websocket session failed", cause);
        }
    }

    /**
     * Builds and starts a WebSocket client, connects, waits for the session to end and stops the
     * client again.
     *
     * @param sink destination for the produced messages
     * @return the first recorded reason the session ended; a {@link NormalCloseException} marks a
     *         clean close
     * @throws IOException if initiating the connection fails
     * @throws InterruptedException if interrupted while waiting for the session to end
     * @throws ExecutionException if the client cannot be started, or cannot be stopped after the
     *         session ended
     */
    private Throwable runSession(Sink<Message<T>> sink)
            throws IOException, InterruptedException, ExecutionException {
        // Capacity 1 with non-blocking offers: only the first reported cause is kept.
        final BlockingQueue<Throwable> failureCauses = new ArrayBlockingQueue<>(1);

        final HttpClient http = new HttpClient();
        this.configurator.configureHttpClient(http);

        final WebSocketClient websocket = new WebSocketClient(http);
        // Defaults are applied first so the configurator hook can override them.
        websocket.setStopTimeout(DEFAULT_STOP_TIMEOUT_MILLIS);
        websocket.setIdleTimeout(DEFAULT_IDLE_TIMEOUT);
        this.configurator.configureWebSocketClient(websocket);

        try {
            websocket.start();
        }
        catch (Exception e) {
            LOGGER.atError().setCause(e).log("Failed to start WebSocket client");
            throw new ExecutionException("Failed to start WebSocket client", e);
        }

        boolean sessionEnded = false;
        try {
            final ClientUpgradeRequest request = new ClientUpgradeRequest();
            this.configurator.configureClientUpgradeRequest(request);

            final CountDownLatch latch = new CountDownLatch(1);
            final InternalSocketListener listener = new InternalSocketListener(sink, failureCauses, latch);

            // If the connection or upgrade fails, no session exists and the listener's
            // close/error callbacks may never run. Route that failure through the same
            // queue and latch so the wait below cannot block forever.
            websocket.connect(listener, this.uri, request).whenComplete((session, failure) -> {
                if (failure != null) {
                    LOGGER.atError().setCause(failure).log("WebSocket connect failed");
                    failureCauses.offer(failure);
                    latch.countDown();
                }
            });

            latch.await();
            final Throwable failureCause = failureCauses.take();
            sessionEnded = true;
            return failureCause;
        }
        finally {
            try {
                websocket.stop();
            }
            catch (Exception e) {
                LOGGER.atError().setCause(e).log("Failed to stop WebSocket client");
                // Only surface the stop failure when nothing else is already propagating;
                // otherwise it would replace (and hide) e.g. an InterruptedException.
                if (sessionEnded) {
                    throw new ExecutionException("Failed to stop WebSocket client", e);
                }
            }
        }
    }

    /**
     * Returns the current metrics without resetting them.
     *
     * @return metrics carrying the number of messages received since the last flush
     */
    public FirehoseMetrics checkMetrics() {
        long received = this.receivedMetric.get();
        return new FirehoseMetrics(received);
    }

    /**
     * Returns the current metrics and resets the counter to zero.
     *
     * <p>The read and the reset happen as one atomic step, so messages counted concurrently by a
     * listener callback are never lost between the two.
     *
     * @return metrics carrying the number of messages received since the last flush
     */
    public FirehoseMetrics flushMetrics() {
        return new FirehoseMetrics(this.receivedMetric.getAndSet(0L));
    }

    /**
     * Customization hooks invoked while the worker sets up its Jetty objects. Every hook defaults
     * to a no-op.
     */
    public static interface Configurator {
        /**
         * Called with the freshly created HTTP client before the WebSocket client is built on it.
         *
         * @param client the HTTP client to customize
         */
        default public void configureHttpClient(HttpClient client) {
        }

        /**
         * Called after the worker applied its default stop and idle timeouts, before the client
         * is started.
         *
         * @param client the WebSocket client to customize
         */
        default public void configureWebSocketClient(WebSocketClient client) {
        }

        /**
         * Called with the upgrade request before the connection is initiated.
         *
         * @param request the upgrade request to customize
         */
        default public void configureClientUpgradeRequest(ClientUpgradeRequest request) {
        }

        /**
         * Called from the listener's open callback with the new session.
         *
         * @param session the session to customize
         */
        default public void configureSession(Session session) {
        }
    }

    /**
     * Converts raw WebSocket frames into messages. One frame may yield any number of messages.
     *
     * @param <T> payload type of the produced messages
     */
    public static interface MessageFactory<T> {
        /**
         * Converts a text frame.
         *
         * @param var1 the received text
         * @return the messages to forward to the sink
         */
        public List<Message<T>> newTextMessages(String var1);

        /**
         * Converts a binary frame.
         *
         * @param var1 the received payload
         * @return the messages to forward to the sink
         */
        public List<Message<T>> newBinaryMessages(ByteBuffer var1);
    }

    /**
     * Jetty endpoint that forwards received frames to the sink and reports how the session ended.
     *
     * <p>The end-of-session reason is offered to {@code failureCauses}; the latch is counted down
     * from the close and error callbacks.
     */
    @WebSocket(autoDemand=true)
    public class InternalSocketListener {
        private final Sink<Message<T>> sink;
        private final BlockingQueue<Throwable> failureCauses;
        private final CountDownLatch latch;

        /**
         * @param sink destination for produced messages; must not be null
         * @param failureCauses queue receiving the reason the session ended; must not be null
         * @param latch counted down once the session has closed or errored; must not be null
         */
        public InternalSocketListener(Sink<Message<T>> sink, BlockingQueue<Throwable> failureCauses, CountDownLatch latch) {
            this.sink = Objects.requireNonNull(sink, "sink");
            this.failureCauses = Objects.requireNonNull(failureCauses, "failureCauses");
            this.latch = Objects.requireNonNull(latch, "latch");
        }

        /** Logs the connection and lets the configurator customize the session. */
        @OnWebSocketOpen
        public void onWebSocketOpen(Session session) {
            LOGGER.atInfo().log("WebSocket connected");
            WebSocketFirehoseProducerWorker.this.configurator.configureSession(session);
        }

        /**
         * Converts a text frame to messages and puts them into the sink. If conversion throws,
         * the exception is recorded and the session is closed with status 1011.
         */
        @OnWebSocketMessage
        public void onWebSocketText(Session session, String text) {
            final List<Message<T>> messages;
            try {
                messages = WebSocketFirehoseProducerWorker.this.messageFactory.newTextMessages(text);
            }
            catch (Exception e) {
                LOGGER.atError().setCause(e).log("Failed to create text messages");
                this.failureCauses.offer(e);
                session.close(STATUS_INTERNAL_ERROR, null, Callback.NOOP);
                return;
            }
            this.putMessages(session, messages);
        }

        /**
         * Converts a binary frame to messages and puts them into the sink. The frame callback is
         * always completed: succeeded after the messages were handed over, failed if conversion
         * throws (in which case the session is also closed with status 1011).
         */
        @OnWebSocketMessage
        public void onWebSocketBinary(Session session, ByteBuffer payload, Callback callback) {
            final List<Message<T>> messages;
            try {
                messages = WebSocketFirehoseProducerWorker.this.messageFactory.newBinaryMessages(payload);
            }
            catch (Exception e) {
                LOGGER.atError().setCause(e).log("Failed to create binary messages");
                this.failureCauses.offer(e);
                // Complete the callback on the failure path too, so it is never left pending.
                callback.fail(e);
                session.close(STATUS_INTERNAL_ERROR, null, Callback.NOOP);
                return;
            }
            this.putMessages(session, messages);
            callback.succeed();
        }

        /**
         * Puts each message into the sink, counting every successful put. If interrupted, the
         * interruption is recorded as the session's end reason and the session is closed with
         * status 1000.
         */
        private void putMessages(Session session, List<Message<T>> messages) {
            try {
                for (Message<T> message : messages) {
                    this.sink.put(message);
                    WebSocketFirehoseProducerWorker.this.receivedMetric.incrementAndGet();
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOGGER.atInfo().setCause(e).log("Interrupted while putting messages");
                this.failureCauses.offer(e);
                session.close(STATUS_NORMAL, null, Callback.NOOP);
            }
        }

        /** Records the error as the session's end reason, releases the waiter and disconnects. */
        @OnWebSocketError
        public void onWebSocketError(Session session, Throwable cause) {
            LOGGER.atError().setCause(cause).log("WebSocket error");
            this.failureCauses.offer(cause);
            this.latch.countDown();
            session.disconnect();
        }

        /**
         * Records how the session closed and releases the waiter. Status 1000 is recorded as a
         * {@link NormalCloseException}; any other status as a plain {@link IOException}. The
         * offer is a no-op when an earlier cause was already recorded.
         */
        @OnWebSocketClose
        public void onWebSocketClose(Session session, int statusCode, String reason) {
            LOGGER.atInfo().addKeyValue("statusCode", statusCode).addKeyValue("reason", reason).log("WebSocket closed");
            final IOException failureCause;
            if (statusCode == STATUS_NORMAL) {
                failureCause = new NormalCloseException();
            } else {
                failureCause = new IOException("WebSocket closed with status code " + statusCode);
            }
            this.failureCauses.offer(failureCause);
            this.latch.countDown();
        }
    }

    /** Sentinel placed on the failure queue to signal that the session closed cleanly. */
    private static class NormalCloseException
    extends IOException {
        public NormalCloseException() {
            super("closed");
        }
    }
}

