/*
 * Decompiled with CFR 0.152.
 */
package reactor.aeron.client;

import com.fasterxml.uuid.Generators;
import com.fasterxml.uuid.impl.TimeBasedGenerator;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.aeron.AeronInbound;
import reactor.aeron.AeronOptions;
import reactor.aeron.AeronOutbound;
import reactor.aeron.AeronResources;
import reactor.aeron.AeronUtils;
import reactor.aeron.Connection;
import reactor.aeron.ControlMessageSubscriber;
import reactor.aeron.DefaultAeronOutbound;
import reactor.aeron.MessagePublication;
import reactor.aeron.MessageType;
import reactor.aeron.OnDisposable;
import reactor.aeron.Protocol;
import reactor.aeron.client.AeronClientInbound;
import reactor.aeron.client.AeronClientSettings;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.util.Logger;
import reactor.util.Loggers;

/**
 * Client-side connector that opens sessions to a server.
 *
 * <p>Every subscription to {@link #start()} creates a new {@code ClientHandler}, which sends a
 * {@code CONNECT} request through a control publication, waits for the matching
 * {@code CONNECT_ACK} (delivered via {@link #onConnectAck(UUID, long, int)}) and then starts the
 * session's inbound and outbound. Disposing the connector disposes every handler that is still
 * registered.
 */
public final class AeronClientConnector implements ControlMessageSubscriber, OnDisposable {
    private static final Logger logger = Loggers.getLogger(AeronClientConnector.class);

    /** Stream id passed to {@code messagePublication} when the control publication is created. */
    private static final int CONTROL_STREAM_ID = 1;

    /** Generator of the request ids used to correlate CONNECT requests with their acks. */
    private static final TimeBasedGenerator uuidGenerator = Generators.timeBasedGenerator();

    /** Label used as the first argument of log messages; falls back to {@code "client"}. */
    private final String category;
    private final AeronOptions options;
    private final AeronResources resources;
    private final int clientControlStreamId;
    private final String clientChannel;
    /** Supplies the client session stream id for each new handler. */
    private final Supplier<Integer> clientSessionStreamIdCounter;
    /** Pending connect requests, keyed by connect request id, awaiting a CONNECT_ACK. */
    private final Map<UUID, ConnectAckPromise> connectAckPromises = new ConcurrentHashMap<>();
    /** Handlers that have been started and not yet disposed. */
    private final List<ClientHandler> handlers = new CopyOnWriteArrayList<>();
    /** Completed by {@link #dispose()} to trigger the disposal sequence. */
    private final MonoProcessor<Void> dispose = MonoProcessor.create();
    /** Completed once the disposal sequence has finished (successfully or not). */
    private final MonoProcessor<Void> onDispose = MonoProcessor.create();

    /**
     * Creates the connector and wires its disposal sequence.
     *
     * @param settings source of the options, the optional name and the Aeron resources
     * @param clientControlStreamId control stream id advertised to the server in CONNECT requests
     * @param clientSessionStreamIdCounter supplier of a session stream id for each new handler
     */
    AeronClientConnector(AeronClientSettings settings, int clientControlStreamId, Supplier<Integer> clientSessionStreamIdCounter) {
        this.options = settings.options();
        this.category = Optional.ofNullable(settings.name()).orElse("client");
        this.resources = settings.aeronResources();
        this.clientChannel = this.options.clientChannel();
        this.clientControlStreamId = clientControlStreamId;
        this.clientSessionStreamIdCounter = clientSessionStreamIdCounter;

        // Once dispose() completes the trigger, run doDispose() and then signal onDispose.
        this.dispose
            .then(doDispose())
            .doFinally(signal -> this.onDispose.onComplete())
            .subscribe(null, th -> logger.warn("AeronClientConnector disposed with error: {}", th));
    }

    /**
     * Opens a new session to the server.
     *
     * @return a deferred mono; each subscription creates and starts a fresh handler and emits it
     *     as the {@link Connection} once the session has been established
     */
    public Mono<Connection> start() {
        return Mono.defer(() -> new ClientHandler().start());
    }

    /** Disposes the first registered handler whose session id equals {@code sessionId}, if any. */
    private void dispose(long sessionId) {
        handlers.stream()
            .filter(handler -> handler.sessionId == sessionId)
            .findFirst()
            .ifPresent(ClientHandler::dispose);
    }

    /** Triggers disposal of this connector and of all registered handlers. */
    public void dispose() {
        dispose.onComplete();
    }

    /** Reports the disposed state of the {@link #onDispose()} processor. */
    public boolean isDisposed() {
        return onDispose.isDisposed();
    }

    /**
     * @return a mono that completes when the disposal sequence has finished
     */
    @Override
    public Mono<Void> onDispose() {
        return onDispose;
    }

    /**
     * Disposes every registered handler and waits for all of them, delaying any error until
     * all have terminated.
     */
    private Mono<Void> doDispose() {
        return Mono.defer(() -> {
            List<Mono<Void>> disposals = handlers.stream()
                .map(handler -> {
                    handler.dispose();
                    return handler.onDispose();
                })
                .collect(Collectors.toList());
            return Mono.whenDelayError(disposals);
        });
    }

