package io.fluxcapacitor.testserver.websocket;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import io.fluxcapacitor.common.Backlog;
import io.fluxcapacitor.common.Guarantee;
import io.fluxcapacitor.common.ObjectUtils;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.BooleanResult;
import io.fluxcapacitor.common.api.ClientEvent;
import io.fluxcapacitor.common.api.Command;
import io.fluxcapacitor.common.api.ConnectEvent;
import io.fluxcapacitor.common.api.DisconnectEvent;
import io.fluxcapacitor.common.api.JsonType;
import io.fluxcapacitor.common.api.QueryResult;
import io.fluxcapacitor.common.api.Request;
import io.fluxcapacitor.common.api.RequestBatch;
import io.fluxcapacitor.common.api.ResultBatch;
import io.fluxcapacitor.common.api.StringResult;
import io.fluxcapacitor.common.api.VoidResult;
import io.fluxcapacitor.common.handling.Handler;
import io.fluxcapacitor.common.handling.HandlerInspector;
import io.fluxcapacitor.common.handling.ParameterResolver;
import io.fluxcapacitor.common.serialization.NullCollectionsAsEmptyModule;
import io.fluxcapacitor.common.serialization.compression.CompressionAlgorithm;
import io.fluxcapacitor.common.serialization.compression.CompressionUtils;
import io.fluxcapacitor.testserver.metrics.MetricsLog;
import io.fluxcapacitor.testserver.metrics.NoOpMetricsLog;
import io.undertow.util.SameThreadExecutor;
import jakarta.annotation.Nullable;
import jakarta.websocket.CloseReason;
import jakarta.websocket.Endpoint;
import jakarta.websocket.EndpointConfig;
import jakarta.websocket.Session;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Parameter;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.hibernate.validator.internal.metadata.core.ConstraintHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/testserver/websocket/WebsocketEndpoint.class */
public abstract class WebsocketEndpoint extends Endpoint {
    /** Logger shared by all endpoint instances. */
    private static final Logger log = LoggerFactory.getLogger(WebsocketEndpoint.class);

