/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.protocol.tcp.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.logging.LogLevel;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.GenericFutureListener;
import io.reactivex.netty.HandlerNames;
import io.reactivex.netty.channel.ChannelSubscriberEvent;
import io.reactivex.netty.channel.Connection;
import io.reactivex.netty.channel.ConnectionImpl;
import io.reactivex.netty.channel.DetachedChannelPipeline;
import io.reactivex.netty.client.ChannelProvider;
import io.reactivex.netty.client.ChannelProviderFactory;
import io.reactivex.netty.client.ClientState;
import io.reactivex.netty.client.ConnectionProvider;
import io.reactivex.netty.client.ConnectionProviderFactory;
import io.reactivex.netty.client.ConnectionRequest;
import io.reactivex.netty.client.Host;
import io.reactivex.netty.client.HostConnector;
import io.reactivex.netty.client.events.ClientEventListener;
import io.reactivex.netty.client.internal.SingleHostConnectionProvider;
import io.reactivex.netty.events.EventListener;
import io.reactivex.netty.events.EventPublisher;
import io.reactivex.netty.events.EventSource;
import io.reactivex.netty.internal.InternalReadTimeoutHandler;
import io.reactivex.netty.protocol.tcp.client.ConnectionRequestImpl;
import io.reactivex.netty.protocol.tcp.client.InterceptingTcpClient;
import io.reactivex.netty.protocol.tcp.client.InterceptingTcpClientImpl;
import io.reactivex.netty.protocol.tcp.client.TcpClient;
import io.reactivex.netty.protocol.tcp.client.TcpClientInterceptorChain;
import io.reactivex.netty.protocol.tcp.client.events.TcpClientEventListener;
import io.reactivex.netty.protocol.tcp.client.events.TcpClientEventPublisher;
import io.reactivex.netty.protocol.tcp.client.internal.TcpChannelProviderFactory;
import io.reactivex.netty.ssl.SslCodec;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.subscriptions.Subscriptions;

/**
 * {@link TcpClient} implementation that keeps its configuration in a {@link ClientState}.
 *
 * <p>Each configuration method delegates to the equally named {@link ClientState} method and wraps the state it
 * returns in a freshly created client (see {@link #_create(ClientState, TcpClientEventPublisher)}). All fields of an
 * instance are final. A derived client is handed the same {@link TcpClientEventPublisher} instance as the client it
 * was derived from.
 *
 * @param <W> first type parameter of the {@link TcpClient} / {@link ConnectionRequest} exposed by this client
 *            (presumably the type written to a connection; confirm against {@code TcpClient}).
 * @param <R> second type parameter of the {@link TcpClient} / {@link ConnectionRequest} exposed by this client
 *            (presumably the type read from a connection; confirm against {@code TcpClient}).
 */
public final class TcpClientImpl<W, R> extends TcpClient<W, R> {

    private final ClientState<W, R> state;
    private final TcpClientEventPublisher eventPublisher;
    private final InterceptingTcpClient<W, R> interceptingTcpClient;
    /** Connection request returned by every call to {@link #createConnectionRequest()}. */
    private final ConnectionRequestImpl<W, R> connectionRequest;

    /**
     * Use {@link #_create(ClientState, TcpClientEventPublisher)}, which builds the collaborators passed in here.
     */
    private TcpClientImpl(ClientState<W, R> state, TcpClientEventPublisher eventPublisher,
                          InterceptingTcpClient<W, R> interceptingTcpClient,
                          ConnectionRequestImpl<W, R> connectionRequest) {
        this.state = state;
        this.eventPublisher = eventPublisher;
        this.interceptingTcpClient = interceptingTcpClient;
        this.connectionRequest = connectionRequest;
    }

    /**
     * Returns the connection request that was built together with this client. The same instance is returned on
     * every call.
     */
    @Override
    public ConnectionRequest<W, R> createConnectionRequest() {
        return connectionRequest;
    }

