/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.transport;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundInvoker;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ServerChannel;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.util.concurrent.GenericFutureListener;
import io.scalecube.transport.Address;
import io.scalecube.transport.BootstrapFactory;
import io.scalecube.transport.ExceptionHandler;
import io.scalecube.transport.Message;
import io.scalecube.transport.MessageDeserializerHandler;
import io.scalecube.transport.MessageHandler;
import io.scalecube.transport.MessageSerializerHandler;
import io.scalecube.transport.NetworkEmulator;
import io.scalecube.transport.NetworkEmulatorHandler;
import io.scalecube.transport.Transport;
import io.scalecube.transport.TransportConfig;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.MonoSink;

/**
 * Netty-backed {@link Transport} implementation.
 *
 * <p>Inbound traffic is accepted on a server channel created by {@link #bind0()} and published
 * through {@link #listen()}. Outbound traffic uses one lazily established channel per destination
 * {@link Address}; the connect result is cached in {@code outgoingChannels} and evicted when the
 * connect attempt fails or the channel becomes inactive.
 */
final class TransportImpl implements Transport {
    private static final Logger LOGGER = LoggerFactory.getLogger(TransportImpl.class);

    private final TransportConfig config;

    /** Serialized processor backing {@link #listen()}. */
    private final FluxProcessor<Message, Message> incomingMessagesSubject =
            DirectProcessor.<Message>create().serialize();

    /** Sink of {@link #incomingMessagesSubject}; handed to the inbound {@link MessageHandler}. */
    private final FluxSink<Message> messageSink = incomingMessagesSubject.sink();

    /** Cached connect results keyed by destination address. */
    private final Map<Address, Mono<Channel>> outgoingChannels = new ConcurrentHashMap<>();

    private final BootstrapFactory bootstrapFactory;
    private final IncomingChannelInitializer incomingChannelInitializer =
            new IncomingChannelInitializer();
    private final ExceptionHandler exceptionHandler = new ExceptionHandler();
    private final MessageToByteEncoder<Message> serializerHandler;
    private final MessageToMessageDecoder<ByteBuf> deserializerHandler;
    private final MessageHandler messageHandler;

    /** Assigned once the server channel is bound; {@code null} before that. */
    private NetworkEmulator networkEmulator;

    /** Non-null only when bound and the config enables the network emulator. */
    private NetworkEmulatorHandler networkEmulatorHandler;

    /** Local address of the bound server channel; {@code null} until bound. */
    private Address address;

    private ServerChannel serverChannel;

    /** Completed once the shutdown sequence started by {@link #stop()} terminates. */
    private final MonoProcessor<Void> onClose = MonoProcessor.create();

    /**
     * Creates an unbound transport.
     *
     * @param config transport settings; must not be {@code null}
     * @throws NullPointerException if {@code config} is {@code null}
     */
    public TransportImpl(TransportConfig config) {
        this.config = Objects.requireNonNull(config);
        this.serializerHandler = new MessageSerializerHandler();
        this.deserializerHandler = new MessageDeserializerHandler();
        this.messageHandler = new MessageHandler(messageSink);
        this.bootstrapFactory = new BootstrapFactory(config);
    }

    /**
     * Binds the server channel on the port taken from the configuration.
     *
     * @return a mono emitting this transport once the bind succeeded
     */
    public Mono<Transport> bind0() {
        return Mono.defer(() -> bind0(config.getPort()));
    }

    /**
     * Binds the server channel on the given port; the bind is attempted on each subscription.
     *
     * @param port port to bind on
     * @return a mono emitting this transport once the bind succeeded
     */
    private Mono<Transport> bind0(int port) {
        return Mono.defer(() ->
                toMono(bootstrapFactory.serverBootstrap()
                        .childHandler(incomingChannelInitializer)
                        .bind(port))
                        .doOnSuccess(this::onBound)
                        .doOnError(cause -> LOGGER.error(
                                "Failed to bind cluster transport on port={}, cause: {}", port, cause))
                        .<Transport>thenReturn(this));
    }

