/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.client.pool;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.FileRegion;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutorGroup;
import io.reactivex.netty.channel.AllocatingTransformer;
import io.reactivex.netty.channel.Connection;
import io.reactivex.netty.client.ClientConnectionToChannelBridge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Actions;
import rx.functions.Func1;

/**
 * A {@link Connection} that belongs to an {@link Owner} and wraps an unpooled delegate connection.
 *
 * <p>All write and flush operations are forwarded to the delegate. The {@code close} operations do not
 * close the delegate directly: on subscription they either hand this connection back to the owner via
 * {@link Owner#release(PooledConnection)} (when {@link #isUsable()} is {@code true}) or ask the owner to
 * {@link Owner#discard(PooledConnection)} it. If that release/discard flow errors, the observable returned
 * by {@link #discard()} (the delegate's {@code close()}) is used instead.
 *
 * <p>Methods that return a modified connection (handler additions, pipeline configuration, read/write
 * transformations) return a new {@code PooledConnection} that wraps the modified delegate and shares the
 * owner, the idle-tracking state snapshot and the release observable of this instance.
 *
 * @param <R> type of objects read from this connection
 * @param <W> type of objects written to this connection
 */
public class PooledConnection<R, W> extends Connection<R, W> {
    private static final Logger logger = LoggerFactory.getLogger(PooledConnection.class);

    /**
     * Channel attribute which, when set, replaces the maximum idle time (in milliseconds) of the connection
     * the next time the connection is released.
     */
    public static final AttributeKey<Long> DYNAMIC_CONN_KEEP_ALIVE_TIMEOUT_MS = AttributeKey.valueOf("rxnetty_conn_keep_alive_timeout_millis");

    private final Owner owner;
    private final Connection<R, W> unpooledDelegate;
    /** Wall-clock time (millis) recorded at creation and after every completed release. */
    private volatile long lastReturnToPoolTimeMillis;
    /** Set to {@code true} once a release to the owner has completed. */
    private volatile boolean releasedAtLeastOnce;
    /** Idle time (millis) after which {@link #isUsable()} reports {@code false}. */
    private volatile long maxIdleTimeMillis;
    /** Observable backing {@link #close()}; performs the release-or-discard decision on every subscription. */
    private final Observable<Void> releaseObservable;

    /**
     * Creates a new pooled connection.
     *
     * @param owner owner to which this connection is released or by which it is discarded
     * @param maxIdleTimeMillis initial maximum idle time in milliseconds
     * @param unpooledDelegate connection to which all I/O is delegated
     * @throws IllegalArgumentException if {@code owner} or {@code unpooledDelegate} is {@code null}
     */
    private PooledConnection(Owner owner, long maxIdleTimeMillis, Connection<R, W> unpooledDelegate) {
        // The delegate is validated as part of the super(...) argument so that a null delegate is always
        // reported as IllegalArgumentException, independent of how the superclass constructor treats null.
        super(requireDelegate(unpooledDelegate));
        if (null == owner) {
            throw new IllegalArgumentException("Pooled connection owner can not be null");
        }
        this.owner = owner;
        this.unpooledDelegate = unpooledDelegate;
        this.maxIdleTimeMillis = maxIdleTimeMillis;
        this.lastReturnToPoolTimeMillis = System.currentTimeMillis();
        this.releaseObservable = Observable.create(new Observable.OnSubscribe<Void>(){

            @Override
            public void call(Subscriber<? super Void> subscriber) {
                if (!PooledConnection.this.isUsable()) {
                    // Inactive, marked for discard, or idle for too long: do not return it to the owner.
                    PooledConnection.this.owner.discard(PooledConnection.this).unsafeSubscribe(subscriber);
                } else {
                    // A keep-alive timeout stored on the channel overrides the configured idle timeout.
                    Long keepAliveTimeout = PooledConnection.this.unsafeNettyChannel().attr(DYNAMIC_CONN_KEEP_ALIVE_TIMEOUT_MS).get();
                    if (null != keepAliveTimeout) {
                        PooledConnection.this.maxIdleTimeMillis = keepAliveTimeout;
                    }
                    PooledConnection.this.markAwarePipeline.reset();
                    PooledConnection.this.owner.release(PooledConnection.this).doOnCompleted(new Action0(){

                        @Override
                        public void call() {
                            // Idle time is measured from the moment the release completed.
                            PooledConnection.this.releasedAtLeastOnce = true;
                            PooledConnection.this.lastReturnToPoolTimeMillis = System.currentTimeMillis();
                        }
                    }).unsafeSubscribe(subscriber);
                }
            }
        }).onErrorResumeNext(this.discard());
    }