    /** Returns a client built from the state produced by {@link ClientState#channelOption}. */
    @Override
    public <T> TcpClient<W, R> channelOption(ChannelOption<T> option, T value) {
        return copy(state.channelOption(option, value), eventPublisher);
    }

    /**
     * Returns a client that has an {@link InternalReadTimeoutHandler} registered as the first channel handler, under
     * the name {@code HandlerNames.ClientReadTimeoutHandler}.
     *
     * @param timeOut  timeout duration handed to the handler.
     * @param timeUnit unit of {@code timeOut}.
     */
    @Override
    public TcpClient<W, R> readTimeOut(final int timeOut, final TimeUnit timeUnit) {
        return addChannelHandlerFirst(HandlerNames.ClientReadTimeoutHandler.getName(), new Func0<ChannelHandler>() {
            @Override
            public ChannelHandler call() {
                return new InternalReadTimeoutHandler((long) timeOut, timeUnit);
            }
        });
    }

    /** Returns a client built from the state produced by {@link ClientState#addChannelHandlerFirst}. */
    @Override
    public <WW, RR> TcpClient<WW, RR> addChannelHandlerFirst(String name, Func0<ChannelHandler> handlerFactory) {
        return copy(state.addChannelHandlerFirst(name, handlerFactory), eventPublisher);
    }

    /** Returns a client built from the state produced by {@link ClientState#addChannelHandlerFirst}. */
    @Override
    public <WW, RR> TcpClient<WW, RR> addChannelHandlerFirst(EventExecutorGroup group, String name,
                                                             Func0<ChannelHandler> handlerFactory) {
        return copy(state.addChannelHandlerFirst(group, name, handlerFactory), eventPublisher);
    }

    /** Returns a client built from the state produced by {@link ClientState#addChannelHandlerLast}. */
    @Override
    public <WW, RR> TcpClient<WW, RR> addChannelHandlerLast(String name, Func0<ChannelHandler> handlerFactory) {
        return copy(state.addChannelHandlerLast(name, handlerFactory), eventPublisher);
    }

    /** Returns a client built from the state produced by {@link ClientState#addChannelHandlerLast}. */
    @Override
    public <WW, RR> TcpClient<WW, RR> addChannelHandlerLast(EventExecutorGroup group, String name,
                                                            Func0<ChannelHandler> handlerFactory) {
        return copy(state.addChannelHandlerLast(group, name, handlerFactory), eventPublisher);
    }

    /** Returns a client built from the state produced by {@link ClientState#addChannelHandlerBefore}. */
    @Override
    public <WW, RR> TcpClient<WW, RR> addChannelHandlerBefore(String baseName, String name,
                                                              Func0<ChannelHandler> handlerFactory) {
        return copy(state.addChannelHandlerBefore(baseName, name, handlerFactory), eventPublisher);
    }

    /** Returns a client built from the state produced by {@link ClientState#addChannelHandlerBefore}. */
    @Override
    public <WW, RR> TcpClient<WW, RR> addChannelHandlerBefore(EventExecutorGroup group, String baseName, String name,
                                                              Func0<ChannelHandler> handlerFactory) {
        return copy(state.addChannelHandlerBefore(group, baseName, name, handlerFactory), eventPublisher);
    }

    /** Returns a client built from the state produced by {@link ClientState#addChannelHandlerAfter}. */
    @Override
    public <WW, RR> TcpClient<WW, RR> addChannelHandlerAfter(String baseName, String name,
                                                             Func0<ChannelHandler> handlerFactory) {
        return copy(state.addChannelHandlerAfter(baseName, name, handlerFactory), eventPublisher);
    }

    /** Returns a client built from the state produced by {@link ClientState#addChannelHandlerAfter}. */
    @Override
    public <WW, RR> TcpClient<WW, RR> addChannelHandlerAfter(EventExecutorGroup group, String baseName, String name,
                                                             Func0<ChannelHandler> handlerFactory) {
        return copy(state.addChannelHandlerAfter(group, baseName, name, handlerFactory), eventPublisher);
    }

