/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.client.loadbalancer;

import io.reactivex.netty.channel.Connection;
import io.reactivex.netty.client.ConnectionProvider;
import io.reactivex.netty.client.ConnectionProviderFactory;
import io.reactivex.netty.client.HostConnector;
import io.reactivex.netty.client.loadbalancer.HostCollector;
import io.reactivex.netty.client.loadbalancer.HostHolder;
import io.reactivex.netty.client.loadbalancer.LoadBalancingStrategy;
import io.reactivex.netty.client.loadbalancer.NoBufferHostCollector;
import io.reactivex.netty.client.loadbalancer.NoHostsAvailableException;
import io.reactivex.netty.internal.VoidToAnythingCast;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Single;
import rx.functions.Action1;
import rx.functions.Func1;

/**
 * A {@link ConnectionProviderFactory} whose providers delegate every connection request to a
 * {@link LoadBalancingStrategy} built over the currently known set of hosts.
 *
 * <p>Each {@link HostConnector} emitted by the host stream is wrapped into a {@link HostHolder} by the
 * strategy, turned into add/remove {@link HostCollector.HostUpdate}s, and folded by the configured
 * {@link HostCollector} into lists of holders. Every emitted list replaces the provider that serves
 * connection requests.
 *
 * @param <W> first type argument of the connectors/strategy (presumably the type written to a
 *            connection; confirm against {@link Connection})
 * @param <R> second type argument of the connectors/strategy (presumably the type read from a
 *            connection; confirm against {@link Connection})
 */
public class LoadBalancerFactory<W, R> implements ConnectionProviderFactory<W, R> {
    private static final Logger logger = LoggerFactory.getLogger(LoadBalancerFactory.class);

    /** Converts connectors to holders and builds a provider for each list of holders. */
    private final LoadBalancingStrategy<W, R> strategy;

    /** Supplies the function that folds host updates into lists of holders. */
    private final HostCollector collector;

    private LoadBalancerFactory(LoadBalancingStrategy<W, R> strategy, HostCollector collector) {
        this.strategy = strategy;
        this.collector = collector;
    }

    /**
     * Creates a provider that tracks the passed host stream.
     *
     * <p>The returned provider subscribes to the derived stream as soon as it is constructed.
     *
     * @param hosts stream of connectors, one per host
     * @return a provider that delegates to the strategy created from the most recent host list
     */
    @Override
    public ConnectionProvider<W, R> newProvider(Observable<HostConnector<W, R>> hosts) {
        // Assignment context lets the compiler infer the collector's type arguments without a raw type.
        final Func1<HostCollector.HostUpdate<W, R>, Single<List<HostHolder<W, R>>>> collectorFunction =
                collector.newCollector();

        Observable<HostHolder<W, R>> holders = hosts.map(new Func1<HostConnector<W, R>, HostHolder<W, R>>() {

            @Override
            public HostHolder<W, R> call(HostConnector<W, R> connector) {
                HostHolder<W, R> newHolder = strategy.toHolder(connector);
                // Feed the connector's events to the listener exposed by the holder.
                connector.subscribe(newHolder.getEventListener());
                return newHolder;
            }
        });

        Observable<HostCollector.HostUpdate<W, R>> updates =
                holders.flatMap(new Func1<HostHolder<W, R>, Observable<HostCollector.HostUpdate<W, R>>>() {

                    @Override
                    public Observable<HostCollector.HostUpdate<W, R>> call(HostHolder<W, R> holder) {
                        HostCollector.HostUpdate<W, R> remove = new HostCollector.HostUpdate<W, R>(
                                HostCollector.HostUpdate.Action.Remove, holder);
                        HostCollector.HostUpdate<W, R> add = new HostCollector.HostUpdate<W, R>(
                                HostCollector.HostUpdate.Action.Add, holder);

                        return holder.getConnector()
                                     .getHost()
                                     .getCloseNotifier()
                                     // Only the termination of the close notifier matters; the cast
                                     // merely aligns the element type with the updates emitted below.
                                     .map(new VoidToAnythingCast<HostCollector.HostUpdate<W, R>>())
                                     .ignoreElements()
                                     // An error from the close notifier is treated like a completion.
                                     .onErrorResumeNext(Observable.<HostCollector.HostUpdate<W, R>>empty())
                                     // Once the close notifier terminates, announce the removal.
                                     .concatWith(Observable.just(remove))
                                     // The addition is merged in, so it does not wait for the close.
                                     .mergeWith(Observable.just(add));
                    }
                });

        // maxConcurrent = 1: at most one collector result is subscribed to at a time.
        // distinctUntilChanged: a list equal to the previously emitted one is not re-published.
        Observable<List<HostHolder<W, R>>> hostLists =
                updates.flatMap(newCollector(collectorFunction), 1).distinctUntilChanged();

        return new ConnectionProviderImpl(hostLists);
    }