    /**
     * Creates a pooled connection around a modified delegate, copying the owner, idle-tracking state and
     * release observable from {@code toCopy}.
     *
     * @param toCopy connection whose pooling state is copied
     * @param unpooledDelegate the (modified) delegate for the new instance
     */
    private PooledConnection(PooledConnection<?, ?> toCopy, Connection<R, W> unpooledDelegate) {
        super(unpooledDelegate);
        this.owner = toCopy.owner;
        this.unpooledDelegate = unpooledDelegate;
        this.lastReturnToPoolTimeMillis = toCopy.lastReturnToPoolTimeMillis;
        this.releasedAtLeastOnce = toCopy.releasedAtLeastOnce;
        this.maxIdleTimeMillis = toCopy.maxIdleTimeMillis;
        this.releaseObservable = toCopy.releaseObservable;
    }

    /**
     * Returns the passed delegate after verifying that it is not {@code null}.
     *
     * @throws IllegalArgumentException if {@code unpooledDelegate} is {@code null}
     */
    private static <R, W> Connection<R, W> requireDelegate(Connection<R, W> unpooledDelegate) {
        if (null == unpooledDelegate) {
            throw new IllegalArgumentException("Connection delegate can not be null");
        }
        return unpooledDelegate;
    }

    /** Forwards to the delegate's {@code write(Observable)}. */
    @Override
    public Observable<Void> write(Observable<W> msgs) {
        return this.unpooledDelegate.write(msgs);
    }

    /** Forwards to the delegate's {@code write(Observable, Func1)}. */
    @Override
    public Observable<Void> write(Observable<W> msgs, Func1<W, Boolean> flushSelector) {
        return this.unpooledDelegate.write(msgs, flushSelector);
    }

    /** Forwards to the delegate's {@code writeAndFlushOnEach(Observable)}. */
    @Override
    public Observable<Void> writeAndFlushOnEach(Observable<W> msgs) {
        return this.unpooledDelegate.writeAndFlushOnEach(msgs);
    }

    /** Forwards to the delegate's {@code writeString(Observable)}. */
    @Override
    public Observable<Void> writeString(Observable<String> msgs) {
        return this.unpooledDelegate.writeString(msgs);
    }

    /** Forwards to the delegate's {@code writeString(Observable, Func1)}. */
    @Override
    public Observable<Void> writeString(Observable<String> msgs, Func1<String, Boolean> flushSelector) {
        return this.unpooledDelegate.writeString(msgs, flushSelector);
    }

    /** Forwards to the delegate's {@code writeStringAndFlushOnEach(Observable)}. */
    @Override
    public Observable<Void> writeStringAndFlushOnEach(Observable<String> msgs) {
        return this.unpooledDelegate.writeStringAndFlushOnEach(msgs);
    }

    /** Forwards to the delegate's {@code writeBytes(Observable)}. */
    @Override
    public Observable<Void> writeBytes(Observable<byte[]> msgs) {
        return this.unpooledDelegate.writeBytes(msgs);
    }

    /** Forwards to the delegate's {@code writeBytes(Observable, Func1)}. */
    @Override
    public Observable<Void> writeBytes(Observable<byte[]> msgs, Func1<byte[], Boolean> flushSelector) {
        return this.unpooledDelegate.writeBytes(msgs, flushSelector);
    }

    /** Forwards to the delegate's {@code writeBytesAndFlushOnEach(Observable)}. */
    @Override
    public Observable<Void> writeBytesAndFlushOnEach(Observable<byte[]> msgs) {
        return this.unpooledDelegate.writeBytesAndFlushOnEach(msgs);
    }

    /** Forwards to the delegate's {@code writeFileRegion(Observable)}. */
    @Override
    public Observable<Void> writeFileRegion(Observable<FileRegion> msgs) {
        return this.unpooledDelegate.writeFileRegion(msgs);
    }

    /** Forwards to the delegate's {@code writeFileRegion(Observable, Func1)}. */
    @Override
    public Observable<Void> writeFileRegion(Observable<FileRegion> msgs, Func1<FileRegion, Boolean> flushSelector) {
        return this.unpooledDelegate.writeFileRegion(msgs, flushSelector);
    }

    /** Forwards to the delegate's {@code writeFileRegionAndFlushOnEach(Observable)}. */
    @Override
    public Observable<Void> writeFileRegionAndFlushOnEach(Observable<FileRegion> msgs) {
        return this.unpooledDelegate.writeFileRegionAndFlushOnEach(msgs);
    }

