/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.common.websocket;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import io.fluxcapacitor.common.Backlog;
import io.fluxcapacitor.common.ObjectUtils;
import io.fluxcapacitor.common.RetryConfiguration;
import io.fluxcapacitor.common.TimingUtils;
import io.fluxcapacitor.common.api.Command;
import io.fluxcapacitor.common.api.JsonType;
import io.fluxcapacitor.common.api.Metadata;
import io.fluxcapacitor.common.api.QueryResult;
import io.fluxcapacitor.common.api.Request;
import io.fluxcapacitor.common.api.RequestBatch;
import io.fluxcapacitor.common.api.ResultBatch;
import io.fluxcapacitor.common.serialization.compression.CompressionAlgorithm;
import io.fluxcapacitor.common.serialization.compression.CompressionUtils;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.common.websocket.SessionPool;
import io.fluxcapacitor.javaclient.configuration.client.WebSocketClient;
import jakarta.websocket.CloseReason;
import jakarta.websocket.ContainerProvider;
import jakarta.websocket.OnClose;
import jakarta.websocket.OnError;
import jakarta.websocket.OnMessage;
import jakarta.websocket.Session;
import jakarta.websocket.WebSocketContainer;
import java.beans.ConstructorProperties;
import java.io.OutputStream;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractWebsocketClient
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(AbstractWebsocketClient.class);

    /** Container handed to the full constructor by the convenience constructors. */
    public static WebSocketContainer defaultWebSocketContainer = ContainerProvider.getWebSocketContainer();

    /**
     * Mapper handed to the full constructor by the convenience constructors. Unknown properties do not fail
     * deserialization, discoverable Jackson modules are registered, and dates are not written as timestamps.
     */
    public static ObjectMapper defaultObjectMapper = JsonMapper.builder()
            .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
            .findAndAddModules()
            .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
            .build();

    /** Source of the sessions over which requests are sent. */
    private final SessionPool sessionPool;

    /** Client configuration; this class reads its compression setting when sending and receiving. */
    private final WebSocketClient.ClientConfig clientConfig;

    /** Mapper used to serialize outgoing objects and deserialize incoming messages. */
    private final ObjectMapper objectMapper;

    /** Requests that have been sent but not yet answered, keyed by request id. */
    private final Map<Long, WebSocketRequest> requests = new ConcurrentHashMap<>();

    /** One backlog of outgoing objects per session, keyed by session id. */
    private final Map<String, Backlog<JsonType>> sessionBacklogs = new ConcurrentHashMap<>();

    /** Flips to {@code true} the first time the client is closed; also used as the lock while closing. */
    private final AtomicBoolean closed = new AtomicBoolean();

    /** Fixed pool of 8 threads on which incoming messages are parsed and results are handled. */
    private final ExecutorService resultExecutor = Executors.newFixedThreadPool(
            8, ObjectUtils.newThreadFactory(getClass().getSimpleName() + "-onMessage"));

    /** Whether {@link #tryPublishMetrics} is allowed to publish anything. */
    private final boolean sendMetrics;

    /**
     * Convenience constructor that delegates to the four-argument constructor with {@code numberOfSessions = 1}.
     *
     * @param endpointUri  endpoint to connect to
     * @param clientConfig client configuration
     * @param sendMetrics  whether metrics may be published by this client
     */
    public AbstractWebsocketClient(URI endpointUri,
                                   WebSocketClient.ClientConfig clientConfig,
                                   boolean sendMetrics) {
        this(endpointUri, clientConfig, sendMetrics, 1);
    }

    /**
     * Convenience constructor that delegates to the full constructor using {@link #defaultWebSocketContainer},
     * {@link #defaultObjectMapper} and a reconnect delay of one second.
     *
     * @param endpointUri      endpoint to connect to
     * @param clientConfig     client configuration
     * @param sendMetrics      whether metrics may be published by this client
     * @param numberOfSessions value passed on to the session pool
     */
    public AbstractWebsocketClient(URI endpointUri,
                                   WebSocketClient.ClientConfig clientConfig,
                                   boolean sendMetrics,
                                   int numberOfSessions) {
        this(defaultWebSocketContainer, endpointUri, clientConfig, sendMetrics,
             Duration.ofSeconds(1), defaultObjectMapper, numberOfSessions);
    }

    /**
     * Creates the client and its session pool.
     *
     * <p>The pool is given a session supplier that connects this instance to {@code endpointUri} through
     * {@code container}. Connecting goes through {@code TimingUtils.retryOnFailure} with a retry configuration
     * that uses {@code reconnectDelay} as its delay and an error test that holds only while this client has not
     * been closed. The first failed attempt is logged as a warning, and so is every attempt whose retry count is
     * a multiple of 100; a successful reconnect is logged at info level.
     *
     * @param container        container used to open connections
     * @param endpointUri      endpoint to connect to
     * @param clientConfig     client configuration
     * @param sendMetrics      whether metrics may be published by this client
     * @param reconnectDelay   delay configured on the retry configuration
     * @param objectMapper     mapper used for (de)serialization of messages
     * @param numberOfSessions value passed on to the session pool
     */
    public AbstractWebsocketClient(WebSocketContainer container, URI endpointUri,
                                   WebSocketClient.ClientConfig clientConfig, boolean sendMetrics,
                                   Duration reconnectDelay, ObjectMapper objectMapper, int numberOfSessions) {
        this.clientConfig = clientConfig;
        this.objectMapper = objectMapper;
        this.sendMetrics = sendMetrics;
        this.sessionPool = new SessionPool(numberOfSessions, () -> (Session) TimingUtils.retryOnFailure(
                () -> container.connectToServer(this, endpointUri),
                (RetryConfiguration) RetryConfiguration.builder()
                        .delay(reconnectDelay)
                        .errorTest(e -> !closed.get())
                        .successLogger(s -> log.info("Successfully reconnected to endpoint {}", endpointUri))
                        .exceptionLogger(status -> {
                            if (status.getNumberOfTimesRetried() == 0) {
                                log.warn("Failed to connect to endpoint {}; reason: {}. Retrying every {} ms...",
                                         endpointUri, status.getException().getMessage(),
                                         status.getRetryConfiguration().getDelay().toMillis());
                            } else if (status.getNumberOfTimesRetried() % 100 == 0) {
                                log.warn("Still trying to connect to endpoint {}. Last error: {}.",
                                         endpointUri, status.getException().getMessage());
                            }
                        })
                        .build()));
    }

    /**
     * Sends a request, tagging it with the current correlation data.
     *
     * @param request the request to send
     * @param <R>     expected result type
     * @return future that is completed with the result, or exceptionally if the request could not be sent
     */
    protected <R extends QueryResult> CompletableFuture<R> send(Request request) {
        WebSocketRequest webSocketRequest = new WebSocketRequest(request, FluxCapacitor.currentCorrelationData());
        return webSocketRequest.send();
    }

    /**
     * Sends a request and blocks the calling thread until its result is available.
     *
     * @param request the request to send
     * @param <R>     expected result type
     * @return the result of the request
     */
    protected <R extends QueryResult> R sendAndWait(Request request) {
        // NOTE(review): get() declares InterruptedException and ExecutionException, which are neither caught
        // nor declared here - confirm how the build handles that before changing this method.
        CompletableFuture<R> pending = send(request);
        return pending.get();
    }

    /**
     * Sends a command, honouring the guarantee it asks for.
     *
     * <ul>
     *   <li>{@code NONE}: the command is handed off and an already completed future is returned;</li>
     *   <li>{@code SENT}: the future of the hand-off itself is returned;</li>
     *   <li>any other guarantee: the command is sent as a request and the returned future completes once its
     *       result has arrived.</li>
     * </ul>
     *
     * @param command the command to send
     * @return future reflecting the requested guarantee
     */
    protected CompletableFuture<Void> sendCommand(Command command) {
        switch (command.getGuarantee()) {
            case NONE:
                sendAndForget((JsonType) command);
                return CompletableFuture.completedFuture(null);
            case SENT:
                return sendAndForget((JsonType) command);
            default:
                return send((Request) command).thenApply(ignored -> null);
        }
    }

    /**
     * Hands an object to the backlog of a session obtained from the pool, without registering it as an
     * outstanding request.
     *
     * @param object the object to send
     * @return the future returned by the session backlog
     */
    private CompletableFuture<Void> sendAndForget(JsonType object) {
        Session session = sessionPool.get();
        return send(object, session);
    }

    /**
     * Adds an object to the backlog of the given session, creating that backlog on first use, and publishes
     * metrics for the object afterwards regardless of whether adding it succeeded.
     *
     * <p>For a {@link Request} the metrics metadata additionally carries its {@code requestId}.
     *
     * @param object  the object to send
     * @param session the session whose backlog receives the object
     * @return the future returned by the backlog
     */
    private CompletableFuture<Void> send(JsonType object, Session session) {
        try {
            Backlog<JsonType> backlog = sessionBacklogs.computeIfAbsent(
                    session.getId(), id -> Backlog.forConsumer(batch -> sendBatch(batch, session)));
            return backlog.add(object);
        } finally {
            // Runs on both the normal and the exceptional path, after the add attempt.
            Metadata metadata;
            if (object instanceof Request) {
                metadata = metricsMetadata().with((Object) "requestId", (Object) ((Request) object).getRequestId());
            } else {
                metadata = metricsMetadata();
            }
            tryPublishMetrics(object, metadata);
        }
    }

    /**
     * Serializes, compresses and writes a batch of objects to the given session.
     *
     * <p>A batch of exactly one object is written as that object; any other batch is wrapped in a single
     * {@link RequestBatch}. The payload is compressed with the algorithm from the client configuration.
     *
     * @param requests the objects to send
     * @param session  the session to write to
     * @throws IllegalStateException if serializing or writing fails with a checked exception (the original
     *                               exception is kept as the cause); unchecked exceptions propagate unchanged
     */
    private void sendBatch(List<JsonType> requests, Session session) {
        // Declared as Object because the two branches have different static types and the mapper accepts any
        // Object; declaring it as RequestBatch does not type-check for the single-request branch.
        Object payload = requests.size() == 1 ? requests.get(0) : new RequestBatch(requests);
        try (OutputStream outputStream = session.getBasicRemote().getSendStream()) {
            byte[] bytes = objectMapper.writeValueAsBytes(payload);
            outputStream.write(CompressionUtils.compress(bytes, (CompressionAlgorithm) clientConfig.getCompression()));
        } catch (RuntimeException e) {
            log.error("Failed to send request {}", payload, e);
            throw e;
        } catch (Exception e) {
            log.error("Failed to send request {}", payload, e);
            // This method declares no checked exceptions, so checked failures (e.g. I/O errors while writing
            // or closing the stream) are rethrown unchecked with their cause preserved.
            throw new IllegalStateException("Failed to send request " + payload, e);
        }
    }

    /**
     * Receives a binary message and processes it on the result executor.
     *
     * <p>The bytes are decompressed with the configured algorithm and parsed as a {@link JsonType}; a message
     * that cannot be parsed is logged and dropped. The results of a {@link ResultBatch} are each handled in
     * their own executor task and share one generated batch id; any other message is handled directly as a
     * single result without a batch id.
     *
     * @param bytes the raw message as received
     */
    @OnMessage
    public void onMessage(byte[] bytes) {
        resultExecutor.execute(() -> {
            JsonType message;
            try {
                message = (JsonType) objectMapper.readValue(
                        CompressionUtils.decompress(bytes, (CompressionAlgorithm) clientConfig.getCompression()),
                        JsonType.class);
            } catch (Exception e) {
                log.error("Could not parse input. Expected a Json message.", e);
                return;
            }
            if (!(message instanceof ResultBatch)) {
                handleResult((QueryResult) message, null);
                return;
            }
            String batchId = FluxCapacitor.generateId();
            ((ResultBatch) message).getResults()
                    .forEach(r -> resultExecutor.execute(() -> handleResult((QueryResult) r, batchId)));
        });
    }

    /**
     * Completes the outstanding request that a result belongs to.
     *
     * <p>The request is looked up, and removed, by the result's request id; if none is found a warning is
     * logged and nothing else happens. Otherwise metrics are published with the request id, the milliseconds
     * elapsed since the request was sent, the request's correlation data and, when given, the batch id. The
     * request's future is completed with the result even if publishing metrics fails. Anything thrown along
     * the way is logged rather than propagated.
     *
     * @param result  the result that was received
     * @param batchId id shared by all results of one batch, or {@code null} for a stand-alone result
     */
    protected void handleResult(QueryResult result, String batchId) {
        try {
            WebSocketRequest pending = requests.remove(result.getRequestId());
            if (pending == null) {
                log.warn("Could not find outstanding read request for id {}", result.getRequestId());
                return;
            }
            try {
                Metadata metadata = metricsMetadata()
                        .with(new Object[]{
                                "requestId", pending.request.getRequestId(),
                                "msDuration", System.currentTimeMillis() - pending.sendTimestamp})
                        .with(pending.correlationData);
                if (batchId != null) {
                    metadata = metadata.with((Object) "batchId", (Object) batchId);
                }
                tryPublishMetrics((JsonType) result, metadata);
            } finally {
                pending.result.complete(result);
            }
        } catch (Throwable e) {
            log.error("Failed to handle result {}", result, e);
        }
    }

    /**
     * Cleans up after a closed session: drops the session's backlog, logs a warning when the close code is
     * greater than {@code NO_STATUS_CODE}, and then retries the requests still outstanding for that session.
     *
     * @param session     the session that was closed
     * @param closeReason why the session was closed
     */
    @OnClose
    public void onClose(Session session, CloseReason closeReason) {
        String sessionId = session.getId();
        sessionBacklogs.remove(sessionId);
        int closeCode = closeReason.getCloseCode().getCode();
        if (closeCode > CloseReason.CloseCodes.NO_STATUS_CODE.getCode()) {
            log.warn("Connection to endpoint {} closed with reason {}", session.getRequestURI(), closeReason);
        }
        retryOutstandingRequests(sessionId);
    }

    /**
     * Re-sends every outstanding request that was last sent over the given session.
     *
     * <p>Does nothing when the client is closed or no requests are outstanding. Otherwise the calling thread
     * sleeps for one second before the matching requests are sent again.
     *
     * @param sessionId id of the session whose requests should be retried
     * @throws IllegalStateException if the thread is interrupted while waiting; the interrupt flag is restored
     */
    protected void retryOutstandingRequests(String sessionId) {
        if (closed.get() || requests.isEmpty()) {
            return;
        }
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Thread interrupted while trying to retry outstanding requests", e);
        }
        for (WebSocketRequest pending : requests.values()) {
            if (sessionId.equals(pending.sessionId)) {
                pending.send();
            }
        }
    }

    /**
     * Logs an error reported for a session, together with the session's request URI.
     *
     * @param session the session the error was reported for
     * @param e       the error
     */
    @OnError
    public void onError(Session session, Throwable e) {
        URI endpoint = session.getRequestURI();
        log.error("Client side error for web socket connected to endpoint {}", endpoint, e);
    }

    /**
     * Closes the client without clearing outstanding requests; equivalent to {@code close(false)}.
     */
    @Override
    public void close() {
        close(false);
    }

    /**
     * Closes the client. Only the first call has any effect.
     *
     * <p>While holding the lock on the {@code closed} flag, outstanding requests are cleared if requested, the
     * session pool is closed, and a warning is logged if requests are still outstanding afterwards. Finally
     * the result executor is shut down so that its worker threads do not outlive the client.
     *
     * @param clearOutstandingRequests whether to discard all outstanding requests before closing the sessions
     */
    protected void close(boolean clearOutstandingRequests) {
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        synchronized (closed) {
            try {
                if (clearOutstandingRequests) {
                    requests.clear();
                }
                sessionPool.close();
                if (!requests.isEmpty()) {
                    log.warn("{}: Closed websocket session to endpoint with {} outstanding requests",
                             getClass().getSimpleName(), requests.size());
                }
            } finally {
                // shutdown() lets tasks that were already submitted finish and only rejects new submissions.
                // It runs after the session pool has been closed, and in a finally block so the pool's
                // threads are released even if closing the sessions fails.
                resultExecutor.shutdown();
            }
        }
    }

    /**
     * Publishes the metric of a message, if there is anything to publish.
     *
     * <p>Nothing is published when metrics are disabled for this client, when the message yields no metric,
     * or when {@code FluxCapacitor.getOptionally()} is empty.
     *
     * @param message  the message whose metric should be published
     * @param metadata metadata to publish along with the metric
     */
    protected void tryPublishMetrics(JsonType message, Metadata metadata) {
        Object metric = message.toMetric();
        if (!sendMetrics || metric == null) {
            return;
        }
        FluxCapacitor.getOptionally().ifPresent(f -> FluxCapacitor.publishMetrics(metric, metadata));
    }

    /**
     * Supplies the base metadata to which this client adds request details before publishing metrics.
     *
     * @return empty metadata in this implementation
     */
    protected Metadata metricsMetadata() {
        return Metadata.empty();
    }

    protected class WebSocketRequest {
        private final Request request;
        private final CompletableFuture<QueryResult> result = new CompletableFuture();
        private final Map<String, String> correlationData;
        private volatile String sessionId;
        private volatile long sendTimestamp;

        protected <T extends QueryResult> CompletableFuture<T> send() {
            Session session;
            try {
                session = AbstractWebsocketClient.this.sessionPool.get();
            }
            catch (Exception e) {
                log.error("Failed to get websocket session to send request {}", (Object)this.request, (Object)e);
                this.result.completeExceptionally(e);
                return this.result;
            }
            this.sessionId = session.getId();
            AbstractWebsocketClient.this.requests.put(this.request.getRequestId(), this);
            try {
                this.sendTimestamp = System.currentTimeMillis();
                AbstractWebsocketClient.this.send((JsonType)this.request, session);
            }
            catch (Exception e) {
                AbstractWebsocketClient.this.requests.remove(this.request.getRequestId());
                this.result.completeExceptionally(e);
            }
            return this.result;
        }

        @ConstructorProperties(value={"request", "correlationData"})
        public WebSocketRequest(Request request, Map<String, String> correlationData) {
            this.request = request;
            this.correlationData = correlationData;
        }
    }
}