    /** Requests an unbounded number of control messages. */
    @Override
    public void onSubscription(Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
    }

    /**
     * Resolves the pending connect request identified by {@code connectRequestId}; an ack for an
     * unknown (or already resolved) request is ignored.
     */
    @Override
    public void onConnectAck(UUID connectRequestId, long sessionId, int serverSessionStreamId) {
        logger.debug(
            "[{}] Received {} for connectRequestId: {}, serverSessionStreamId: {}",
            category, MessageType.CONNECT_ACK, connectRequestId, serverSessionStreamId);
        ConnectAckPromise pending = connectAckPromises.remove(connectRequestId);
        if (pending != null) {
            pending.success(sessionId, serverSessionStreamId);
        }
    }

    /** Disposes the handler of the session that the remote side reported as complete. */
    @Override
    public void onComplete(long sessionId) {
        logger.info("[{}] Received {} for sessionId: {}", category, MessageType.COMPLETE, sessionId);
        dispose(sessionId);
    }

    /** CONNECT requests are not handled on the client side; the request is only logged. */
    @Override
    public void onConnect(UUID connectRequestId, String clientChannel, int clientControlStreamId, int clientSessionStreamId) {
        logger.error(
            "[{}] Unsupported {} request for a client, clientChannel: {}, clientControlStreamId: {}, clientSessionStreamId: {}",
            category, MessageType.CONNECT, clientChannel, clientControlStreamId, clientSessionStreamId);
    }

    /** Payload of a CONNECT_ACK: the session id and the server's session stream id. */
    private static class ConnectAckResponse {
        private final long sessionId;
        private final int serverSessionStreamId;

        private ConnectAckResponse(long sessionId, int serverSessionStreamId) {
            this.sessionId = sessionId;
            this.serverSessionStreamId = serverSessionStreamId;
        }
    }

    /** A pending connect request that is resolved when its CONNECT_ACK arrives. */
    private class ConnectAckPromise implements Disposable {
        private final UUID connectRequestId;
        private final MonoProcessor<ConnectAckResponse> promise;

        private ConnectAckPromise(UUID connectRequestId) {
            this.connectRequestId = connectRequestId;
            this.promise = MonoProcessor.create();
        }

        private Mono<ConnectAckResponse> promise() {
            return promise;
        }

        /** Emits the ack payload and completes the promise. */
        private void success(long sessionId, int serverSessionStreamId) {
            promise.onNext(new ConnectAckResponse(sessionId, serverSessionStreamId));
            promise.onComplete();
        }

        /** Unregisters this promise from the connector and cancels it. */
        @Override
        public void dispose() {
            connectAckPromises.remove(connectRequestId);
            promise.cancel();
        }

        @Override
        public boolean isDisposed() {
            return promise.isDisposed();
        }
    }

    /** A single client session: its control publication, inbound, outbound and lifecycle. */
    private class ClientHandler implements Connection {
        private final DefaultAeronOutbound outbound;
        private final int clientSessionStreamId;
        private final UUID connectRequestId = uuidGenerator.generate();
        private final String serverChannel;
        /** Created on first use and cached for subsequent control messages. */
        private final Mono<MessagePublication> controlPublication;
        /** Assigned from the CONNECT_ACK; keeps its default value until then. */
        private volatile long sessionId;
        /** Assigned from the CONNECT_ACK; keeps its default value until then. */
        private volatile int serverSessionStreamId;
        private volatile AeronClientInbound inbound;
        /** Completed by {@link #dispose()} to trigger the disposal sequence. */
        private final MonoProcessor<Void> dispose = MonoProcessor.create();
        /** Completed once the disposal sequence has finished (successfully or not). */
        private final MonoProcessor<Void> onDispose = MonoProcessor.create();

        private ClientHandler() {
            this.clientSessionStreamId = clientSessionStreamIdCounter.get();
            this.serverChannel = options.serverChannel();
            this.inbound = new AeronClientInbound(category, resources);
            this.outbound = new DefaultAeronOutbound(category, this.serverChannel, resources, options);
            this.controlPublication = Mono.defer(this::newControlPublication).cache();

            // Once dispose() completes the trigger, run doDispose() and then signal onDispose.
            this.dispose
                .then(doDispose())
                .doFinally(signal -> this.onDispose.onComplete())
                .subscribe(null, th -> logger.warn("ClientHandler disposed with error: {}", th));
        }

        private Mono<MessagePublication> newControlPublication() {
            return resources.messagePublication(
                category, serverChannel, CONTROL_STREAM_ID, options, resources.nextEventLoop());
        }

