/*
 * Decompiled with CFR 0.152.
 */
package reactor.aeron.client;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.aeron.AeronEventLoop;
import reactor.aeron.AeronOptions;
import reactor.aeron.AeronResources;
import reactor.aeron.Connection;
import reactor.aeron.client.AeronClientConnector;
import reactor.aeron.client.AeronClientSettings;
import reactor.core.publisher.Mono;

public final class AeronClient {
    private static final AtomicInteger STREAM_ID_COUNTER = new AtomicInteger();
    private final AeronClientSettings settings;

    private AeronClient(AeronClientSettings settings) {
        this.settings = settings;
    }

    public static AeronClient create(AeronResources aeronResources) {
        return AeronClient.create("client", aeronResources);
    }

    public static AeronClient create(String name, AeronResources aeronResources) {
        return new AeronClient(AeronClientSettings.builder().name(name).aeronResources(aeronResources).build());
    }

    public Mono<? extends Connection> connect() {
        return this.connect(this.settings.options());
    }

    public Mono<? extends Connection> connect(AeronOptions options) {
        return Mono.defer(() -> this.connect0(options));
    }

    private Mono<? extends Connection> connect0(AeronOptions options) {
        return Mono.defer(() -> {
            AeronClientSettings settings = this.settings.options(options);
            int clientControlStreamId = STREAM_ID_COUNTER.incrementAndGet();
            Supplier<Integer> clientSessionStreamIdCounter = STREAM_ID_COUNTER::incrementAndGet;
            AeronClientConnector clientConnector = new AeronClientConnector(settings, clientControlStreamId, clientSessionStreamIdCounter);
            String category = Optional.ofNullable(settings.name()).orElse("client");
            AeronResources resources = settings.aeronResources();
            String clientChannel = settings.options().clientChannel();
            AeronEventLoop eventLoop = resources.nextEventLoop();
            return resources.controlSubscription(category, clientChannel, clientControlStreamId, clientConnector, eventLoop, null, image -> clientConnector.dispose()).flatMap(controlSubscription -> {
                clientConnector.onSubscription((Subscription)controlSubscription);
                return clientConnector.start().doOnError(ex -> {
                    controlSubscription.dispose();
                    clientConnector.dispose();
                }).doOnSuccess(connection -> {
                    settings.handler().apply((Connection)connection).subscribe(connection.disposeSubscriber());
                    connection.onDispose().doFinally(s -> {
                        controlSubscription.dispose();
                        clientConnector.dispose();
                    }).subscribe(null, th -> {});
                });
            });
        });
    }

    public AeronClient options(Consumer<AeronOptions.Builder> options) {
        return new AeronClient(this.settings.options(options));
    }

    public AeronClient handle(Function<? super Connection, ? extends Publisher<Void>> handler) {
        return new AeronClient(this.settings.handler(handler));
    }
}