    /** Returns a client built from the state produced by {@link ClientState#pipelineConfigurator}. */
    @Override
    public <WW, RR> TcpClient<WW, RR> pipelineConfigurator(Action1<ChannelPipeline> pipelineConfigurator) {
        return copy(state.pipelineConfigurator(pipelineConfigurator), eventPublisher);
    }

    /** Returns a client built from the state produced by {@link ClientState#enableWireLogging}. */
    @Override
    public TcpClient<W, R> enableWireLogging(LogLevel wireLoggingLevel) {
        return copy(state.enableWireLogging(wireLoggingLevel), eventPublisher);
    }

    /** Returns a client built from the state produced by {@link ClientState#secure} for an engine factory. */
    @Override
    public TcpClient<W, R> secure(Func1<ByteBufAllocator, SSLEngine> sslEngineFactory) {
        return copy(state.secure(sslEngineFactory), eventPublisher);
    }

    /** Returns a client built from the state produced by {@link ClientState#secure} for a single engine. */
    @Override
    public TcpClient<W, R> secure(SSLEngine sslEngine) {
        return copy(state.secure(sslEngine), eventPublisher);
    }

    /** Returns a client built from the state produced by {@link ClientState#secure} for an {@link SslCodec}. */
    @Override
    public TcpClient<W, R> secure(SslCodec sslCodec) {
        return copy(state.secure(sslCodec), eventPublisher);
    }

    /** Returns a client built from the state produced by {@link ClientState#unsafeSecure()}. */
    @Override
    public TcpClient<W, R> unsafeSecure() {
        return copy(state.unsafeSecure(), eventPublisher);
    }

    /** Returns a client built from the state produced by {@link ClientState#channelProviderFactory}. */
    @Override
    public TcpClient<W, R> channelProvider(ChannelProviderFactory providerFactory) {
        return copy(state.channelProviderFactory(providerFactory), eventPublisher);
    }

    /**
     * Subscribes the listener through the {@link InterceptingTcpClient} that backs this client.
     *
     * @return the subscription returned by the intercepting client.
     */
    @Override
    public Subscription subscribe(TcpClientEventListener listener) {
        return interceptingTcpClient.subscribe(listener);
    }

    /** Returns the interceptor chain of the {@link InterceptingTcpClient} that backs this client. */
    @Override
    public TcpClientInterceptorChain<W, R> intercept() {
        return interceptingTcpClient.intercept();
    }

    /** Returns the state this client was created with (package-private accessor). */
    ClientState<W, R> getClientState() {
        return state;
    }

    /**
     * Creates a client for a single address: the address is wrapped in a {@link Host}, emitted as a one-element host
     * stream, and served by a {@link SingleHostConnectionProvider}.
     *
     * @param socketAddress address of the only host of the new client.
     */
    public static <W, R> TcpClientImpl<W, R> create(SocketAddress socketAddress) {
        final Host host = new Host(socketAddress);
        return create(new ConnectionProviderFactory<W, R>() {
            @Override
            public ConnectionProvider<W, R> newProvider(Observable<HostConnector<W, R>> hosts) {
                return new SingleHostConnectionProvider<>(hosts);
            }
        }, Observable.just(host));
    }

    /**
     * Creates a client with a new {@link ClientState} (built from the passed factory and host stream) and a new
     * {@link TcpClientEventPublisher}.
     *
     * @param factory    factory for the connection provider of the new client.
     * @param hostStream stream of hosts handed to {@link ClientState#create}.
     */
    public static <W, R> TcpClientImpl<W, R> create(ConnectionProviderFactory<W, R> factory,
                                                    Observable<Host> hostStream) {
        ClientState<W, R> state = ClientState.create(factory, hostStream);
        TcpClientEventPublisher eventPublisher = new TcpClientEventPublisher();
        return _create(state, eventPublisher);
    }

