/*
 * Decompiled with CFR 0.152.
 */
package io.reactivesocket.transport.tcp.server;

import io.netty.buffer.ByteBuf;
import io.reactivesocket.DuplexConnection;
import io.reactivesocket.Frame;
import io.reactivesocket.transport.TransportServer;
import io.reactivesocket.transport.tcp.ReactiveSocketFrameCodec;
import io.reactivesocket.transport.tcp.ReactiveSocketFrameLogger;
import io.reactivesocket.transport.tcp.ReactiveSocketLengthCodec;
import io.reactivesocket.transport.tcp.TcpDuplexConnection;
import io.reactivex.netty.channel.Connection;
import io.reactivex.netty.protocol.tcp.server.ConnectionHandler;
import io.reactivex.netty.protocol.tcp.server.TcpServer;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.event.Level;
import rx.Observable;
import rx.RxReactiveStreams;

public class TcpTransportServer
implements TransportServer {
    private final TcpServer<Frame, Frame> rxNettyServer;

    private TcpTransportServer(TcpServer<Frame, Frame> rxNettyServer) {
        this.rxNettyServer = rxNettyServer;
    }

    public TransportServer.StartedServer start(final TransportServer.ConnectionAcceptor acceptor) {
        this.rxNettyServer.start((ConnectionHandler)new ConnectionHandler<Frame, Frame>(){

            public Observable<Void> handle(Connection<Frame, Frame> newConnection) {
                TcpDuplexConnection duplexConnection = new TcpDuplexConnection(newConnection);
                return RxReactiveStreams.toObservable((Publisher)acceptor.apply((DuplexConnection)duplexConnection));
            }
        });
        return new Started();
    }

    public TcpTransportServer configureServer(Function<TcpServer<Frame, Frame>, TcpServer<Frame, Frame>> configurator) {
        return new TcpTransportServer(configurator.apply(this.rxNettyServer));
    }

    public TcpTransportServer logReactiveSocketFrames(String name, Level logLevel) {
        return this.configureServer(c -> c.addChannelHandlerLast("reactive-socket-frame-codec", () -> new ReactiveSocketFrameLogger(name, logLevel)));
    }

    public static TcpTransportServer create() {
        return TcpTransportServer.create((TcpServer<ByteBuf, ByteBuf>)TcpServer.newServer());
    }

    public static TcpTransportServer create(int port) {
        return TcpTransportServer.create((TcpServer<ByteBuf, ByteBuf>)TcpServer.newServer((int)port));
    }

    public static TcpTransportServer create(SocketAddress address) {
        return TcpTransportServer.create((TcpServer<ByteBuf, ByteBuf>)TcpServer.newServer((SocketAddress)address));
    }

    public static TcpTransportServer create(TcpServer<ByteBuf, ByteBuf> rxNettyServer) {
        return new TcpTransportServer(TcpTransportServer.configure(rxNettyServer));
    }

    private static TcpServer<Frame, Frame> configure(TcpServer<ByteBuf, ByteBuf> rxNettyServer) {
        return rxNettyServer.addChannelHandlerLast("line-codec", ReactiveSocketLengthCodec::new).addChannelHandlerLast("frame-codec", ReactiveSocketFrameCodec::new);
    }

    private class Started
    implements TransportServer.StartedServer {
        private Started() {
        }

        public SocketAddress getServerAddress() {
            return TcpTransportServer.this.rxNettyServer.getServerAddress();
        }

        public int getServerPort() {
            return TcpTransportServer.this.rxNettyServer.getServerPort();
        }

        public void awaitShutdown() {
            TcpTransportServer.this.rxNettyServer.awaitShutdown();
        }

        public void awaitShutdown(long duration, TimeUnit durationUnit) {
            TcpTransportServer.this.rxNettyServer.awaitShutdown(duration, durationUnit);
        }

        public void shutdown() {
            TcpTransportServer.this.rxNettyServer.shutdown();
        }
    }
}

