/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.http.utils;

import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.DelegatingConnectionFactory;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.transport.api.TransportObserver;
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToIntFunction;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class CacheConnectionFactory<ResolvedAddress, C extends ListenableAsyncCloseable>
extends DelegatingConnectionFactory<ResolvedAddress, C> {
    private static final Logger LOGGER = LoggerFactory.getLogger(CacheConnectionFactory.class);
    private final Map<ResolvedAddress, Item<C>> map = new HashMap<ResolvedAddress, Item<C>>();
    private final ToIntFunction<ResolvedAddress> maxConcurrencyFunc;

    CacheConnectionFactory(ConnectionFactory<ResolvedAddress, C> delegate, ToIntFunction<ResolvedAddress> maxConcurrencyFunc) {
        super(delegate);
        this.maxConcurrencyFunc = maxConcurrencyFunc;
    }

    @Override
    @Deprecated
    public Single<C> newConnection(ResolvedAddress resolvedAddress, @Nullable TransportObserver observer) {
        return this.newConnection(resolvedAddress, null, observer);
    }

    @Override
    public Single<C> newConnection(final ResolvedAddress resolvedAddress, @Nullable ContextMap context, @Nullable TransportObserver observer) {
        return Single.defer(() -> {
            Single<Object> result;
            int maxConcurrency = this.maxConcurrencyFunc.applyAsInt(resolvedAddress);
            if (maxConcurrency <= 1) {
                return this.delegate().newConnection(resolvedAddress, context, observer);
            }
            Map<ResolvedAddress, Item<C>> map = this.map;
            synchronized (map) {
                Item<C> item1 = this.map.get(resolvedAddress);
                if (item1 == null || (result = item1.addSubscriber(maxConcurrency)) == null) {
                    final Item item2 = new Item();
                    this.map.put(resolvedAddress, item2);
                    item2.single = this.delegate().newConnection(resolvedAddress, context, observer).liftSync(subscriber -> new SingleSource.Subscriber<C>(){

                        @Override
                        public void onSubscribe(Cancellable cancellable) {
                            subscriber.onSubscribe(() -> {
                                try {
                                    assert (Thread.holdsLock(CacheConnectionFactory.this.map));
                                    CacheConnectionFactory.this.map.remove(resolvedAddress, item2);
                                }
                                finally {
                                    cancellable.cancel();
                                }
                            });
                        }

                        @Override
                        public void onSuccess(@Nullable C result1) {
                            try {
                                if (result1 == null) {
                                    this.lockRemoveFromMap();
                                } else {
                                    result1.onClosing().whenFinally(this::lockRemoveFromMap).subscribe();
                                }
                            }
                            catch (Throwable cause) {
                                if (result1 != null) {
                                    LOGGER.debug("Unexpected error, closing connection='{}'", result1, (Object)cause);
                                    result1.closeAsync().subscribe();
                                }
                                subscriber.onError(cause);
                                return;
                            }
                            subscriber.onSuccess(result1);
                        }

                        @Override
                        public void onError(Throwable t) {
                            this.lockRemoveFromMap();
                            subscriber.onError(t);
                        }

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         */
                        private void lockRemoveFromMap() {
                            Map map = CacheConnectionFactory.this.map;
                            synchronized (map) {
                                CacheConnectionFactory.this.map.remove(resolvedAddress, item2);
                            }
                        }
                    }).cache().liftSync(subscriber -> new SingleSource.Subscriber<C>(){

                        @Override
                        public void onSubscribe(Cancellable cancellable) {
                            subscriber.onSubscribe(() -> {
                                Map map = CacheConnectionFactory.this.map;
                                synchronized (map) {
                                    cancellable.cancel();
                                }
                            });
                        }

                        @Override
                        public void onSuccess(@Nullable C result1) {
                            try {
                                subscriber.onSuccess(result1);
                            }
                            finally {
                                this.lockRemoveFromMap();
                            }
                        }

                        @Override
                        public void onError(Throwable t) {
                            try {
                                subscriber.onError(t);
                            }
                            finally {
                                this.lockRemoveFromMap();
                            }
                        }

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         */
                        private void lockRemoveFromMap() {
                            Map map = CacheConnectionFactory.this.map;
                            synchronized (map) {
                                CacheConnectionFactory.this.map.remove(resolvedAddress, item2);
                            }
                        }
                    });
                    result = item2.single;
                }
            }
            return result.shareContextOnSubscribe();
        });
    }

    private static final class Item<C> {
        @Nullable
        Single<C> single;
        private int subscriberCount = 1;

        private Item() {
        }

        @Nullable
        Single<C> addSubscriber(int maxSubscriberCount) {
            if (this.subscriberCount >= maxSubscriberCount) {
                return null;
            }
            ++this.subscriberCount;
            return this.single;
        }
    }
}