    /** Builds the client for a derived state; the passed event publisher is reused as is. */
    private static <W, R> TcpClientImpl<W, R> copy(ClientState<W, R> state, TcpClientEventPublisher eventPublisher) {
        return _create(state, eventPublisher);
    }

    /**
     * Wires a client for the given state:
     * <ol>
     * <li>the state's channel provider factory is wrapped in a {@link TcpChannelProviderFactory} that also receives
     * the state's detached pipeline,</li>
     * <li>every host of the state's host stream is mapped to a {@link HostConnector},</li>
     * <li>the state's {@link ConnectionProviderFactory} creates the {@link ConnectionProvider} for those
     * connectors,</li>
     * <li>that provider backs both the {@link InterceptingTcpClientImpl} and the {@link ConnectionRequestImpl} of
     * the returned client.</li>
     * </ol>
     * The returned client holds the wrapped state, not the state passed in.
     */
    static <W, R> TcpClientImpl<W, R> _create(ClientState<W, R> state, TcpClientEventPublisher eventPublisher) {
        DetachedChannelPipeline channelPipeline = state.unsafeDetachedPipeline();
        ClientState<W, R> tcpState = state.channelProviderFactory(
                new TcpChannelProviderFactory(channelPipeline, state.getChannelProviderFactory()));

        HostConnectorFactory<W, R> hostConnectorFactory = new HostConnectorFactory<>(tcpState, eventPublisher);
        ConnectionProvider<W, R> cp =
                tcpState.getFactory().newProvider(tcpState.getHostStream().map(hostConnectorFactory));

        InterceptingTcpClient<W, R> interceptingTcpClient = new InterceptingTcpClientImpl<>(cp, eventPublisher);
        ConnectionRequestImpl<W, R> connectionRequest = new ConnectionRequestImpl<>(cp);
        return new TcpClientImpl<>(tcpState, eventPublisher, interceptingTcpClient, connectionRequest);
    }

    /**
     * {@link ConnectionProvider} that opens a new channel to one {@link Host} for every subscription, using a
     * {@link Bootstrap} obtained from the client state.
     */
    private static class TerminalConnectionProvider<W, R> implements ConnectionProvider<W, R> {

        private final Host host;
        private final Bootstrap bootstrap;
        private final ChannelProvider channelProvider;

        public TerminalConnectionProvider(Host host, ChannelProvider channelProvider, ClientState<W, R> state) {
            this.host = host;
            this.channelProvider = channelProvider;
            this.bootstrap = state.newBootstrap();
        }