    /** Forwards to the delegate's {@code flush()}. */
    @Override
    public void flush() {
        this.unpooledDelegate.flush();
    }

    /**
     * Equivalent to {@link #close(boolean) close(true)}.
     *
     * @return observable that, when subscribed, flushes the delegate and releases or discards this connection
     */
    @Override
    public Observable<Void> close() {
        return this.close(true);
    }

    /**
     * Returns the release observable of this connection, optionally flushing the delegate on subscription.
     *
     * @param flush when {@code true}, the delegate is flushed each time the returned observable is subscribed
     * @return observable that, when subscribed, releases this connection to its owner or discards it
     */
    @Override
    public Observable<Void> close(boolean flush) {
        if (flush) {
            return this.releaseObservable.doOnSubscribe(new Action0(){

                @Override
                public void call() {
                    PooledConnection.this.unpooledDelegate.flush();
                }
            });
        }
        return this.releaseObservable;
    }

    /**
     * Subscribes to {@link #close()} immediately; an error from the close is logged and not propagated.
     */
    @Override
    public void closeNow() {
        this.close().subscribe(Actions.empty(), new Action1<Throwable>(){

            @Override
            public void call(Throwable throwable) {
                logger.error("Error closing connection.", throwable);
            }
        });
    }

    /** Forwards to the delegate's {@code closeListener()}. */
    @Override
    public Observable<Void> closeListener() {
        return this.unpooledDelegate.closeListener();
    }

    /** Applies the handler addition to the delegate and wraps the result in a new pooled connection. */
    @Override
    public <RR, WW> Connection<RR, WW> addChannelHandlerAfter(String baseName, String name, ChannelHandler handler) {
        return new PooledConnection<RR, WW>(this, this.unpooledDelegate.<RR, WW>addChannelHandlerAfter(baseName, name, handler));
    }