    /** Records the bound server channel and derives the address and emulator state from it. */
    private void onBound(Channel channel) {
        serverChannel = (ServerChannel) channel;
        address = toAddress(serverChannel.localAddress());
        boolean useEmulator = config.isUseNetworkEmulator();
        networkEmulator = new NetworkEmulator(address, useEmulator);
        networkEmulatorHandler = useEmulator ? new NetworkEmulatorHandler(networkEmulator) : null;
        LOGGER.info("Bound cluster transport on: {}", address);
    }

    @Override
    public Address address() {
        return address;
    }

    @Override
    public boolean isStopped() {
        return onClose.isDisposed();
    }

    @Override
    public NetworkEmulator networkEmulator() {
        return networkEmulator;
    }

    /**
     * Starts the shutdown sequence (unless {@code onClose} is already disposed) and returns a mono
     * that completes when the sequence terminates, successfully or not.
     *
     * <p>NOTE(review): the {@code isDisposed()} check is not atomic with starting the shutdown, so
     * overlapping calls made before the first shutdown terminates each run {@link #stop0()}.
     */
    @Override
    public final Mono<Void> stop() {
        return Mono.defer(() -> {
            if (!onClose.isDisposed()) {
                stop0()
                        .doOnTerminate(onClose::onComplete)
                        // Without an error consumer Reactor reports a shutdown failure as
                        // "error callback not implemented"; log it instead.
                        .subscribe(null, ex -> LOGGER.warn(
                                "Failed to cleanly stop cluster transport on {}", address, ex));
            }
            return onClose;
        });
    }

    @Override
    public final Flux<Message> listen() {
        return incomingMessagesSubject.onBackpressureBuffer();
    }

    /**
     * Stamps {@code message} with this transport's address as sender and writes it to the channel
     * for {@code address}, connecting first when no cached channel exists.
     *
     * @param address destination; must not be {@code null}
     * @param message message to send; must not be {@code null}; its sender is overwritten
     * @return a mono completing when the write-and-flush succeeded, or failing with the connect or
     *     write error (or a {@link NullPointerException} for {@code null} arguments)
     */
    @Override
    public Mono<Void> send(Address address, Message message) {
        return Mono.defer(() -> {
            Objects.requireNonNull(address);
            Objects.requireNonNull(message);
            message.setSender(this.address);
            return getOrConnect(address)
                    .flatMap(channel -> toMono(channel.writeAndFlush(message)).then())
                    .doOnError(ex -> LOGGER.debug(
                            "Failed to send {} from {} to {}, cause: {}",
                            message, this.address, address, ex));
        });
    }

    /** Returns the cached channel mono for {@code address}, creating it on first use. */
    private Mono<Channel> getOrConnect(Address address) {
        return Mono.create(sink ->
                outgoingChannels.computeIfAbsent(address, this::connect0)
                        .subscribe(sink::success, sink::error));
    }

    /**
     * Builds the cached connect mono stored in {@code outgoingChannels}. A failed connect evicts
     * the entry so that the next send attempts a fresh connection.
     */
    private Mono<Channel> connect0(Address address) {
        return connect1(address)
                .doOnSuccess(channel -> LOGGER.debug(
                        "Connected from {} to {}: {}", this.address, address, channel))
                .doOnError(throwable -> outgoingChannels.remove(address))
                .cache();
    }

    /** Opens a client connection to {@code address} on subscription. */
    private Mono<Channel> connect1(Address address) {
        return Mono.create(sink -> {
            Bootstrap client =
                    bootstrapFactory.clientBootstrap().handler(new OutgoingChannelInitializer(address));
            client.connect(address.host(), address.port())
                    .addListener((ChannelFutureListener) future -> toMono0(sink, future));
        });
    }

