/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.transport;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ServerChannel;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.util.concurrent.GenericFutureListener;
import io.scalecube.transport.Address;
import io.scalecube.transport.Addressing;
import io.scalecube.transport.BootstrapFactory;
import io.scalecube.transport.ExceptionHandler;
import io.scalecube.transport.ITransport;
import io.scalecube.transport.Message;
import io.scalecube.transport.MessageDeserializerHandler;
import io.scalecube.transport.MessageReceiverHandler;
import io.scalecube.transport.MessageSerializerHandler;
import io.scalecube.transport.NetworkEmulatorHandler;
import io.scalecube.transport.TransportConfig;
import io.scalecube.transport.memoizer.Memoizer;
import java.net.InetAddress;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

public final class Transport
implements ITransport {
    private static final Logger LOGGER = LoggerFactory.getLogger(Transport.class);
    private static final CompletableFuture<Void> COMPLETED_PROMISE = CompletableFuture.completedFuture(null);
    private final TransportConfig config;
    private final Subject<Message, Message> incomingMessagesSubject = PublishSubject.create().toSerialized();
    private final Memoizer<Address, ChannelFuture> outgoingChannels;
    private final BootstrapFactory bootstrapFactory;
    private final IncomingChannelInitializer incomingChannelInitializer = new IncomingChannelInitializer();
    private final ExceptionHandler exceptionHandler = new ExceptionHandler();
    private final MessageToByteEncoder<Message> serializerHandler;
    private final MessageToMessageDecoder<ByteBuf> deserializerHandler;
    private final MessageReceiverHandler messageHandler;
    private final NetworkEmulatorHandler networkEmulatorHandler;
    private Address address;
    private ServerChannel serverChannel;
    private volatile boolean stopped = false;

    private Transport(TransportConfig config) {
        Preconditions.checkArgument((config != null ? 1 : 0) != 0);
        this.config = config;
        this.serializerHandler = new MessageSerializerHandler();
        this.deserializerHandler = new MessageDeserializerHandler();
        this.networkEmulatorHandler = config.isUseNetworkEmulator() ? new NetworkEmulatorHandler() : null;
        this.messageHandler = new MessageReceiverHandler(this.incomingMessagesSubject);
        this.bootstrapFactory = new BootstrapFactory(config);
        this.outgoingChannels = new Memoizer<Address, ChannelFuture>(new OutgoingChannelComputable());
    }

    public static Transport bindAwait() {
        return Transport.bindAwait(TransportConfig.defaultConfig());
    }

    public static Transport bindAwait(boolean useNetworkEmulator) {
        return Transport.bindAwait(TransportConfig.builder().useNetworkEmulator(useNetworkEmulator).build());
    }

    public static Transport bindAwait(TransportConfig config) {
        try {
            return Transport.bind(config).get();
        }
        catch (Exception e) {
            throw Throwables.propagate((Throwable)Throwables.getRootCause((Throwable)e));
        }
    }

    public static CompletableFuture<Transport> bind() {
        return Transport.bind(TransportConfig.defaultConfig());
    }

    public static CompletableFuture<Transport> bind(TransportConfig config) {
        return new Transport(config).bind0();
    }

    private CompletableFuture<Transport> bind0() {
        this.incomingMessagesSubject.subscribeOn(Schedulers.from((Executor)this.bootstrapFactory.getWorkerGroup()));
        InetAddress listenAddress = Addressing.getLocalIpAddress(this.config.getListenAddress(), this.config.getListenInterface(), this.config.isPreferIPv6());
        int bindPort = this.config.isPortAutoIncrement() ? Addressing.getNextAvailablePort(listenAddress, this.config.getPort(), this.config.getPortCount()) : this.config.getPort();
        this.address = Address.create(listenAddress.getHostAddress(), bindPort);
        ServerBootstrap server = this.bootstrapFactory.serverBootstrap().childHandler((ChannelHandler)this.incomingChannelInitializer);
        ChannelFuture bindFuture = server.bind(listenAddress, this.address.port());
        final CompletableFuture<Transport> result = new CompletableFuture<Transport>();
        bindFuture.addListener((GenericFutureListener)new ChannelFutureListener(){

            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    Transport.this.serverChannel = (ServerChannel)channelFuture.channel();
                    LOGGER.info("Bound to: {}", (Object)Transport.this.address);
                    result.complete(Transport.this);
                } else {
                    LOGGER.error("Failed to bind to: {}, cause: {}", (Object)Transport.this.address, (Object)channelFuture.cause());
                    result.completeExceptionally(channelFuture.cause());
                }
            }
        });
        return result;
    }

    @Override
    public Address address() {
        return this.address;
    }

    public boolean isStopped() {
        return this.stopped;
    }

    public void setNetworkSettings(Address destination, int lostPercent, int meanDelay) {
        if (this.config.isUseNetworkEmulator()) {
            this.networkEmulatorHandler.setNetworkSettings(destination, lostPercent, meanDelay);
            LOGGER.info("Set network settings (loss={}%, mean={}ms) from {} to {}", new Object[]{lostPercent, meanDelay, this.address, destination});
        } else {
            LOGGER.warn("Noop on 'setNetworkSettings({},{},{})' since network emulator is disabled", new Object[]{destination, lostPercent, meanDelay});
        }
    }

    public void setDefaultNetworkSettings(int lostPercent, int meanDelay) {
        if (this.config.isUseNetworkEmulator()) {
            this.networkEmulatorHandler.setDefaultNetworkSettings(lostPercent, meanDelay);
            LOGGER.info("Set default network settings (loss={}%, mean={}ms)");
        } else {
            LOGGER.warn("Noop on 'setDefaultNetworkSettings({},{})' since network emulator is disabled", (Object)lostPercent, (Object)meanDelay);
        }
    }

    public void block(Address destination) {
        if (this.config.isUseNetworkEmulator()) {
            this.networkEmulatorHandler.block(destination);
            LOGGER.info("Block network from {} to {}", (Object)this.address, (Object)destination);
        } else {
            LOGGER.warn("Noop on 'block({})' since network emulator is disabled", (Object)destination);
        }
    }

    public void block(Collection<Address> destinations) {
        if (this.config.isUseNetworkEmulator()) {
            this.networkEmulatorHandler.block(destinations);
            LOGGER.info("Block network from {} to {}", (Object)this.address, destinations);
        } else {
            LOGGER.warn("Noop on 'block({})' since network emulator is disabled", destinations);
        }
    }

    public void unblock(Address destination) {
        if (this.config.isUseNetworkEmulator()) {
            this.networkEmulatorHandler.unblock(destination);
            LOGGER.info("Unblock network from {} to {}", (Object)this.address, (Object)destination);
        } else {
            LOGGER.warn("Noop on 'unblock({})' since network emulator is disabled", (Object)destination);
        }
    }

    public void unblockAll() {
        if (this.config.isUseNetworkEmulator()) {
            this.networkEmulatorHandler.unblockAll();
            LOGGER.info("Unblock all network from {}", (Object)this.address);
        } else {
            LOGGER.warn("Noop on 'unblockAll()' since network emulator is disabled");
        }
    }

    @Override
    public final void stop() {
        this.stop(COMPLETED_PROMISE);
    }

    @Override
    public final void stop(CompletableFuture<Void> promise) {
        Preconditions.checkState((!this.stopped ? 1 : 0) != 0, (Object)"Transport is stopped");
        Preconditions.checkArgument((promise != null ? 1 : 0) != 0);
        this.stopped = true;
        try {
            this.incomingMessagesSubject.onCompleted();
        }
        catch (Exception exception) {
            // empty catch block
        }
        for (Address address : this.outgoingChannels.keySet()) {
            ChannelFuture channelFuture = this.outgoingChannels.getIfExists(address);
            if (channelFuture == null) continue;
            if (channelFuture.isSuccess()) {
                channelFuture.channel().close();
                continue;
            }
            channelFuture.addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
        }
        this.outgoingChannels.clear();
        if (this.serverChannel != null) {
            this.composeFutures(this.serverChannel.close(), promise);
        }
        this.bootstrapFactory.shutdown();
    }

    @Override
    @Nonnull
    public final Observable<Message> listen() {
        Preconditions.checkState((!this.stopped ? 1 : 0) != 0, (Object)"Transport is stopped");
        return this.incomingMessagesSubject.asObservable();
    }

    @Override
    public void send(@CheckForNull Address address, @CheckForNull Message message) {
        this.send(address, message, COMPLETED_PROMISE);
    }

    @Override
    public void send(@CheckForNull Address address, @CheckForNull Message message, @CheckForNull CompletableFuture<Void> promise) {
        Preconditions.checkState((!this.stopped ? 1 : 0) != 0, (Object)"Transport is stopped");
        Preconditions.checkArgument((address != null ? 1 : 0) != 0);
        Preconditions.checkArgument((message != null ? 1 : 0) != 0);
        Preconditions.checkArgument((promise != null ? 1 : 0) != 0);
        message.setSender(this.address);
        ChannelFuture channelFuture = this.outgoingChannels.get(address);
        if (channelFuture.isSuccess()) {
            this.composeFutures(channelFuture.channel().writeAndFlush((Object)message), promise);
        } else {
            channelFuture.addListener(chFuture -> {
                if (chFuture.isSuccess()) {
                    this.composeFutures(chFuture.channel().writeAndFlush((Object)message), promise);
                } else {
                    promise.completeExceptionally(chFuture.cause());
                }
            });
        }
    }

    private void composeFutures(ChannelFuture channelFuture, @Nonnull CompletableFuture<Void> promise) {
        channelFuture.addListener(future -> {
            if (channelFuture.isSuccess()) {
                promise.complete((Void)channelFuture.get());
            } else {
                promise.completeExceptionally(channelFuture.cause());
            }
        });
    }

    @ChannelHandler.Sharable
    private final class OutgoingChannelInitializer
    extends ChannelInitializer {
        private final Address address;

        public OutgoingChannelInitializer(Address address) {
            this.address = address;
        }

        protected void initChannel(Channel channel) throws Exception {
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addLast(new ChannelHandler[]{new ChannelDuplexHandler(){

                public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                    LOGGER.debug("Disconnected from: {} {}", (Object)OutgoingChannelInitializer.this.address, (Object)ctx.channel());
                    Transport.this.outgoingChannels.delete(OutgoingChannelInitializer.this.address);
                    super.channelInactive(ctx);
                }
            }});
            pipeline.addLast(new ChannelHandler[]{new ProtobufVarint32LengthFieldPrepender()});
            pipeline.addLast(new ChannelHandler[]{Transport.this.serializerHandler});
            if (Transport.this.networkEmulatorHandler != null) {
                pipeline.addLast(new ChannelHandler[]{Transport.this.networkEmulatorHandler});
            }
            pipeline.addLast(new ChannelHandler[]{Transport.this.exceptionHandler});
        }
    }

    @ChannelHandler.Sharable
    private final class IncomingChannelInitializer
    extends ChannelInitializer {
        private IncomingChannelInitializer() {
        }

        protected void initChannel(Channel channel) throws Exception {
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addLast(new ChannelHandler[]{new ProtobufVarint32FrameDecoder()});
            pipeline.addLast(new ChannelHandler[]{Transport.this.deserializerHandler});
            pipeline.addLast(new ChannelHandler[]{Transport.this.messageHandler});
            pipeline.addLast(new ChannelHandler[]{Transport.this.exceptionHandler});
        }
    }

    private final class OutgoingChannelComputable
    implements Function<Address, ChannelFuture> {
        private OutgoingChannelComputable() {
        }

        @Override
        public ChannelFuture apply(final Address address) {
            OutgoingChannelInitializer channelInitializer = new OutgoingChannelInitializer(address);
            Bootstrap client = (Bootstrap)Transport.this.bootstrapFactory.clientBootstrap().handler((ChannelHandler)channelInitializer);
            ChannelFuture connectFuture = client.connect(address.host(), address.port());
            connectFuture.addListener((GenericFutureListener)new ChannelFutureListener(){

                public void operationComplete(ChannelFuture channelFuture) {
                    if (channelFuture.isSuccess()) {
                        LOGGER.info("Connected from {} to {}: {}", new Object[]{Transport.this.address, address, channelFuture.channel()});
                    } else {
                        LOGGER.warn("Failed to connect from {} to {}", (Object)Transport.this.address, (Object)address);
                        Transport.this.outgoingChannels.delete(address);
                    }
                }
            });
            return connectFuture;
        }
    }
}