    /** Applies the handler addition to the delegate and wraps the result in a new pooled connection. */
    @Override
    public <RR, WW> Connection<RR, WW> addChannelHandlerAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
        return new PooledConnection<RR, WW>(this, this.unpooledDelegate.<RR, WW>addChannelHandlerAfter(group, baseName, name, handler));
    }

    /** Applies the handler addition to the delegate and wraps the result in a new pooled connection. */
    @Override
    public <RR, WW> Connection<RR, WW> addChannelHandlerBefore(String baseName, String name, ChannelHandler handler) {
        return new PooledConnection<RR, WW>(this, this.unpooledDelegate.<RR, WW>addChannelHandlerBefore(baseName, name, handler));
    }

    /** Applies the handler addition to the delegate and wraps the result in a new pooled connection. */
    @Override
    public <RR, WW> Connection<RR, WW> addChannelHandlerBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
        return new PooledConnection<RR, WW>(this, this.unpooledDelegate.<RR, WW>addChannelHandlerBefore(group, baseName, name, handler));
    }

    /** Applies the handler addition to the delegate and wraps the result in a new pooled connection. */
    @Override
    public <RR, WW> Connection<RR, WW> addChannelHandlerFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new PooledConnection<RR, WW>(this, this.unpooledDelegate.<RR, WW>addChannelHandlerFirst(group, name, handler));
    }

    /** Applies the handler addition to the delegate and wraps the result in a new pooled connection. */
    @Override
    public <RR, WW> Connection<RR, WW> addChannelHandlerFirst(String name, ChannelHandler handler) {
        return new PooledConnection<RR, WW>(this, this.unpooledDelegate.<RR, WW>addChannelHandlerFirst(name, handler));
    }

    /** Applies the handler addition to the delegate and wraps the result in a new pooled connection. */
    @Override
    public <RR, WW> Connection<RR, WW> addChannelHandlerLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new PooledConnection<RR, WW>(this, this.unpooledDelegate.<RR, WW>addChannelHandlerLast(group, name, handler));
    }

    /** Applies the handler addition to the delegate and wraps the result in a new pooled connection. */
    @Override
    public <RR, WW> Connection<RR, WW> addChannelHandlerLast(String name, ChannelHandler handler) {
        return new PooledConnection<RR, WW>(this, this.unpooledDelegate.<RR, WW>addChannelHandlerLast(name, handler));
    }

    /** Applies the pipeline configurator to the delegate and wraps the result in a new pooled connection. */
    @Override
    public <RR, WW> Connection<RR, WW> pipelineConfigurator(Action1<ChannelPipeline> pipelineConfigurator) {
        return new PooledConnection<RR, WW>(this, this.unpooledDelegate.<RR, WW>pipelineConfigurator(pipelineConfigurator));
    }

    /** Applies the read transformation to the delegate and wraps the result in a new pooled connection. */
    @Override
    public <RR> Connection<RR, W> transformRead(Observable.Transformer<R, RR> transformer) {
        return new PooledConnection<RR, W>(this, this.unpooledDelegate.transformRead(transformer));
    }

    /** Applies the write transformation to the delegate and wraps the result in a new pooled connection. */
    @Override
    public <WW> Connection<R, WW> transformWrite(AllocatingTransformer<WW, W> transformer) {
        // The transformed delegate writes WW, so the wrapper must be parameterized with WW as well.
        return new PooledConnection<R, WW>(this, this.unpooledDelegate.transformWrite(transformer));
    }

    /**
     * Returns the delegate's {@code close()} observable, bypassing the owner.
     *
     * @return the observable returned by the delegate's {@code close()}
     */
    Observable<Void> discard() {
        return this.unpooledDelegate.close();
    }

    /**
     * Tells whether this connection may still be used.
     *
     * @return {@code false} if the channel is not active, if the channel's
     * {@code ClientConnectionToChannelBridge.DISCARD_CONNECTION} attribute is {@code true}, or if the time
     * elapsed since the last recorded return-to-pool time has reached the maximum idle time; {@code true}
     * otherwise
     */
    public boolean isUsable() {
        Channel nettyChannel = this.unsafeNettyChannel();
        Boolean discardConn = nettyChannel.attr(ClientConnectionToChannelBridge.DISCARD_CONNECTION).get();
        // Value comparison (not identity): also null-safe when the attribute was never set.
        if (!nettyChannel.isActive() || Boolean.TRUE.equals(discardConn)) {
            return false;
        }
        long idleTime = System.currentTimeMillis() - this.lastReturnToPoolTimeMillis;
        return idleTime < this.maxIdleTimeMillis;
    }

    /**
     * Fires a {@code ClientConnectionToChannelBridge.ConnectionReuseEvent} carrying the subscriber and this
     * connection as a user event on the channel pipeline.
     *
     * @param connectionSubscriber subscriber passed along in the reuse event
     */
    public void reuse(Subscriber<? super PooledConnection<R, W>> connectionSubscriber) {
        this.unsafeNettyChannel().pipeline().fireUserEventTriggered(new ClientConnectionToChannelBridge.ConnectionReuseEvent(connectionSubscriber, this));
    }

    /**
     * Creates a pooled connection and invokes the inherited {@code connectCloseToChannelClose()} on it.
     *
     * @param owner owner to which the connection is released or by which it is discarded
     * @param maxIdleTimeMillis initial maximum idle time in milliseconds
     * @param unpooledDelegate connection to which all I/O is delegated
     * @param <R> type of objects read from the connection
     * @param <W> type of objects written to the connection
     * @return the new pooled connection
     * @throws IllegalArgumentException if {@code owner} or {@code unpooledDelegate} is {@code null}
     */
    public static <R, W> PooledConnection<R, W> create(Owner owner, long maxIdleTimeMillis, Connection<R, W> unpooledDelegate) {
        PooledConnection<R, W> toReturn = new PooledConnection<R, W>(owner, maxIdleTimeMillis, unpooledDelegate);
        toReturn.connectCloseToChannelClose();
        return toReturn;
    }

    /**
     * @return {@code true} if a release of this connection to its owner has completed at least once
     */
    public boolean isReused() {
        return this.releasedAtLeastOnce;
    }

    /**
     * @return the inherited mark-aware pipeline of this connection
     */
    @Override
    public ChannelPipeline getChannelPipeline() {
        return this.markAwarePipeline;
    }

    /**
     * Overrides the recorded return-to-pool time used by {@link #isUsable()} for idle computation.
     *
     * @param lastReturnToPoolTimeMillis new timestamp in milliseconds
     */
    void setLastReturnToPoolTimeMillis(long lastReturnToPoolTimeMillis) {
        this.lastReturnToPoolTimeMillis = lastReturnToPoolTimeMillis;
    }

    /**
     * Owner of a {@link PooledConnection}, consulted when the connection is closed.
     */
    public static interface Owner {
        /**
         * Called on close when the connection is still usable.
         *
         * @param var1 connection being released
         * @return observable representing the release
         */
        public Observable<Void> release(PooledConnection<?, ?> var1);

        /**
         * Called on close when the connection is no longer usable.
         *
         * @param var1 connection being discarded
         * @return observable representing the discard
         */
        public Observable<Void> discard(PooledConnection<?, ?> var1);
    }
}