    /** Converts a socket address (expected to be an {@link InetSocketAddress}) to an {@link Address}. */
    private static Address toAddress(SocketAddress address) {
        InetSocketAddress inetAddress = (InetSocketAddress) address;
        return Address.create(inetAddress.getHostString(), inetAddress.getPort());
    }

    /** Adapts a Netty future to a mono emitting the future's channel on success. */
    private static Mono<Channel> toMono(ChannelFuture channelFuture) {
        return Mono.create(sink ->
                channelFuture.addListener((ChannelFutureListener) future -> toMono0(sink, future)));
    }

    /** Forwards the outcome of a completed future to {@code sink}. */
    private static void toMono0(MonoSink<Channel> sink, ChannelFuture future) {
        if (future.isSuccess()) {
            sink.success(future.channel());
        } else {
            sink.error(future.cause());
        }
    }

    /**
     * Shutdown sequence: completes the inbound message sink, closes the server channel and every
     * outgoing channel, then clears the channel cache and shuts the bootstrap factory down.
     *
     * <p>The close operations are composed into the returned mono rather than collected from
     * asynchronous callbacks into a shared list, so the result also waits for connects that are
     * still in flight when the shutdown starts.
     */
    private Mono<Void> stop0() {
        return Mono.defer(() -> {
            try {
                messageSink.complete();
            } catch (Exception ex) {
                // Best effort: a failure to complete the sink must not abort the shutdown.
                LOGGER.debug("Failed to complete incoming messages sink on {}", address, ex);
            }

            // serverChannel is null when stop() is called on a transport that was never bound.
            Mono<Void> closeServer =
                    serverChannel == null ? Mono.empty() : toMono(serverChannel.close()).then();

            return Mono.when(closeServer, closeOutgoingChannels())
                    .doOnTerminate(outgoingChannels::clear)
                    .doOnTerminate(bootstrapFactory::shutdown)
                    .then();
        });
    }

    /** Closes every channel currently held in {@code outgoingChannels}. */
    private Mono<Void> closeOutgoingChannels() {
        return Flux.fromIterable(outgoingChannels.values())
                .flatMap(channelMono -> channelMono
                        // A connect that failed left no channel behind, so there is nothing to close.
                        .onErrorResume(ex -> Mono.empty())
                        .flatMap(channel -> toMono(channel.close())))
                .then();
    }

    /** Sets up the pipeline of an outgoing (client) channel towards a single destination. */
    @ChannelHandler.Sharable
    private final class OutgoingChannelInitializer extends ChannelInitializer<Channel> {
        private final Address address;

        public OutgoingChannelInitializer(Address address) {
            this.address = address;
        }

        @Override
        protected void initChannel(Channel channel) {
            ChannelPipeline pipeline = channel.pipeline();
            // Evict the cached channel when the connection goes away so the next send reconnects.
            pipeline.addLast(new ChannelDuplexHandler() {
                @Override
                public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                    LOGGER.debug("Disconnected from: {} {}", address, ctx.channel());
                    outgoingChannels.remove(address);
                    super.channelInactive(ctx);
                }
            });
            pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
            pipeline.addLast(serializerHandler);
            // The emulator handler exists only when the config enabled it at bind time.
            if (networkEmulatorHandler != null) {
                pipeline.addLast(networkEmulatorHandler);
            }
            pipeline.addLast(exceptionHandler);
        }
    }

    /** Sets up the pipeline of a channel accepted by the server channel. */
    @ChannelHandler.Sharable
    private final class IncomingChannelInitializer extends ChannelInitializer<Channel> {
        private IncomingChannelInitializer() {
        }

        @Override
        protected void initChannel(Channel channel) {
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addLast(new ProtobufVarint32FrameDecoder());
            pipeline.addLast(deserializerHandler);
            pipeline.addLast(messageHandler);
            pipeline.addLast(exceptionHandler);
        }
    }
}

