/*
 * Decompiled with CFR 0.152.
 */
package reactor.aeron;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Objects;
import org.reactivestreams.Publisher;
import reactor.aeron.AeronEventLoop;
import reactor.aeron.AeronOptions;
import reactor.aeron.AeronOutbound;
import reactor.aeron.AeronResources;
import reactor.aeron.AeronWriteSequencer;
import reactor.aeron.MessagePublication;
import reactor.aeron.OnDisposable;
import reactor.core.publisher.Mono;
import reactor.util.Logger;
import reactor.util.Loggers;

public final class DefaultAeronOutbound
implements AeronOutbound,
OnDisposable {
    private static final Logger logger = Loggers.getLogger(DefaultAeronOutbound.class);
    private static final RuntimeException NOT_CONNECTED_EXCEPTION = new RuntimeException("publication is not connected");
    private final String category;
    private final String channel;
    private final AeronResources resources;
    private final AeronOptions options;
    private volatile AeronWriteSequencer sequencer;
    private volatile MessagePublication publication;

    public DefaultAeronOutbound(String category, String channel, AeronResources resources, AeronOptions options) {
        this.category = category;
        this.channel = channel;
        this.resources = resources;
        this.options = options;
    }

    public Mono<Void> start(long sessionId, int streamId) {
        return Mono.defer(() -> {
            AeronEventLoop eventLoop = this.resources.nextEventLoop();
            return this.resources.messagePublication(this.category, this.channel, streamId, this.options, eventLoop).doOnSuccess(result -> {
                this.publication = result;
                this.sequencer = new AeronWriteSequencer(sessionId, this.publication, eventLoop);
            }).flatMap(result -> {
                Duration retryInterval = Duration.ofMillis(100L);
                Duration connectTimeout = this.options.connectTimeout();
                long retryCount = connectTimeout.toMillis() / retryInterval.toMillis();
                return Mono.fromCallable(this.publication::isConnected).filter(isConnected -> isConnected).switchIfEmpty(Mono.error((Throwable)NOT_CONNECTED_EXCEPTION)).retryBackoff(retryCount, retryInterval, retryInterval).timeout(connectTimeout).then().onErrorResume(th -> {
                    logger.warn("Failed to connect publication {} for sending data during {}", new Object[]{this.publication, connectTimeout});
                    return Mono.error((Throwable)th);
                });
            }).log("defaultOutbound");
        });
    }

    @Override
    public AeronOutbound send(Publisher<? extends ByteBuffer> dataStream) {
        Objects.requireNonNull(this.sequencer, "sequencer must be initialized");
        return this.then((Publisher<Void>)this.sequencer.write(dataStream));
    }

    @Override
    public Mono<Void> then() {
        return Mono.empty();
    }

    public void dispose() {
        if (this.publication != null) {
            this.publication.dispose();
        }
    }

    public boolean isDisposed() {
        return this.publication != null && this.publication.isDisposed();
    }

    @Override
    public Mono<Void> onDispose() {
        return this.publication != null ? this.publication.onDispose() : Mono.empty();
    }
}