        /**
         * Registers this handler, performs the connect handshake and starts inbound and outbound.
         *
         * @return a mono emitting this handler once the session is up; on error the handler is
         *     disposed and the error is propagated
         */
        private Mono<? extends Connection> start() {
            handlers.add(this);
            return connect()
                .flatMap(response -> {
                    this.sessionId = response.sessionId;
                    this.serverSessionStreamId = response.serverSessionStreamId;
                    return inbound
                        .start(clientChannel, clientSessionStreamId, sessionId, this::dispose)
                        .then(outbound.start(sessionId, serverSessionStreamId))
                        .thenReturn(this);
                })
                .doOnError(ex -> {
                    logger.error(
                        "[{}] Occurred exception for sessionId: {} clientSessionStreamId: {}, error: ",
                        category, sessionId, clientSessionStreamId, ex);
                    dispose();
                });
        }

        /**
         * Sends the CONNECT request and waits, up to the configured ack timeout, for the
         * CONNECT_ACK. The pending promise is released on completion, error and cancellation.
         */
        private Mono<ConnectAckResponse> connect() {
            ConnectAckPromise ackPromise =
                connectAckPromises.computeIfAbsent(connectRequestId, id -> new ConnectAckPromise(id));

            Mono<ConnectAckResponse> awaitAck = ackPromise.promise()
                .timeout(options.ackTimeout())
                .doOnError(ex -> logger.warn(
                    "Failed to receive {} during {} millis",
                    MessageType.CONNECT_ACK, options.ackTimeout().toMillis()));

            return sendConnectRequest()
                .then(awaitAck)
                .doOnSuccess(response -> logger.debug(
                    "[{}] Successfully connected to server at {}, sessionId: {}",
                    category, AeronUtils.minifyChannel(serverChannel), response.sessionId))
                .doOnTerminate(ackPromise::dispose)
                // doOnTerminate does not fire on cancel; without this hook a cancelled connect
                // would leave the promise registered in connectAckPromises.
                .doOnCancel(ackPromise::dispose)
                .doOnError(ex -> logger.warn(
                    "Failed to connect to server at {}, cause: {}",
                    AeronUtils.minifyChannel(serverChannel), ex));
        }

        private Mono<Void> sendConnectRequest() {
            return controlPublication.flatMap(publication -> {
                logger.debug("[{}] Connecting to server at {}", category, AeronUtils.minifyChannel(serverChannel));
                ByteBuffer body = Protocol.createConnectBody(
                    connectRequestId, clientChannel, clientControlStreamId, clientSessionStreamId);
                return send(publication, body, MessageType.CONNECT);
            });
        }

        private Mono<Void> sendDisconnectRequest() {
            return controlPublication.flatMap(publication -> {
                logger.debug("[{}] Disconnecting from server at {}", category, AeronUtils.minifyChannel(serverChannel));
                ByteBuffer body = Protocol.createDisconnectBody(sessionId);
                return send(publication, body, MessageType.COMPLETE);
            });
        }

        /** Enqueues a control message for the current session id and logs the outcome. */
        private Mono<Void> send(MessagePublication mp, ByteBuffer buffer, MessageType messageType) {
            return mp.enqueue(messageType, buffer, sessionId)
                .doOnSuccess(avoid -> logger.debug(
                    "[{}] Sent {} to {}",
                    category, messageType, AeronUtils.minifyChannel(serverChannel)))
                .doOnError(ex -> logger.warn(
                    "[{}] Failed to send {} to {}, cause: {}",
                    category, messageType, AeronUtils.minifyChannel(serverChannel), ex));
        }

        @Override
        public AeronInbound inbound() {
            return inbound;
        }

        @Override
        public AeronOutbound outbound() {
            return outbound;
        }

        @Override
        public String toString() {
            return "ClientSession{"
                + "sessionId=" + sessionId
                + ", clientChannel=" + clientChannel
                + ", serverChannel=" + serverChannel
                + ", clientSessionStreamId=" + clientSessionStreamId
                + ", serverSessionStreamId=" + serverSessionStreamId
                + '}';
        }

        /** Triggers disposal of this session. */
        public void dispose() {
            dispose.onComplete();
        }

        /** Reports the disposed state of the {@link #onDispose()} processor. */
        public boolean isDisposed() {
            return onDispose.isDisposed();
        }

        /**
         * @return a mono that completes when this session's disposal sequence has finished
         */
        @Override
        public Mono<Void> onDispose() {
            return onDispose;
        }

        /**
         * Unregisters this handler, makes a best-effort attempt to notify the server, then
         * disposes outbound and inbound and waits for both to finish.
         */
        private Mono<Void> doDispose() {
            return Mono.defer(() -> {
                logger.debug("[{}] About to close session with sessionId: {}", category, sessionId);
                handlers.remove(this);
                return sendDisconnectRequest()
                    // A failed disconnect notification must not prevent local cleanup.
                    .onErrorResume(ex -> Mono.empty())
                    .then(Mono.defer(() -> {
                        // Both are always assigned in the constructor, so no null guard is needed.
                        outbound.dispose();
                        inbound.dispose();
                        return Mono.whenDelayError(outbound.onDispose(), inbound.onDispose())
                            .doFinally(signal -> logger.debug(
                                "[{}] Closed session with sessionId: {}", category, sessionId));
                    }));
            });
        }
    }
}