        /**
         * Returns an {@code Observable} that, per subscription, connects the bootstrap to the host address, passes
         * the resulting channel stream through the {@link ChannelProvider}, hands the subscriber over to the channel
         * (see the {@code switchMap} step) and finally converts the channel with
         * {@link ConnectionImpl#fromChannel}.
         */
        @Override
        public Observable<Connection<R, W>> newConnectionRequest() {
            Observable<Channel> connectedChannel = Observable.create(new Observable.OnSubscribe<Channel>() {
                @Override
                public void call(final Subscriber<? super Channel> s) {
                    final ChannelFuture cf = bootstrap.connect(host.getHost());

                    // Unsubscribing while the connect is still in flight cancels the attempt.
                    s.add(Subscriptions.create(new Action0() {
                        @Override
                        public void call() {
                            if (!cf.isDone()) {
                                cf.cancel(false);
                            }
                        }
                    }));

                    cf.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (future.isSuccess()) {
                                s.onNext(future.channel());
                                s.onCompleted();
                            } else {
                                s.onError(future.cause());
                            }
                        }
                    });
                }
            });

            return channelProvider.newChannel(connectedChannel)
                    .switchMap(new Func1<Channel, Observable<Channel>>() {
                        @Override
                        public Observable<Channel> call(final Channel channel) {
                            return handOver(channel);
                        }
                    })
                    .map(new Func1<Channel, Connection<R, W>>() {
                        @Override
                        public Connection<R, W> call(Channel channel) {
                            return ConnectionImpl.fromChannel(channel);
                        }
                    });
        }

        /**
         * Decides how a subscriber learns about {@code channel}: for a registered channel the subscriber is sent
         * through the channel pipeline as a {@link ChannelSubscriberEvent}; for an unregistered channel the channel
         * is emitted to the subscriber directly. The registration check runs on the channel's event loop; when the
         * caller is not on that event loop the check is scheduled there at subscription time.
         * NOTE(review): the direct emission presumably exists because an unregistered channel has no handler left to
         * consume the event; confirm against the pipeline handlers.
         */
        private static Observable<Channel> handOver(final Channel channel) {
            if (!channel.eventLoop().inEventLoop()) {
                return Observable.create(new Observable.OnSubscribe<Channel>() {
                    @Override
                    public void call(final Subscriber<? super Channel> subscriber) {
                        channel.eventLoop().execute(new Runnable() {
                            @Override
                            public void run() {
                                if (channel.isRegistered()) {
                                    fireSubscriberEvent(channel, subscriber);
                                } else {
                                    subscriber.onNext(channel);
                                    subscriber.onCompleted();
                                }
                            }
                        });
                    }
                });
            }

            if (!channel.isRegistered()) {
                return Observable.just(channel);
            }

            return Observable.create(new Observable.OnSubscribe<Channel>() {
                @Override
                public void call(Subscriber<? super Channel> subscriber) {
                    fireSubscriberEvent(channel, subscriber);
                }
            });
        }

        /** Fires a {@link ChannelSubscriberEvent} carrying {@code subscriber} through the channel's pipeline. */
        private static void fireSubscriberEvent(Channel channel, Subscriber<? super Channel> subscriber) {
            channel.pipeline().fireUserEventTriggered(new ChannelSubscriberEvent<>(subscriber));
        }
    }

    /**
     * Maps a {@link Host} to a {@link HostConnector} whose connections come from a
     * {@link TerminalConnectionProvider}. Every host gets its own {@link TcpClientEventPublisher}, to which the
     * client-level publisher is subscribed.
     */
    private static class HostConnectorFactory<W, R> implements Func1<Host, HostConnector<W, R>> {

        private final ChannelProviderFactory channelProviderFactory;
        private final TcpClientEventPublisher clientEventPublisher;
        private final ClientState<W, R> state;

        public HostConnectorFactory(ClientState<W, R> state, TcpClientEventPublisher clientEventPublisher) {
            this.state = state;
            this.channelProviderFactory = state.getChannelProviderFactory();
            this.clientEventPublisher = clientEventPublisher;
        }

        /**
         * Creates the connector for {@code host}. The host-level publisher is passed as event source, event
         * publisher and client event listener to both the channel provider factory and the connector.
         */
        // The event source is passed as a raw type: the wildcard bounds expected by the callee signatures are not
        // visible from this file, so the raw reference of the original code is retained.
        @SuppressWarnings({"unchecked", "rawtypes"})
        @Override
        public HostConnector<W, R> call(Host host) {
            TcpClientEventPublisher hostEventPublisher = new TcpClientEventPublisher();
            hostEventPublisher.subscribe(clientEventPublisher);
            EventSource hostEventSource = hostEventPublisher;

            ChannelProvider channelProvider = channelProviderFactory.newProvider(host, hostEventSource,
                                                                                  hostEventPublisher,
                                                                                  hostEventPublisher);
            return new HostConnector<W, R>(host,
                                           new TerminalConnectionProvider<W, R>(host, channelProvider, state),
                                           hostEventSource, hostEventPublisher, hostEventPublisher);
        }
    }
}

