/*
 * Decompiled with CFR 0.152.
 */
package io.cdsoft.sf.messaging.internal.client.cometd;

import io.cdsoft.sf.messaging.MessagingException;
import io.cdsoft.sf.messaging.api.config.ConnectionConfig;
import io.cdsoft.sf.messaging.api.consumer.JacksonPlatformEventConsumer;
import io.cdsoft.sf.messaging.api.consumer.JacksonPushTopicEventConsumer;
import io.cdsoft.sf.messaging.api.consumer.JsonEventConsumer;
import io.cdsoft.sf.messaging.api.consumer.MapEventConsumer;
import io.cdsoft.sf.messaging.api.subscription.Subscription;
import io.cdsoft.sf.messaging.internal.client.ManagedClient;
import io.cdsoft.sf.messaging.internal.client.auth.ManagedAuthClient;
import io.cdsoft.sf.messaging.internal.client.cometd.ReplayExtension;
import io.cdsoft.sf.messaging.internal.client.http.ManagedHttpClient;
import io.cdsoft.sf.messaging.internal.client.retry.RetryStrategy;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.cometd.bayeux.client.ClientSession;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.ClientTransport;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ManagedClient} that owns a CometD {@link BayeuxClient} and the channel
 * subscriptions registered on it.
 *
 * <p>The client is created in {@link #doStart()} and released in {@link #doStop()}.
 * Meta-channel listeners restart the client when a handshake fails or a disconnect
 * message arrives, and re-check the subscriptions after each successful connect.
 *
 * <p>{@code restartClient} and {@code resubscribe} are {@code synchronized} on this
 * instance; the remaining methods are not.
 */
public class ManagedCometdClient implements ManagedClient {
    private static final Logger LOG = LoggerFactory.getLogger(ManagedCometdClient.class);

    /** How long {@code connect} waits for the CONNECTED state, in {@link #TIMEOUT_TIME_UNIT}. */
    private static final long CONNECT_TIMEOUT = 30L;
    private static final TimeUnit TIMEOUT_TIME_UNIT = TimeUnit.SECONDS;

    /** Value passed to the long-polling transport as its {@code maxNetworkDelay} option. */
    private static final int MAX_NETWORK_DELAY = 15000;

    /** Value passed to the long-polling transport as its {@code maxMessageSize} option. */
    private static final int MAX_BUFFER_SIZE = 0x100000;

    /** Shared by every instance of this class; registered on each client that is created. */
    private static final ReplayExtension REPLAY_EXTENSION = new ReplayExtension();

    private final ConnectionConfig config;
    private final ManagedAuthClient authClient;
    private final ManagedHttpClient httpClient;

    /** Successfully established subscriptions, keyed by channel name. */
    private final ConcurrentHashMap<String, Subscription> subscriptions = new ConcurrentHashMap<>();

    /**
     * Current client, or {@code null} while stopped. Volatile because it is read from
     * listener callbacks and written by start/stop/restart.
     */
    private volatile BayeuxClient bayeuxClient;

    /** Restarts the client when a handshake reply is unsuccessful. */
    private final ClientSessionChannel.MessageListener handshakeListener = (channel, message) -> {
        LOG.debug("Meta-message [{}]: {}", channel.getChannelId(), message);
        if (!message.isSuccessful()) {
            this.restartClient();
        }
    };

    /** Re-checks the known subscriptions after every successful connect reply. */
    private final ClientSessionChannel.MessageListener connectListener = (channel, message) -> {
        LOG.debug("Meta-message [{}]: {}", channel.getChannelId(), message);
        if (message.isSuccessful()) {
            this.resubscribe();
        }
    };

    /** Restarts the client whenever a disconnect meta-message is received. */
    private final ClientSessionChannel.MessageListener disconnectListener = (channel, message) -> {
        LOG.debug("Meta-message [{}]: {}", channel.getChannelId(), message);
        this.restartClient();
    };

    /**
     * Stores the collaborators; no connection is made until {@link #doStart()}.
     *
     * @param config     configuration handed to each {@link RetryStrategy}
     * @param authClient supplies the CometD endpoint and the bearer token
     * @param httpClient supplies the HTTP client used by the long-polling transport
     */
    public ManagedCometdClient(ConnectionConfig config, ManagedAuthClient authClient, ManagedHttpClient httpClient) {
        this.config = config;
        this.authClient = authClient;
        this.httpClient = httpClient;
    }

    /**
     * Connects through a {@link RetryStrategy} built from the configuration.
     *
     * @throws MessagingException propagated from the retry strategy
     */
    @Override
    public void doStart() throws MessagingException {
        new RetryStrategy(this.config).exectue(this::connect);
    }

    /**
     * Detaches the meta listeners and disconnects the current client. Does nothing
     * when no client exists.
     */
    @Override
    public void doStop() throws MessagingException {
        this.disconnect();
    }

    /**
     * Registers the subscription's replay id with the replay extension and subscribes
     * its consumer to the subscription's channel. The subscription is recorded only
     * once the subscribe reply reports success.
     *
     * @param subscription subscription to establish
     * @throws IllegalStateException if no client exists or the client is disconnected
     */
    public void addSubscription(Subscription subscription) {
        BayeuxClient client = this.requireStartedClient();
        LOG.info("Subscribing to channel: {}", subscription.getChannelName());
        REPLAY_EXTENSION.addOrUpdateChannelReplayId(subscription.getTopic(), subscription.getReplayFrom());
        ClientSessionChannel channel = client.getChannel(subscription.getChannelName());
        Object consumer = subscription.getConsumer();
        channel.subscribe((c, message) -> {
            LOG.trace("Subscription-message [{}]: {}", c.getChannelId(), message);
            if (consumer instanceof JsonEventConsumer) {
                ((JsonEventConsumer)consumer).accept(message.getJSON());
            } else if (consumer instanceof MapEventConsumer) {
                ((MapEventConsumer)consumer).accept(message.getDataAsMap());
            } else if (consumer instanceof JacksonPlatformEventConsumer) {
                ((JacksonPlatformEventConsumer)consumer).accept(message.getJSON());
            } else if (consumer instanceof JacksonPushTopicEventConsumer) {
                ((JacksonPushTopicEventConsumer)consumer).accept(message.getJSON());
            } else {
                // Previously the event was dropped without a trace.
                LOG.warn("Dropping message on channel [{}]: unsupported consumer type {}",
                        c.getChannelId(), consumer == null ? null : consumer.getClass().getName());
            }
        }, (c, message) -> {
            if (message.isSuccessful()) {
                LOG.info("Successfully Subscribed to channel: {} {}", subscription.getChannelName(), c.getChannelId());
                this.subscriptions.put(subscription.getChannelName(), subscription);
            } else {
                LOG.error("Unable to subscribe to subscription {} : {}", subscription, message);
            }
        });
    }

    /**
     * Unsubscribes the channel of a previously recorded subscription and forgets it.
     * Logs a warning when no subscription is recorded under {@code name}.
     *
     * @param name channel name the subscription was recorded under
     * @throws IllegalStateException if no client exists or the client is disconnected
     */
    public void removeSubscription(String name) {
        BayeuxClient client = this.requireStartedClient();
        // Remove the entry as well as unsubscribing: otherwise resubscribe() finds the
        // channel without subscribers on the next connect and subscribes it again.
        Subscription subscription = this.subscriptions.remove(name);
        if (subscription == null) {
            LOG.warn("Subscription with name {} does not exist", name);
            return;
        }
        client.getChannel(subscription.getChannelName()).unsubscribe();
    }

    /**
     * Returns the current client.
     *
     * @throws IllegalStateException if no client exists or the client is disconnected
     */
    private BayeuxClient requireStartedClient() {
        BayeuxClient client = this.bayeuxClient;
        if (client == null || client.isDisconnected()) {
            throw new IllegalStateException(String.format("Connector[%s] has not been started", this.authClient.getCometdEndpoint()));
        }
        return client;
    }

    /**
     * Builds a client with a long-polling transport that adds an OAuth
     * {@code Authorization} header to every request, and registers the replay extension.
     */
    private BayeuxClient createClient() {
        Map<String, Object> longPollingOptions = new HashMap<>();
        longPollingOptions.put("maxNetworkDelay", MAX_NETWORK_DELAY);
        longPollingOptions.put("maxMessageSize", MAX_BUFFER_SIZE);
        LongPollingTransport transport = new LongPollingTransport(longPollingOptions, this.httpClient.getHttpClient()) {
            @Override
            protected void customize(Request request) {
                super.customize(request);
                // The token is read per request rather than captured once.
                request.getHeaders().put(HttpHeader.AUTHORIZATION, "OAuth " + ManagedCometdClient.this.authClient.getBearerToken());
            }
        };
        BayeuxClient client = new BayeuxClient(this.authClient.getCometdEndpoint().toExternalForm(), transport);
        client.addExtension((ClientSession.Extension)REPLAY_EXTENSION);
        return client;
    }

    /**
     * Creates a fresh client, attaches the meta listeners, handshakes and waits for the
     * CONNECTED state.
     *
     * @return {@code true} once connected
     * @throws MessagingException if the client is not connected within the timeout
     */
    private Boolean connect() throws MessagingException {
        // NOTE(review): this also runs on restart, so subscriptions recorded before a
        // restart are forgotten and resubscribe() has nothing to restore - confirm intended.
        this.subscriptions.clear();
        BayeuxClient client = this.createClient();
        this.bayeuxClient = client;
        client.getChannel("/meta/handshake").addListener(this.handshakeListener);
        client.getChannel("/meta/connect").addListener(this.connectListener);
        client.getChannel("/meta/disconnect").addListener(this.disconnectListener);
        client.handshake();
        long timeoutMillis = TimeUnit.MILLISECONDS.convert(CONNECT_TIMEOUT, TIMEOUT_TIME_UNIT);
        if (!client.waitFor(timeoutMillis, BayeuxClient.State.CONNECTED)) {
            // Release the timed-out client so a retry does not leave it behind with
            // listeners still attached.
            this.release(client);
            if (this.bayeuxClient == client) {
                this.bayeuxClient = null;
            }
            throw new MessagingException("Timeout connecting to " + this.authClient.getCometdEndpoint());
        }
        return true;
    }

    /** Releases the current client, if any, and clears the field. */
    private void disconnect() {
        BayeuxClient client = this.bayeuxClient;
        if (client == null) {
            // Never started or already stopped: nothing to release.
            return;
        }
        this.release(client);
        this.bayeuxClient = null;
    }

    /** Detaches the meta listeners from {@code client} and asks it to disconnect. */
    private void release(BayeuxClient client) {
        client.getChannel("/meta/handshake").removeListener(this.handshakeListener);
        client.getChannel("/meta/connect").removeListener(this.connectListener);
        client.getChannel("/meta/disconnect").removeListener(this.disconnectListener);
        client.disconnect();
    }

    /**
     * Disconnects the current client and connects a new one through a
     * {@link RetryStrategy}.
     *
     * @throws RuntimeException wrapping any failure of the reconnect attempt
     */
    private synchronized void restartClient() {
        this.disconnect();
        try {
            new RetryStrategy(this.config).exectue(this::connect);
        }
        catch (Exception e) {
            // Pass the message for the placeholder and the exception itself last so the
            // stack trace is logged too.
            LOG.error("Unable to restart client: {}", e.getMessage(), e);
            throw new RuntimeException("Unable to restart client.", e);
        }
    }

    /**
     * Re-adds every recorded subscription whose channel currently has no subscribers.
     * Does nothing when no client exists.
     */
    private synchronized void resubscribe() {
        LOG.trace("Refreshing subscriptions to channels on reconnect");
        BayeuxClient client = this.bayeuxClient;
        if (client == null) {
            return;
        }
        for (Subscription subscription : this.subscriptions.values()) {
            ClientSessionChannel channel = client.getChannel(subscription.getChannelName());
            if (!channel.getSubscribers().isEmpty()) {
                continue;
            }
            LOG.debug("Re-subscribing to channel: [{}] because no subscribers exist.", subscription.getChannelName());
            this.addSubscription(subscription);
        }
    }
}

