/*
 * Decompiled with CFR 0.152.
 */
package reactor.aeron;

import io.aeron.Aeron;
import io.aeron.ConcurrentPublication;
import io.aeron.FragmentAssembler;
import io.aeron.Image;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.driver.MediaDriver;
import io.aeron.logbuffer.FragmentHandler;
import java.io.File;
import java.util.Optional;
import java.util.function.Consumer;
import org.agrona.CloseHelper;
import org.agrona.IoUtil;
import reactor.aeron.AeronEventLoop;
import reactor.aeron.AeronOptions;
import reactor.aeron.AeronResourcesConfig;
import reactor.aeron.AeronUtils;
import reactor.aeron.ControlFragmentHandler;
import reactor.aeron.ControlMessageSubscriber;
import reactor.aeron.DataFragmentHandler;
import reactor.aeron.DataMessageSubscriber;
import reactor.aeron.InnerPoller;
import reactor.aeron.MessagePublication;
import reactor.aeron.OnDisposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.util.Logger;
import reactor.util.Loggers;

public class AeronResources
implements OnDisposable {
    private static final Logger logger = Loggers.getLogger(AeronResources.class);
    private final AeronResourcesConfig config;
    private final MonoProcessor<Void> start = MonoProcessor.create();
    private final MonoProcessor<Void> dispose = MonoProcessor.create();
    private final MonoProcessor<Void> onDispose = MonoProcessor.create();
    private Aeron aeron;
    private MediaDriver mediaDriver;
    private AeronEventLoop eventLoop;

    private AeronResources(AeronResourcesConfig config) {
        this.config = config;
        this.start.doOnTerminate(this::doStart).subscribe(avoid -> logger.info("{} has started", new Object[]{this}), th -> {
            logger.error("Start of {} failed with error: {}", new Object[]{this, th});
            this.dispose();
        });
        this.dispose.then(this.doDispose()).doFinally(s -> this.onDispose.onComplete()).subscribe(avoid -> logger.info("{} closed", new Object[]{this}), th -> logger.warn("{} closed with error: {}", new Object[]{this, th}));
    }

    public static AeronResources start() {
        return AeronResources.start(AeronResourcesConfig.defaultConfig());
    }

    public static AeronResources start(AeronResourcesConfig config) {
        AeronResources aeronResources = new AeronResources(config);
        aeronResources.start0();
        return aeronResources;
    }

    private void start0() {
        if (!this.isDisposed()) {
            this.start.onComplete();
        }
    }

    private void doStart() {
        MediaDriver.Context mediaContext = new MediaDriver.Context().mtuLength(this.config.mtuLength()).imageLivenessTimeoutNs(this.config.imageLivenessTimeout().toNanos()).dirDeleteOnStart(this.config.isDirDeleteOnStart());
        this.mediaDriver = MediaDriver.launchEmbedded((MediaDriver.Context)mediaContext);
        Aeron.Context aeronContext = new Aeron.Context();
        String directoryName = this.mediaDriver.aeronDirectoryName();
        aeronContext.aeronDirectoryName(directoryName);
        this.aeron = Aeron.connect((Aeron.Context)aeronContext);
        this.eventLoop = new AeronEventLoop(this.config.idleStrategySupplier().get());
        Runtime.getRuntime().addShutdownHook(new Thread(() -> this.deleteAeronDirectory(aeronContext)));
        logger.info("{} has initialized embedded media mediaDriver, aeron directory: {}", new Object[]{this, directoryName});
    }

    public AeronEventLoop nextEventLoop() {
        return this.eventLoop;
    }

    public Mono<MessagePublication> messagePublication(String category, String channel, int streamId, AeronOptions options, AeronEventLoop eventLoop) {
        ConcurrentPublication publication = this.aeron.addPublication(channel, streamId);
        MessagePublication messagePublication = new MessagePublication(category, this.config.mtuLength(), (Publication)publication, options, eventLoop);
        return eventLoop.register(messagePublication).doOnError(arg_0 -> AeronResources.lambda$messagePublication$6(category, (Publication)publication, eventLoop, arg_0)).doOnSuccess(avoid -> logger.debug("[{}] Added publication: {}", new Object[]{category, messagePublication})).thenReturn((Object)messagePublication);
    }

    public Mono<InnerPoller> controlSubscription(String category, String channel, int streamId, ControlMessageSubscriber subscriber, AeronEventLoop eventLoop, Consumer<Image> availableImageHandler, Consumer<Image> unavailableImageHandler) {
        return this.messageSubscription(category + "-control", channel, streamId, new ControlFragmentHandler(subscriber), eventLoop, availableImageHandler, unavailableImageHandler);
    }

    public Mono<InnerPoller> dataSubscription(String category, String channel, int streamId, DataMessageSubscriber subscriber, AeronEventLoop eventLoop, Consumer<Image> availableImageHandler, Consumer<Image> unavailableImageHandler) {
        return this.messageSubscription(category + "-data", channel, streamId, new DataFragmentHandler(subscriber), eventLoop, availableImageHandler, unavailableImageHandler);
    }

    public void dispose() {
        if (!this.isDisposed()) {
            this.dispose.onComplete();
        }
    }

    public boolean isDisposed() {
        return this.onDispose.isDisposed();
    }

    @Override
    public Mono<Void> onDispose() {
        return this.onDispose;
    }

    private Mono<Void> doDispose() {
        return Mono.defer(() -> {
            logger.info("{} shutdown initiated", new Object[]{this});
            this.eventLoop.dispose();
            return this.eventLoop.onDispose().doFinally(s -> {
                CloseHelper.quietClose((AutoCloseable)this.aeron);
                CloseHelper.quietClose((AutoCloseable)this.mediaDriver);
                Optional.ofNullable(this.mediaDriver).map(MediaDriver::context).ifPresent(context -> IoUtil.delete((File)context.aeronDirectory(), (boolean)true));
                logger.info("{} shutdown complete", new Object[]{this});
            });
        });
    }

    private Mono<InnerPoller> messageSubscription(String category, String channel, int streamId, FragmentHandler fragmentHandler, AeronEventLoop eventLoop, Consumer<Image> availableImageHandler, Consumer<Image> unavailableImageHandler) {
        Subscription subscription = this.aeron.addSubscription(channel, streamId, image -> {
            if (logger.isDebugEnabled()) {
                logger.debug("[{}] {} available image, imageSessionId={}, imageSource={}", new Object[]{category, AeronUtils.format(channel, streamId), image.sessionId(), image.sourceIdentity()});
            }
            if (availableImageHandler != null) {
                availableImageHandler.accept(image);
            }
        }, image -> {
            if (logger.isDebugEnabled()) {
                logger.debug("[{}] {} unavailable image, imageSessionId={}, imageSource={}", new Object[]{category, AeronUtils.format(channel, streamId), image.sessionId(), image.sourceIdentity()});
            }
            if (unavailableImageHandler != null) {
                unavailableImageHandler.accept(image);
            }
        });
        InnerPoller innerPoller = new InnerPoller(eventLoop, subscription, (FragmentHandler)new FragmentAssembler(fragmentHandler));
        return eventLoop.register(innerPoller).doOnError(ex -> {
            logger.error("[{}] Failed to register subscription {} on eventLoop {}, cause: {}", new Object[]{category, AeronUtils.format(subscription), eventLoop, ex});
            if (!subscription.isClosed()) {
                subscription.close();
            }
        }).doOnSuccess(avoid -> logger.debug("[{}] Added subscription: {}", new Object[]{category, AeronUtils.format(subscription)})).thenReturn((Object)innerPoller);
    }

    private void deleteAeronDirectory(Aeron.Context context) {
        File file = context.aeronDirectory();
        if (file.exists()) {
            IoUtil.delete((File)file, (boolean)true);
        }
    }

    public String toString() {
        return "AeronResources@" + Integer.toHexString(System.identityHashCode(this));
    }

    private static /* synthetic */ void lambda$messagePublication$6(String category, Publication publication, AeronEventLoop eventLoop, Throwable ex) {
        logger.error("[{}] Failed to register publication {} on eventLoop {}, cause: {}", new Object[]{category, AeronUtils.format(publication), eventLoop, ex});
        if (!publication.isClosed()) {
            publication.close();
        }
    }
}