    /**
     * Creates a factory that uses the passed strategy and a {@link NoBufferHostCollector}.
     *
     * @param strategy strategy used to pick hosts
     * @param <WW> first type argument of the strategy
     * @param <RR> second type argument of the strategy
     * @return a new factory
     */
    public static <WW, RR> LoadBalancerFactory<WW, RR> create(LoadBalancingStrategy<WW, RR> strategy) {
        return create(strategy, new NoBufferHostCollector());
    }

    /**
     * Creates a factory that uses the passed strategy and host collector.
     *
     * @param strategy strategy used to pick hosts
     * @param collector collector that folds host updates into host lists
     * @param <WW> first type argument of the strategy
     * @param <RR> second type argument of the strategy
     * @return a new factory
     */
    public static <WW, RR> LoadBalancerFactory<WW, RR> create(LoadBalancingStrategy<WW, RR> strategy, HostCollector collector) {
        return new LoadBalancerFactory<WW, RR>(strategy, collector);
    }

    /**
     * Adapts a collector function returning a {@link Single} into one returning an {@link Observable},
     * so that it can be used with {@code flatMap}.
     *
     * @param f collector function to adapt
     * @return a function that calls {@code f} and converts its result with {@link Single#toObservable()}
     */
    private Func1<? super HostCollector.HostUpdate<W, R>, ? extends Observable<List<HostHolder<W, R>>>> newCollector(final Func1<HostCollector.HostUpdate<W, R>, Single<List<HostHolder<W, R>>>> f) {
        return new Func1<HostCollector.HostUpdate<W, R>, Observable<List<HostHolder<W, R>>>>() {

            @Override
            public Observable<List<HostHolder<W, R>>> call(HostCollector.HostUpdate<W, R> update) {
                return f.call(update).toObservable();
            }
        };
    }

    /**
     * Provider that forwards connection requests to whichever provider the strategy produced for the
     * most recently received host list.
     */
    private class ConnectionProviderImpl implements ConnectionProvider<W, R> {

        /**
         * Provider currently serving requests. Until a host list has been received, every request
         * fails with {@link NoHostsAvailableException#EMPTY_INSTANCE}. Declared {@code volatile}
         * because it is replaced from the host-stream callback and read from
         * {@link #newConnectionRequest()}.
         */
        private volatile ConnectionProvider<W, R> currentProvider = new ConnectionProvider<W, R>() {

            @Override
            public Observable<Connection<R, W>> newConnectionRequest() {
                return Observable.error(NoHostsAvailableException.EMPTY_INSTANCE);
            }
        };

        /**
         * Subscribes to the passed stream and swaps the delegate provider on every emitted list.
         *
         * <p>NOTE(review): the subscription is not retained, so this class never unsubscribes from
         * {@code hosts}.
         *
         * @param hosts stream of host lists
         */
        public ConnectionProviderImpl(Observable<List<HostHolder<W, R>>> hosts) {
            hosts.subscribe(new Action1<List<HostHolder<W, R>>>() {

                @Override
                public void call(List<HostHolder<W, R>> hostHolders) {
                    currentProvider = strategy.newStrategy(hostHolders);
                }
            }, new Action1<Throwable>() {

                @Override
                public void call(Throwable throwable) {
                    // The stream has terminated; the last provider stays in place.
                    logger.error("Error while listening on the host stream. Hosts will not be refreshed.", throwable);
                }
            });
        }

        /**
         * @return the connection request produced by the provider that is current at call time
         */
        @Override
        public Observable<Connection<R, W>> newConnectionRequest() {
            return currentProvider.newConnectionRequest();
        }
    }
}

