/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.client.pool;

import io.reactivex.netty.channel.Connection;
import io.reactivex.netty.client.ClientConnectionToChannelBridge;
import io.reactivex.netty.client.HostConnector;
import io.reactivex.netty.client.events.ClientEventListener;
import io.reactivex.netty.client.pool.IdleConnectionsHolder;
import io.reactivex.netty.client.pool.PoolConfig;
import io.reactivex.netty.client.pool.PoolExhaustedException;
import io.reactivex.netty.client.pool.PoolLimitDeterminationStrategy;
import io.reactivex.netty.client.pool.PooledConnection;
import io.reactivex.netty.client.pool.PooledConnectionProvider;
import io.reactivex.netty.events.Clock;
import io.reactivex.netty.events.EventAttributeKeys;
import io.reactivex.netty.events.EventPublisher;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Actions;
import rx.functions.Func1;

public final class PooledConnectionProviderImpl<W, R>
extends PooledConnectionProvider<W, R> {
    private static final Logger logger = LoggerFactory.getLogger(PooledConnectionProviderImpl.class);
    private final Subscription idleConnCleanupSubscription;
    private final IdleConnectionsHolder<W, R> idleConnectionsHolder;
    private final PoolLimitDeterminationStrategy limitDeterminationStrategy;
    private final long maxIdleTimeMillis;
    private final HostConnector<W, R> hostConnector;
    private volatile boolean isShutdown;

    /**
     * Creates a pooled connection provider on top of the passed host connector.
     *
     * <p>Two subscriptions are started by this constructor:
     * <ul>
     * <li>One to the idle connection cleanup timer supplied by {@code poolConfig}; every tick runs an
     * {@link IdleConnectionCleanupTask}. Timer errors are logged and the timer is re-subscribed; errors from the
     * cleanup itself are logged and end the cleanup stream.</li>
     * <li>One to the close notifier of the host; when it terminates (normally or with an error) this provider is
     * flagged as shutdown and the cleanup subscription is cancelled.</li>
     * </ul>
     *
     * @param poolConfig Source of the idle connections holder, the pool limit strategy, the maximum idle time and
     * the cleanup timer.
     * @param hostConnector Connector for the target host; used to create new connections and to publish events.
     */
    public PooledConnectionProviderImpl(PoolConfig<W, R> poolConfig, HostConnector<W, R> hostConnector) {
        this.hostConnector = hostConnector;
        this.idleConnectionsHolder = poolConfig.getIdleConnectionsHolder();
        this.limitDeterminationStrategy = poolConfig.getPoolLimitDeterminationStrategy();
        this.maxIdleTimeMillis = poolConfig.getMaxIdleTimeMillis();

        this.idleConnCleanupSubscription = poolConfig.getIdleConnectionsCleanupTimer()
                .doOnError(LogErrorAction.INSTANCE)
                .retry() // Keep the timer alive across timer failures.
                .concatMap(new IdleConnectionCleanupTask())
                .onErrorResumeNext(new Func1<Throwable, Observable<Void>>() {

                    @Override
                    public Observable<Void> call(Throwable throwable) {
                        logger.error("Ignoring error cleaning up idle connections.", throwable);
                        return Observable.empty();
                    }
                })
                .subscribe(Actions.empty());

        hostConnector.getHost()
                .getCloseNotifier()
                .doOnTerminate(new Action0() {

                    @Override
                    public void call() {
                        PooledConnectionProviderImpl.this.isShutdown = true;
                        PooledConnectionProviderImpl.this.idleConnCleanupSubscription.unsubscribe();
                    }
                })
                // No cast is required here: onErrorResumeNext accepts a function returning
                // Observable<? extends Void> as is.
                .onErrorResumeNext(new Func1<Throwable, Observable<? extends Void>>() {

                    @Override
                    public Observable<? extends Void> call(Throwable throwable) {
                        logger.error("Error listening to Host close notifications. Shutting down the pool.", throwable);
                        return Observable.empty();
                    }
                })
                .subscribe(Actions.empty());
    }

    /**
     * Returns an {@code Observable} that, on every subscription, emits at most one usable connection.
     *
     * <p>Idle connections returned by {@code pollThisEventLoopConnections()} are tried first, followed by
     * {@link #connectIfAllowed()}. Candidates reporting {@code isUsable() == false} are discarded and skipped.
     *
     * <p>If this provider is already shutdown, the subscriber only receives an {@link IllegalStateException}
     * and no connection acquisition is attempted.
     *
     * @return An {@code Observable} emitting the acquired connection.
     */
    @Override
    public Observable<Connection<R, W>> newConnectionRequest() {
        return Observable.create(new Observable.OnSubscribe<Connection<R, W>>() {

            @Override
            public void call(Subscriber<? super Connection<R, W>> subscriber) {
                if (isShutdown) {
                    subscriber.onError(new IllegalStateException("Connection provider is shutdown."));
                    // onError is terminal: subscribing the pipeline below would emit after termination
                    // and could hand out a connection from a pool that is shutdown.
                    return;
                }

                idleConnectionsHolder.pollThisEventLoopConnections()
                        .concatWith(connectIfAllowed())
                        .filter(new Func1<PooledConnection<R, W>, Boolean>() {

                            @Override
                            public Boolean call(PooledConnection<R, W> c) {
                                boolean isUsable = c.isUsable();
                                if (!isUsable) {
                                    discardNow(c);
                                }
                                return isUsable;
                            }
                        })
                        .take(1)
                        .lift(new ReuseSubscriberLinker())
                        .lift(new ConnectMetricsOperator())
                        .unsafeSubscribe(subscriber);
            }
        });
    }

    /**
     * Returns an {@code Observable} that, on subscription, schedules a {@link ReleaseTask} for the passed
     * connection on the event loop of the connection's channel.
     *
     * @param connection Connection to release. If {@code null}, the returned {@code Observable} just completes.
     *
     * @return An {@code Observable} that terminates with the outcome reported by the release task.
     */
    @Override
    public Observable<Void> release(PooledConnection<?, ?> connection) {
        // The release task is typed with this provider's type parameters, so the wildcard-typed argument
        // needs an explicit (unchecked) cast before it can be handed over.
        @SuppressWarnings("unchecked")
        final PooledConnection<R, W> c = (PooledConnection<R, W>) connection;
        return Observable.create(new Observable.OnSubscribe<Void>() {

            @Override
            public void call(Subscriber<? super Void> subscriber) {
                if (null == c) {
                    subscriber.onCompleted();
                } else {
                    c.unsafeNettyChannel().eventLoop().submit(new ReleaseTask(c, subscriber));
                }
            }
        });
    }

    /**
     * Wraps the discard of the passed connection so that every subscription first publishes an eviction event
     * (only when event publishing is enabled for the channel) and then releases one permit on the pool limit
     * strategy.
     *
     * @param connection Connection to discard.
     *
     * @return The connection's discard {@code Observable} with the above side effects attached.
     */
    @Override
    public Observable<Void> discard(final PooledConnection<?, ?> connection) {
        Observable<Void> discardResult = connection.discard();
        return discardResult.doOnSubscribe(new Action0() {

            @Override
            public void call() {
                EventPublisher publisher = connection.unsafeNettyChannel()
                                                     .attr(EventAttributeKeys.EVENT_PUBLISHER)
                                                     .get();
                if (publisher.publishingEnabled()) {
                    ClientEventListener listener = connection.unsafeNettyChannel()
                                                             .attr(EventAttributeKeys.CLIENT_EVENT_LISTENER)
                                                             .get();
                    listener.onPooledConnectionEviction();
                }
                limitDeterminationStrategy.releasePermit();
            }
        });
    }

    /**
     * Returns an {@code Observable} that, on subscription, creates a new pooled connection if the pool limit
     * strategy grants a creation permit.
     *
     * <p>When a permit is granted, a new connection is requested from the host connector's connection provider
     * and wrapped as a {@link PooledConnection}; if that request fails, the permit is released again. When no
     * permit is granted, the idle connections holder is polled and, if it yields nothing, the subscriber
     * receives a {@link PoolExhaustedException}.
     *
     * @return An {@code Observable} emitting pooled connections as described above.
     */
    private Observable<PooledConnection<R, W>> connectIfAllowed() {
        return Observable.create(new Observable.OnSubscribe<PooledConnection<R, W>>() {

            @Override
            public void call(Subscriber<? super PooledConnection<R, W>> subscriber) {
                long startTimeNanos = Clock.newStartTimeNanos();
                if (limitDeterminationStrategy.acquireCreationPermit(startTimeNanos, TimeUnit.NANOSECONDS)) {
                    // Parameterized (not raw) so that the operator chain below stays type checked.
                    Observable<Connection<R, W>> newConnObsv = hostConnector.getConnectionProvider()
                                                                            .newConnectionRequest();
                    newConnObsv.map(new Func1<Connection<R, W>, PooledConnection<R, W>>() {

                        @Override
                        public PooledConnection<R, W> call(Connection<R, W> connection) {
                            return PooledConnection.create(PooledConnectionProviderImpl.this, maxIdleTimeMillis,
                                                           connection);
                        }
                    }).doOnError(new Action1<Throwable>() {

                        @Override
                        public void call(Throwable throwable) {
                            // The connection was never created, give the permit back.
                            limitDeterminationStrategy.releasePermit();
                        }
                    }).unsafeSubscribe(subscriber);
                } else {
                    idleConnectionsHolder.poll()
                            .switchIfEmpty(Observable.<PooledConnection<R, W>>error(
                                    new PoolExhaustedException("Client connection pool exhausted.")))
                            .unsafeSubscribe(subscriber);
                }
            }
        });
    }

    /**
     * Subscribes to {@link #discard(PooledConnection)} for the passed connection right away. A failure of the
     * discard is only logged.
     *
     * @param toDiscard Connection to discard.
     */
    private void discardNow(PooledConnection<R, W> toDiscard) {
        final Action1<Throwable> logDiscardFailure = new Action1<Throwable>() {

            @Override
            public void call(Throwable cause) {
                logger.error("Error discarding connection.", cause);
            }
        };
        discard(toDiscard).subscribe(Actions.empty(), logDiscardFailure);
    }

    /**
     * Asks the event publisher of the host connector whether publishing is currently enabled.
     *
     * @return {@code true} if the host connector's event publisher reports publishing as enabled.
     */
    private boolean isEventPublishingEnabled() {
        final EventPublisher publisher = hostConnector.getEventPublisher();
        return publisher.publishingEnabled();
    }

    /**
     * A subscriber that forwards a terminal event to its delegate only once an item has arrived.
     *
     * <p>A terminal event ({@code onCompleted}/{@code onError}) received before {@code onNext} is recorded and
     * replayed to the delegate right after the item is forwarded. A terminal event received after
     * {@code onNext} is forwarded directly.
     *
     * <p>All bookkeeping state ({@code terminated}, {@code error}, {@code onNextArrived}) is read and written
     * while holding this object's monitor, so that a terminal event racing with {@code onNext} is observed by
     * exactly one of the two callbacks and is never dropped. Calls to the delegate are made outside the
     * monitor.
     *
     * <p>NOTE(review): because the delegate is invoked outside the monitor, a terminal event that arrives on
     * another thread while {@code onNext} is still forwarding the item may reach the delegate concurrently
     * with the item - confirm whether callers can trigger this.
     */
    private static class ScalarAsyncSubscriber<R, W>
    extends Subscriber<PooledConnection<R, W>> {
        /* Guarded by this */
        private boolean terminated;
        /* Guarded by this */
        private Throwable error;
        /* Guarded by this */
        private boolean onNextArrived;
        private final Subscriber<? super PooledConnection<R, W>> delegate;

        private ScalarAsyncSubscriber(Subscriber<? super PooledConnection<R, W>> delegate) {
            this.delegate = delegate;
        }

        /**
         * Records completion and forwards it to the delegate if an item has already arrived.
         */
        @Override
        public void onCompleted() {
            boolean _onNextArrived;
            synchronized (this) {
                _onNextArrived = onNextArrived;
                // Written under the lock so that a concurrent onNext either sees this flag or has already
                // published onNextArrived.
                terminated = true;
            }
            if (_onNextArrived) {
                delegate.onCompleted();
            }
        }

        /**
         * Records the error and forwards it to the delegate if an item has already arrived.
         *
         * @param e Error to record or forward.
         */
        @Override
        public void onError(Throwable e) {
            boolean _onNextArrived;
            synchronized (this) {
                _onNextArrived = onNextArrived;
                terminated = true;
                error = e;
            }
            if (_onNextArrived) {
                delegate.onError(e);
            }
        }

        /**
         * Forwards the item to the delegate, followed by any terminal event that was recorded before the item
         * arrived.
         *
         * @param conn Connection to forward.
         */
        @Override
        public void onNext(PooledConnection<R, W> conn) {
            Throwable _error;
            boolean _terminated;
            synchronized (this) {
                onNextArrived = true;
                _terminated = terminated;
                _error = error;
            }
            delegate.onNext(conn);
            if (_terminated) {
                // Decide on the snapshot taken under the lock, the same value that is forwarded.
                if (null != _error) {
                    delegate.onError(_error);
                } else {
                    delegate.onCompleted();
                }
            }
        }
    }

    /**
     * Operator that routes reused connections through {@link PooledConnection#reuse} instead of emitting them
     * directly.
     *
     * <p>For a connection reporting {@code isReused()}, a reuse event is published (when publishing is enabled
     * for the channel), a {@link ScalarAsyncSubscriber} wrapping the downstream subscriber is created and
     * passed to {@code reuse()}. From then on terminal events from upstream are sent to that subscriber
     * instead of the downstream subscriber. Connections that are not reused are passed downstream unchanged.
     *
     * <p>The link to the reuse subscriber is kept on the operator instance, so an instance holds the state of
     * the last reused connection it has seen.
     */
    private class ReuseSubscriberLinker
    implements Observable.Operator<PooledConnection<R, W>, PooledConnection<R, W>> {
        private ScalarAsyncSubscriber<R, W> onReuseSubscriber;

        private ReuseSubscriberLinker() {
        }

        @Override
        public Subscriber<? super PooledConnection<R, W>> call(final Subscriber<? super PooledConnection<R, W>> o) {
            return new Subscriber<PooledConnection<R, W>>(o) {

                @Override
                public void onCompleted() {
                    if (null != onReuseSubscriber) {
                        onReuseSubscriber.onCompleted();
                    } else {
                        o.onCompleted();
                    }
                }

                @Override
                public void onError(Throwable e) {
                    if (null != onReuseSubscriber) {
                        onReuseSubscriber.onError(e);
                    } else {
                        o.onError(e);
                    }
                }

                @Override
                public void onNext(PooledConnection<R, W> c) {
                    if (c.isReused()) {
                        EventPublisher eventPublisher = c.unsafeNettyChannel()
                                                         .attr(EventAttributeKeys.EVENT_PUBLISHER)
                                                         .get();
                        if (eventPublisher.publishingEnabled()) {
                            ClientEventListener eventListener = c.unsafeNettyChannel()
                                                                 .attr(EventAttributeKeys.CLIENT_EVENT_LISTENER)
                                                                 .get();
                            eventListener.onPooledConnectionReuse();
                        }
                        // Parameterized (not raw) so the subscriber handed to reuse() is type checked.
                        onReuseSubscriber = new ScalarAsyncSubscriber<R, W>(o);
                        c.reuse(onReuseSubscriber);
                    } else {
                        o.onNext(c);
                    }
                }
            };
        }
    }

    private class ConnectMetricsOperator
    implements Observable.Operator<Connection<R, W>, PooledConnection<R, W>> {
        private ConnectMetricsOperator() {
        }

        @Override
        public Subscriber<? super PooledConnection<R, W>> call(final Subscriber<? super Connection<R, W>> o) {
            long startTimeNanos;
            long l = startTimeNanos = PooledConnectionProviderImpl.this.isEventPublishingEnabled() ? Clock.newStartTimeNanos() : -1L;
            if (PooledConnectionProviderImpl.this.isEventPublishingEnabled()) {
                PooledConnectionProviderImpl.this.hostConnector.getClientPublisher().onPoolAcquireStart();
            }
            return new Subscriber<PooledConnection<R, W>>(o){

                @Override
                public void onCompleted() {
                    if (PooledConnectionProviderImpl.this.isEventPublishingEnabled()) {
                        PooledConnectionProviderImpl.this.hostConnector.getClientPublisher().onPoolAcquireSuccess(Clock.onEndNanos(startTimeNanos), TimeUnit.NANOSECONDS);
                    }
                    o.onCompleted();
                }

                @Override
                public void onError(Throwable e) {
                    if (PooledConnectionProviderImpl.this.isEventPublishingEnabled()) {
                        PooledConnectionProviderImpl.this.hostConnector.getClientPublisher().onPoolAcquireFailed(Clock.onEndNanos(startTimeNanos), TimeUnit.NANOSECONDS, e);
                    }
                    o.onError(e);
                }

                @Override
                public void onNext(PooledConnection<R, W> c) {
                    o.onNext(c);
                }
            };
        }
    }

    /**
     * Task that performs the release of a single connection and reports the outcome to a subscriber.
     *
     * <p>When run, it fires a {@code PooledConnectionReleaseEvent} on the channel pipeline and then either
     * discards the connection (provider shutdown or connection not usable) or adds it to the idle connections
     * holder. Release start/success/failed events are published when publishing is enabled for the channel.
     * Anything thrown while running is reported through the subscriber's {@code onError}.
     */
    private class ReleaseTask
    implements Runnable {
        private final PooledConnection<R, W> pooled;
        private final Subscriber<? super Void> releaseSubscriber;
        private final long startNanos;
        private final EventPublisher publisher;
        private final ClientEventListener listener;

        private ReleaseTask(PooledConnection<R, W> connection, Subscriber<? super Void> subscriber) {
            pooled = connection;
            releaseSubscriber = subscriber;
            // The clock starts when the task is created, not when it is run.
            startNanos = Clock.newStartTimeNanos();
            publisher = connection.unsafeNettyChannel().attr(EventAttributeKeys.EVENT_PUBLISHER).get();
            listener = connection.unsafeNettyChannel().attr(EventAttributeKeys.CLIENT_EVENT_LISTENER).get();
        }

        @Override
        public void run() {
            try {
                pooled.unsafeNettyChannel()
                      .pipeline()
                      .fireUserEventTriggered(ClientConnectionToChannelBridge.PooledConnectionReleaseEvent.INSTANCE);

                if (publisher.publishingEnabled()) {
                    listener.onPoolReleaseStart();
                }

                boolean returnToPool = !isShutdown && pooled.isUsable();
                if (returnToPool) {
                    idleConnectionsHolder.add(pooled);
                } else {
                    discardNow(pooled);
                }

                if (publisher.publishingEnabled()) {
                    listener.onPoolReleaseSuccess(Clock.onEndNanos(startNanos), TimeUnit.NANOSECONDS);
                }
                releaseSubscriber.onCompleted();
            } catch (Throwable failure) {
                if (publisher.publishingEnabled()) {
                    listener.onPoolReleaseFailed(Clock.onEndNanos(startNanos), TimeUnit.NANOSECONDS, failure);
                }
                releaseSubscriber.onError(failure);
            }
        }
    }

    /**
     * Function run for every tick of the idle connection cleanup timer.
     *
     * <p>It walks the connections returned by {@code peek()} on the idle connections holder; every connection
     * that is not usable is removed from the holder and discarded. The resulting {@code Observable} emits no
     * items.
     */
    private class IdleConnectionCleanupTask
    implements Func1<Long, Observable<Void>> {
        private IdleConnectionCleanupTask() {
        }

        @Override
        public Observable<Void> call(Long tick) {
            final Func1<PooledConnection<R, W>, Void> evictIfUnusable = new Func1<PooledConnection<R, W>, Void>() {

                @Override
                public Void call(PooledConnection<R, W> idle) {
                    if (idle.isUsable()) {
                        return null;
                    }
                    idleConnectionsHolder.remove(idle);
                    discardNow(idle);
                    return null;
                }
            };
            return idleConnectionsHolder.peek().map(evictIfUnusable).ignoreElements();
        }
    }

    private static class LogErrorAction
    implements Action1<Throwable> {
        public static final LogErrorAction INSTANCE = new LogErrorAction();

        private LogErrorAction() {
        }

        @Override
        public void call(Throwable throwable) {
            logger.error("Error from idle connection cleanup timer. This will be retried.", throwable);
        }
    }
}

