/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.transport;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.scalecube.transport.Address;
import io.scalecube.transport.ExceptionHandler;
import io.scalecube.transport.Message;
import io.scalecube.transport.MessageCodec;
import io.scalecube.transport.NetworkEmulator;
import io.scalecube.transport.Transport;
import io.scalecube.transport.TransportConfig;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.MonoSink;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.DisposableChannel;
import reactor.netty.DisposableServer;
import reactor.netty.NettyInbound;
import reactor.netty.NettyOutbound;
import reactor.netty.NettyPipeline;
import reactor.netty.channel.BootstrapHandlers;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpServer;

final class TransportImpl
implements Transport {
    private static final Logger LOGGER = LoggerFactory.getLogger(TransportImpl.class);
    private final TransportConfig config;
    private final LoopResources loopResources;
    private final DirectProcessor<Message> messagesSubject;
    private final FluxSink<Message> messageSink;
    private final Map<Address, Mono<? extends Connection>> connections;
    private final ExceptionHandler exceptionHandler;
    private final TransportChannelInitializer channelInitializer;
    private final MonoProcessor<Void> stop;
    private final MonoProcessor<Void> onStop;
    private final NetworkEmulator networkEmulator;
    private final Address address;
    private final DisposableServer server;
    private final MessageCodec messageCodec;

    public TransportImpl(TransportConfig config) {
        this.config = Objects.requireNonNull(config);
        this.loopResources = LoopResources.create((String)"sc-cluster-io", (int)1, (boolean)true);
        this.messagesSubject = DirectProcessor.create();
        this.messageSink = this.messagesSubject.sink();
        this.connections = new ConcurrentHashMap<Address, Mono<? extends Connection>>();
        this.exceptionHandler = new ExceptionHandler();
        this.channelInitializer = new TransportChannelInitializer();
        this.stop = MonoProcessor.create();
        this.onStop = MonoProcessor.create();
        this.messageCodec = config.getMessageCodec();
        this.networkEmulator = null;
        this.address = null;
        this.server = null;
    }

    private TransportImpl(Address address, DisposableServer server, NetworkEmulator networkEmulator, TransportImpl other) {
        this.address = Objects.requireNonNull(address);
        this.server = Objects.requireNonNull(server);
        this.networkEmulator = Objects.requireNonNull(networkEmulator);
        this.config = other.config;
        this.loopResources = other.loopResources;
        this.messagesSubject = other.messagesSubject;
        this.messageSink = other.messageSink;
        this.connections = other.connections;
        this.exceptionHandler = other.exceptionHandler;
        this.channelInitializer = other.channelInitializer;
        this.stop = other.stop;
        this.onStop = other.onStop;
        this.messageCodec = other.messageCodec;
        this.stop.then(this.doStop()).doFinally(s -> this.onStop.onComplete()).subscribe(null, ex -> LOGGER.warn("Exception occurred on transport stop: " + ex));
    }

    public Mono<Transport> bind0() {
        return this.newTcpServer().handle(this::onMessage).bind().doOnSuccess(server -> LOGGER.debug("Bound cluster transport on {}:{}", (Object)server.host(), (Object)server.port())).doOnError(ex -> LOGGER.error("Failed to bind cluster transport on port={}, cause: {}", (Object)this.config.getPort(), (Object)ex.toString())).map(this::onBind);
    }

    @Override
    public Address address() {
        return this.address;
    }

    @Override
    public boolean isStopped() {
        return this.onStop.isDisposed();
    }

    @Override
    public NetworkEmulator networkEmulator() {
        return this.networkEmulator;
    }

    @Override
    public final Mono<Void> stop() {
        return Mono.defer(() -> {
            this.stop.onComplete();
            return this.onStop;
        });
    }

    private Mono<Void> doStop() {
        return Mono.defer(() -> {
            LOGGER.debug("Transport is shutting down on {}", (Object)this.address);
            this.messageSink.complete();
            return Flux.concatDelayError((Publisher[])new Publisher[]{this.closeServer(), this.closeConnections()}).doOnTerminate(() -> ((LoopResources)this.loopResources).dispose()).then().doOnSuccess(avoid -> LOGGER.debug("Transport has shut down on {}", (Object)this.address));
        });
    }

    @Override
    public final Flux<Message> listen() {
        return this.messagesSubject.onBackpressureBuffer();
    }

    @Override
    public Mono<Void> send(Address address, Message message) {
        return this.getOrConnect(address).flatMap(conn -> this.send0((Connection)conn, message, address)).then().doOnError(ex -> LOGGER.debug("Failed to send {} to {}, cause: {}", new Object[]{message, address, ex.toString()}));
    }

    @Override
    public Mono<Message> requestResponse(Message request, Address address) {
        return Mono.create(sink -> {
            Objects.requireNonNull(request, "request must be not null");
            Objects.requireNonNull(request.correlationId(), "correlationId must be not null");
            Disposable disposable = this.listen().filter(resp -> resp.correlationId() != null).filter(resp -> resp.correlationId().equals(request.correlationId())).take(1L).subscribe(arg_0 -> ((MonoSink)sink).success(arg_0), arg_0 -> ((MonoSink)sink).error(arg_0), () -> ((MonoSink)sink).success());
            this.send(address, request).subscribe(null, ex -> {
                LOGGER.warn("Unexpected exception on transport request-response, cause: {}", (Object)ex.toString());
                sink.error(ex);
                if (!disposable.isDisposed()) {
                    disposable.dispose();
                }
            });
        });
    }

    private Mono<Void> onMessage(NettyInbound in, NettyOutbound out) {
        return in.receive().retain().map(this::toMessage).doOnNext(arg_0 -> this.messageSink.next(arg_0)).then();
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private Message toMessage(ByteBuf byteBuf) {
        try (ByteBufInputStream stream = new ByteBufInputStream(byteBuf, true);){
            Message message = this.messageCodec.deserialize((InputStream)stream);
            return message;
        }
        catch (Exception e) {
            throw new DecoderException((Throwable)e);
        }
    }

    private TransportImpl onBind(DisposableServer server) {
        Address address = Address.create(server.address().getHostString(), server.address().getPort());
        NetworkEmulator networkEmulator = new NetworkEmulator(address, this.config.isUseNetworkEmulator());
        return new TransportImpl(address, server, networkEmulator, this);
    }

    private Mono<? extends Void> send0(Connection conn, Message message, Address address) {
        Objects.requireNonNull(message.sender(), "sender must be not null");
        return conn.outbound().options(NettyPipeline.SendOptions::flushOnEach).send((Publisher)Mono.just((Object)message).flatMap(msg -> this.networkEmulator.tryFail((Message)msg, address)).flatMap(msg -> this.networkEmulator.tryDelay((Message)msg, address)).map(this::toByteBuf)).then();
    }

    private ByteBuf toByteBuf(Message message) {
        ByteBuf bb = ByteBufAllocator.DEFAULT.buffer();
        ByteBufOutputStream stream = new ByteBufOutputStream(bb);
        try {
            this.messageCodec.serialize(message, (OutputStream)stream);
        }
        catch (Exception e) {
            bb.release();
            throw new EncoderException((Throwable)e);
        }
        return bb;
    }

    private Mono<Connection> getOrConnect(Address address) {
        return Mono.create(sink -> this.connections.computeIfAbsent(address, this::connect0).subscribe(arg_0 -> ((MonoSink)sink).success(arg_0), arg_0 -> ((MonoSink)sink).error(arg_0)));
    }

    private Mono<? extends Connection> connect0(Address address) {
        return this.newTcpClient(address).doOnDisconnected(c -> {
            LOGGER.debug("Disconnected from: {} {}", (Object)address, (Object)c.channel());
            this.connections.remove(address);
        }).doOnConnected(c -> LOGGER.debug("Connected to {}: {}", (Object)address, (Object)c.channel())).connect().doOnError(th -> {
            LOGGER.debug("Failed to connect to remote address {}, cause: {}", (Object)address, (Object)th.toString());
            this.connections.remove(address);
        }).cache();
    }

    private Mono<Void> closeServer() {
        return Mono.defer(() -> Optional.ofNullable(this.server).map(server -> {
            server.dispose();
            return server.onDispose().doOnError(e -> LOGGER.warn("Failed to close server: " + e));
        }).orElse(Mono.empty()));
    }

    private Mono<Void> closeConnections() {
        return Mono.fromRunnable(() -> this.connections.values().forEach(connectionMono -> connectionMono.doOnNext(DisposableChannel::dispose).flatMap(DisposableChannel::onDispose).subscribe(null, e -> LOGGER.warn("Failed to close connection: " + e))));
    }

    private TcpServer newTcpServer() {
        return TcpServer.create().runOn(this.loopResources).option(ChannelOption.TCP_NODELAY, (Object)true).option(ChannelOption.SO_KEEPALIVE, (Object)true).option(ChannelOption.SO_REUSEADDR, (Object)true).addressSupplier(() -> new InetSocketAddress(this.config.getPort())).bootstrap(b -> BootstrapHandlers.updateConfiguration((ServerBootstrap)b, (String)"inbound", (BiConsumer)this.channelInitializer));
    }

    private TcpClient newTcpClient(Address address) {
        return TcpClient.create((ConnectionProvider)ConnectionProvider.newConnection()).runOn(this.loopResources).host(address.host()).port(address.port()).option(ChannelOption.TCP_NODELAY, (Object)true).option(ChannelOption.SO_KEEPALIVE, (Object)true).option(ChannelOption.SO_REUSEADDR, (Object)true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)this.config.getConnectTimeout()).bootstrap(b -> BootstrapHandlers.updateConfiguration((Bootstrap)b, (String)"outbound", (BiConsumer)this.channelInitializer));
    }

    private final class TransportChannelInitializer
    implements BiConsumer<ConnectionObserver, Channel> {
        private static final int MAX_FRAME_LENGTH = 8192;
        private static final int LENGTH_FIELD_LENGTH = 2;

        private TransportChannelInitializer() {
        }

        @Override
        public void accept(ConnectionObserver connectionObserver, Channel channel) {
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addLast(new ChannelHandler[]{new LengthFieldPrepender(2)});
            pipeline.addLast(new ChannelHandler[]{new LengthFieldBasedFrameDecoder(8192, 0, 2, 0, 2)});
            pipeline.addLast(new ChannelHandler[]{TransportImpl.this.exceptionHandler});
        }
    }
}