    /**
     * JSON mapper used for all request and result (de)serialization: dates are written in textual form,
     * unknown properties and empty beans are tolerated, and single values are accepted where arrays are
     * expected.
     */
    private static final ObjectMapper defaultObjectMapper = JsonMapper.builder()
            .findAndAddModules()
            .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
            .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
            .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
            .addModule(new NullCollectionsAsEmptyModule())
            .enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY)
            .build();

    /** Sink for connect/disconnect events; replaceable through {@link #metricsLog(MetricsLog)}. */
    MetricsLog metricsLog;

    /** Mapper used by this instance to read requests and write results. */
    private final ObjectMapper objectMapper;

    /** Executor on which incoming websocket messages are deserialized and handled. */
    private final Executor requestExecutor;

    /** Outgoing result backlogs of the currently registered sessions, keyed by session id. */
    private final Map<String, SessionBacklog> sessionBacklogs;

    /** Flipped to {@code true} as soon as {@link #shutDown()} starts; new sessions are refused from then on. */
    protected final AtomicBoolean shuttingDown;

    /** Set to {@code true} by {@link #shutDown()} right before the remaining open sessions are closed. */
    protected volatile boolean shutDown;

    /** Dispatches a {@link ClientMessage} to the matching {@code @Handle} method of this endpoint. */
    private final Handler<ClientMessage> handler;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/fluxcapacitor/testserver/websocket/WebsocketEndpoint$ClientMessage.class */
    /**
     * Immutable pairing of a deserialized client request with the websocket session it arrived on.
     * Instances are what the endpoint's {@link Handler} receives as its message.
     */
    public static final class ClientMessage {
        private final JsonType payload;
        private final Session session;

        /**
         * @param jsonType the deserialized request sent by the client
         * @param session  the websocket session the request was received on
         */
        // The property name is spelled out as a literal: borrowing the constant from Hibernate Validator's
        // internal ConstraintHelper coupled this class to an unrelated library for the string "payload".
        @ConstructorProperties({"payload", "session"})
        public ClientMessage(JsonType jsonType, Session session) {
            this.payload = jsonType;
            this.session = session;
        }

        /** @return the deserialized request sent by the client */
        public JsonType getPayload() {
            return this.payload;
        }

        /** @return the websocket session the request was received on */
        public Session getSession() {
            return this.session;
        }

        /** Two messages are equal when both their payloads and their sessions are equal (nulls match nulls). */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof ClientMessage)) {
                return false;
            }
            ClientMessage other = (ClientMessage) obj;
            return Objects.equals(this.payload, other.payload) && Objects.equals(this.session, other.session);
        }

        /** Hash derived from payload and session; a {@code null} component contributes the constant 43. */
        @Override
        public int hashCode() {
            int result = 59 + (this.payload == null ? 43 : this.payload.hashCode());
            return result * 59 + (this.session == null ? 43 : this.session.hashCode());
        }

        @Override
        public String toString() {
            return "WebsocketEndpoint.ClientMessage(payload=" + this.payload + ", session=" + this.session + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/fluxcapacitor/testserver/websocket/WebsocketEndpoint$SessionBacklog.class */
    /**
     * Couples the outgoing result {@link Backlog} of a websocket session with that session. The
     * {@code add}, {@code registerMonitor} and {@code shutDown} methods forward to the wrapped backlog.
     */
    public static final class SessionBacklog {
        private final Backlog<QueryResult> delegate;
        private final Session session;

        /**
         * @param backlog the backlog that buffers results destined for {@code session}
         * @param session the websocket session the backlog belongs to
         */
        @ConstructorProperties({"delegate", "session"})
        public SessionBacklog(Backlog<QueryResult> backlog, Session session) {
            this.delegate = backlog;
            this.session = session;
        }

        /** @return the wrapped backlog */
        public Backlog<QueryResult> getDelegate() {
            return delegate;
        }

        /** @return the session this backlog belongs to */
        public Session getSession() {
            return session;
        }

        /** Equal when both the wrapped backlog and the session are equal (nulls match nulls). */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj instanceof SessionBacklog) {
                SessionBacklog that = (SessionBacklog) obj;
                boolean sameDelegate = delegate == null ? that.delegate == null : delegate.equals(that.delegate);
                if (sameDelegate) {
                    return session == null ? that.session == null : session.equals(that.session);
                }
            }
            return false;
        }

        /** Hash derived from backlog and session; a {@code null} component contributes the constant 43. */
        @Override
        public int hashCode() {
            int delegateHash = delegate == null ? 43 : delegate.hashCode();
            int sessionHash = session == null ? 43 : session.hashCode();
            return (59 + delegateHash) * 59 + sessionHash;
        }

        @Override
        public String toString() {
            return "WebsocketEndpoint.SessionBacklog(delegate=" + delegate + ", session=" + session + ")";
        }

        /** Forwards the given results to the wrapped backlog. */
        public CompletableFuture<Void> add(QueryResult... queryResultArr) {
            return delegate.add(queryResultArr);
        }

        /** Forwards the given collection of results to the wrapped backlog. */
        public CompletableFuture<Void> add(Collection<? extends QueryResult> collection) {
            return delegate.add(collection);
        }

        /** Registers a monitor on the wrapped backlog and returns the resulting registration. */
        public Registration registerMonitor(Consumer<List<QueryResult>> consumer) {
            return delegate.registerMonitor(consumer);
        }

        /** Shuts down the wrapped backlog. */
        public void shutDown() {
            delegate.shutDown();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates an endpoint that handles incoming requests on its own fixed pool of 64 threads. The pool's
     * threads come from a factory named after the concrete endpoint class. A JVM shutdown hook that calls
     * {@link #shutDown()} is registered as part of construction.
     */
    public WebsocketEndpoint() {
        this.metricsLog = new NoOpMetricsLog();
        this.sessionBacklogs = new ConcurrentHashMap<>();
        this.shuttingDown = new AtomicBoolean();
        this.handler = createMessageHandler();
        this.objectMapper = defaultObjectMapper;
        this.requestExecutor =
                Executors.newFixedThreadPool(64, ObjectUtils.newThreadFactory(getClass().getSimpleName()));
        registerShutdownHook();
    }

    /**
     * Creates an endpoint that handles incoming requests on the given executor. A JVM shutdown hook that
     * calls {@link #shutDown()} is registered as part of construction.
     *
     * @param executor executor for request handling; when {@code null}, {@link SameThreadExecutor#INSTANCE}
     *                 is used instead
     */
    protected WebsocketEndpoint(@Nullable Executor executor) {
        this.metricsLog = new NoOpMetricsLog();
        this.sessionBacklogs = new ConcurrentHashMap<>();
        this.shuttingDown = new AtomicBoolean();
        this.handler = createMessageHandler();
        this.objectMapper = defaultObjectMapper;
        this.requestExecutor = executor != null ? executor : SameThreadExecutor.INSTANCE;
        registerShutdownHook();
    }

    /**
     * Builds the handler that dispatches {@link ClientMessage}s to this endpoint's {@code @Handle} methods.
     * Two parameter resolvers are installed: the first supplies the message payload for the first parameter
     * of a handler method (and determines handler specificity), the second supplies the websocket session
     * for parameters declared as {@link Session}.
     */
    private Handler<ClientMessage> createMessageHandler() {
        return HandlerInspector.createHandler(this, (Class<? extends Annotation>) Handle.class,
                Arrays.asList(new ParameterResolver<ClientMessage>() {
                    @Override
                    public Function<ClientMessage, Object> resolve(Parameter parameter, Annotation annotation) {
                        // Only the first declared parameter of a handler method receives the payload.
                        if (Objects.equals(parameter.getDeclaringExecutable().getParameters()[0], parameter)) {
                            return ClientMessage::getPayload;
                        }
                        return null;
                    }

                    @Override
                    public boolean determinesSpecificity() {
                        return true;
                    }
                }, (parameter, annotation) -> {
                    if (parameter.getType().equals(Session.class)) {
                        return ClientMessage::getSession;
                    }
                    return null;
                }));
    }

    /** Registers a JVM shutdown hook, on a thread named after the concrete class, that calls {@link #shutDown()}. */
    private void registerShutdownHook() {
        Runtime.getRuntime().addShutdownHook(
                new Thread(this::shutDown, ObjectUtils.newThreadName(getClass().getSimpleName() + "-shutdown")));
    }

    /**
     * Registers a newly opened session: creates its result backlog, installs a binary message handler and
     * reports a {@link ConnectEvent}.
     *
     * <p>Each incoming binary message is handed to the request executor, where it is deserialized and then
     * either handled, ignored with an info log (shutdown has started), or rejected with an
     * {@link IllegalStateException} (shutdown has completed its grace period). Every failure inside that
     * task, including the rejection, is caught and logged as an error.
     *
     * @throws IllegalStateException if the endpoint is already shutting down
     */
    @Override // jakarta.websocket.Endpoint
    public void onOpen(Session session, EndpointConfig endpointConfig) {
        if (shuttingDown.get()) {
            throw new IllegalStateException("Cannot accept client. Endpoint is shutting down");
        }
        sessionBacklogs.put(session.getId(),
                new SessionBacklog(Backlog.forConsumer(results -> sendResultBatch(session, results)), session));
        session.addMessageHandler(byte[].class, bytes -> requestExecutor.execute(() -> {
            try {
                JsonType request = deserializeRequest(session, bytes);
                if (shutDown) {
                    throw new IllegalStateException(String.format("Rejecting request %s from client %s with id %s because the service is shutting down", request, getClientName(session), getClientId(session)));
                }
                if (!shuttingDown.get()) {
                    handleMessage(session, request);
                    return;
                }
                log.info("Silently ignoring request {} from client {} with id {} because the service is shutting down", request, getClientName(session), getClientId(session));
            } catch (Throwable e) {
                log.error("Failed to handle request", e);
            }
        }));
        registerMetrics(new ConnectEvent(getClientName(session), getClientId(session), session.getId(), toString()));
    }

    /**
     * Decompresses the raw websocket message with the session's compression algorithm and deserializes it
     * into a {@link JsonType}.
     *
     * @param session the session the message was received on; determines the compression algorithm
     * @param bArr    the raw bytes of the message
     * @return the deserialized request
     * @throws UncheckedIOException if the bytes cannot be read as a {@link JsonType}
     */
    protected JsonType deserializeRequest(Session session, byte[] bArr) {
        try {
            return this.objectMapper.readValue(
                    CompressionUtils.decompress(bArr, getCompressionAlgorithm(session)), JsonType.class);
        } catch (IOException e) {
            // readValue declares the checked IOException; wrap it so this method's signature stays free of
            // checked exceptions while the cause is preserved for the caller's error log.
            throw new UncheckedIOException("Failed to deserialize websocket request", e);
        }
    }

    /**
     * Handles a single deserialized message. A {@link RequestBatch} is split into one task per contained
     * request and each task is submitted to the request executor. Any other message is dispatched to the
     * matching handler method and its return value is passed on for sending back to the client. Failures
     * while handling a non-batch message are logged and not propagated.
     *
     * @param session  the session the message was received on
     * @param jsonType the deserialized message
     */
    protected void handleMessage(Session session, JsonType jsonType) {
        if (jsonType instanceof RequestBatch) {
            createTasks((RequestBatch<?>) jsonType, session).forEach(requestExecutor::execute);
            return;
        }
        try {
            ClientMessage message = new ClientMessage(jsonType, session);
            Object result = handler.getInvoker(message).orElseThrow().invoke();
            trySendResult(session, jsonType, result);
        } catch (Throwable e) {
            log.error("Could not handle request {}", jsonType, e);
        }
    }

    /**
     * Sends the outcome of a handled message back to the client where that is applicable.
     *
     * <p>Nothing is sent when the message is not a {@link Request}, or when it is a {@link Command} whose
     * guarantee compares below {@link Guarantee#STORED}. Otherwise the handler result is mapped as follows:
     * a {@link QueryResult} is sent as is; {@code null} yields a {@link VoidResult} for commands only; a
     * {@code Boolean} or {@code String} is wrapped in the matching result type; a {@link CompletableFuture}
     * is awaited and its value is run through this method again (a failed future is logged only); anything
     * else is logged as a warning.
     */
    private void trySendResult(Session session, JsonType jsonType, Object obj) {
        if (!(jsonType instanceof Request)) {
            return;
        }
        Request request = (Request) jsonType;
        boolean isCommand = request instanceof Command;
        if (isCommand && ((Command) request).getGuarantee().compareTo(Guarantee.STORED) < 0) {
            return;
        }
        if (obj instanceof QueryResult) {
            doSendResult(session, (QueryResult) obj);
        } else if (obj == null) {
            if (isCommand) {
                doSendResult(session, new VoidResult(request.getRequestId()));
            }
        } else if (obj instanceof Boolean) {
            doSendResult(session, new BooleanResult(request.getRequestId(), ((Boolean) obj).booleanValue()));
        } else if (obj instanceof String) {
            doSendResult(session, new StringResult(request.getRequestId(), (String) obj));
        } else if (obj instanceof CompletableFuture) {
            ((CompletableFuture<?>) obj).whenComplete((value, error) -> {
                if (error == null) {
                    trySendResult(session, jsonType, value);
                } else {
                    log.error("Request {} failed. Not sending back result to client.", jsonType, error);
                }
            });
        } else {
            log.warn("Not able to send back result of type {} to client. Contents: {}. Request: {}", obj.getClass(), obj, request);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Queues a result for delivery to the client. The backlog registered for the given session is
     * preferred; when there is none, a backlog of another session of the same client is used. If neither
     * exists the result is dropped and an info message is logged.
     *
     * @param session     the session the originating request was received on
     * @param queryResult the result to deliver
     */
    public void doSendResult(Session session, QueryResult queryResult) {
        Optional.ofNullable(sessionBacklogs.get(session.getId()))
                .or(() -> findAlternativeBacklog(session))
                .ifPresentOrElse(
                        backlog -> backlog.add(queryResult),
                        () -> log.info("Not sending result {}. Could not find any suitable sessions for client {}.", queryResult, getClientId(session)));
    }

    /**
     * Turns every request of a batch into a task that, when run, handles that request via
     * {@link #handleMessage(Session, JsonType)}. The tasks are not executed by this method.
     *
     * @param requestBatch the batch to split
     * @param session      the session the batch was received on
     * @return a lazily evaluated stream with one task per request in the batch
     */
    protected Stream<Runnable> createTasks(RequestBatch<?> requestBatch, Session session) {
        return requestBatch.getRequests().stream()
                .map(request -> (Runnable) () -> handleMessage(session, request));
    }

    /**
     * Delivers a list of results to a client.
     *
     * <p>A single result is sent on its own; any other list size is wrapped in a {@link ResultBatch}. If
     * the session is open, the payload is serialized, compressed with the session's algorithm and written
     * to the session's send stream; exceptions during that write are logged and not propagated. If the
     * session is no longer open, the results are handed to the backlog of another session of the same
     * client, or dropped with an info log when there is none.
     *
     * <p>Any other failure is logged and then rethrown.
     *
     * @param session the session the results are destined for
     * @param list    the results to deliver
     */
    protected void sendResultBatch(Session session, List<QueryResult> list) {
        try {
            Object payload = list.size() == 1 ? list.get(0) : new ResultBatch(list);
            if (!session.isOpen()) {
                findAlternativeBacklog(session).ifPresentOrElse(
                        backlog -> backlog.add(list),
                        () -> log.info("Not sending batch of {}. Could not find any suitable sessions for client {}.", list.size(), getClientId(session)));
                return;
            }
            try (OutputStream out = session.getBasicRemote().getSendStream()) {
                byte[] json = objectMapper.writeValueAsBytes(payload);
                out.write(CompressionUtils.compress(json, getCompressionAlgorithm(session)));
            } catch (Exception e) {
                log.error("Failed to send websocket result to client {}, id {}", getClientName(session), getClientId(session), e);
            }
        } catch (Throwable e) {
            log.error("Failed to send websocket result to client {}, id {}", getClientName(session), getClientId(session), e);
            throw e;
        }
    }

    /**
     * Looks for the backlog of a different session that belongs to the same client as the given session.
     *
     * @param session the session whose client needs an alternative delivery route
     * @return the first registered backlog whose session has the same client id but a different session id,
     *         or an empty optional if there is none
     */
    protected Optional<SessionBacklog> findAlternativeBacklog(Session session) {
        String clientId = getClientId(session);
        for (SessionBacklog candidate : sessionBacklogs.values()) {
            boolean sameClient = clientId.equals(getClientId(candidate.getSession()));
            if (sameClient && !session.getId().equals(candidate.getSession().getId())) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /**
     * Unregisters the backlog of a closed session. Unless the endpoint is shutting down, a
     * {@link DisconnectEvent} is reported as well, preceded by a warning when the close code is not
     * {@code UNEXPECTED_CONDITION} and is numerically greater than {@code NO_STATUS_CODE}.
     */
    @Override // jakarta.websocket.Endpoint
    public void onClose(Session session, CloseReason closeReason) {
        sessionBacklogs.remove(session.getId());
        if (!shuttingDown.get()) {
            boolean unexpectedCondition = closeReason.getCloseCode() == CloseReason.CloseCodes.UNEXPECTED_CONDITION;
            if (!unexpectedCondition
                    && closeReason.getCloseCode().getCode() > CloseReason.CloseCodes.NO_STATUS_CODE.getCode()) {
                log.warn("Websocket session to endpoint {} for client {} with id {} closed abnormally: {}", getClass().getSimpleName(), getClientName(session), getClientId(session), closeReason);
            }
            registerMetrics(new DisconnectEvent(getClientName(session), getClientId(session), session.getId(),
                    toString(), closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase()));
        }
    }

    @Override // jakarta.websocket.Endpoint
    /**
     * Logs an error that occurred on a session and then closes that session with close code
     * {@code UNEXPECTED_CONDITION}. Closing is best effort: a failure to close is logged as a warning and
     * not propagated.
     *
     * @param session the session on which the error occurred
     * @param th      the error
     */
    public void onError(Session session, Throwable th) {
        log.error("Error in session for client {} with id {}", getClientName(session), getClientId(session), th);
        try {
            session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "The websocket closed because of an error"));
        } catch (IOException e) {
            // Still best effort, but no longer silent: a session that cannot be closed is worth a trace.
            log.warn("Failed to close session for client {} with id {} after an error", getClientName(session), getClientId(session), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Shuts the endpoint down; only the first invocation has any effect.
     *
     * <p>The {@code shuttingDown} flag is raised first. After a 500 ms pause (cut short if the thread is
     * interrupted, in which case the interrupt status is restored) the {@code shutDown} flag is set and
     * every registered session that is still open is closed. A failure to close one session is logged as
     * a warning and does not prevent the remaining sessions from being closed.
     */
    public void shutDown() {
        if (!this.shuttingDown.compareAndSet(false, true)) {
            return;
        }
        try {
            Thread.sleep(500L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            this.shutDown = true;
            this.sessionBacklogs.values().stream()
                    .map(backlog -> backlog.getSession())
                    .filter(session -> session.isOpen())
                    .forEach(session -> {
                        try {
                            session.close();
                        } catch (Exception closeFailure) {
                            // Best effort: keep closing the other sessions, but leave a trace.
                            log.warn("Failed to close websocket session {} during shutdown", session.getId(), closeFailure);
                        }
                    });
        }
    }

    /**
     * Reads the compression algorithm from the session's {@code compression} request parameter.
     *
     * @param session the session to inspect
     * @return the algorithm named by the first parameter value, or {@code null} when the parameter is absent
     */
    protected CompressionAlgorithm getCompressionAlgorithm(Session session) {
        List<String> values = session.getRequestParameterMap().get("compression");
        return values == null ? null : CompressionAlgorithm.valueOf(values.get(0));
    }

    /**
     * Reads the project id from the session's {@code projectId} request parameter.
     *
     * @param session the session to inspect
     * @return the first parameter value, or {@code "public"} when the parameter is absent or that value is
     *         {@code null}
     */
    protected String getProjectId(Session session) {
        List<String> values = session.getRequestParameterMap().get("projectId");
        if (values != null) {
            String first = values.get(0);
            if (first != null) {
                return first;
            }
        }
        return "public";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reads the client id from the session's {@code clientId} request parameter.
     *
     * @param session the session to inspect
     * @return the first value of the {@code clientId} parameter
     */
    public String getClientId(Session session) {
        List<String> values = session.getRequestParameterMap().get("clientId");
        return values.get(0);
    }

    /**
     * Reads the client name from the session's {@code clientName} request parameter.
     *
     * @param session the session to inspect
     * @return the first value of the {@code clientName} parameter
     */
    protected String getClientName(Session session) {
        List<String> values = session.getRequestParameterMap().get("clientName");
        return values.get(0);
    }

    /**
     * Passes a client event on to the configured {@link MetricsLog}.
     *
     * @param clientEvent the event to record
     */
    protected void registerMetrics(ClientEvent clientEvent) {
        metricsLog.registerMetrics(clientEvent);
    }

    /**
     * Replaces the {@link MetricsLog} that receives this endpoint's client events.
     *
     * @param metricsLog the metrics log to use from now on
     * @return this endpoint, to allow call chaining
     */
    public WebsocketEndpoint metricsLog(MetricsLog metricsLog) {
        this.metricsLog = metricsLog;
        return this;
    }

    /** @return the object mapper this endpoint uses to read requests and write results */
    protected ObjectMapper getObjectMapper() {
        return objectMapper;
    }
}
