/*
 * Decompiled with CFR 0.152.
 */
package io.dingodb.sdk.service.connector;

import io.dingodb.error.ErrorOuterClass;
import io.dingodb.sdk.common.DingoClientException;
import io.dingodb.sdk.common.Location;
import io.dingodb.sdk.common.utils.ErrorCodeUtils;
import io.dingodb.sdk.common.utils.NoBreakFunctions;
import io.dingodb.sdk.common.utils.Optional;
import io.dingodb.sdk.common.utils.StackTraces;
import io.dingodb.sdk.service.ChannelManager;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.stub.AbstractBlockingStub;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ServiceConnector<S extends AbstractBlockingStub<S>> {
    private static final Logger log = LoggerFactory.getLogger(ServiceConnector.class);
    /** Default retry budget (30 attempts). */
    public static final int RETRY_TIMES = 30;
    // Cache of one ResponseBuilder per response class, filled lazily by toResponse().
    // Never reassigned, so it is declared final.
    private static final Map<Class, ResponseBuilder> responseBuilders = new ConcurrentHashMap<>();
    /** Stub currently in use; holds {@code null} until a refresh installs one. */
    protected final AtomicReference<S> stubRef = new AtomicReference<>();
    /** Endpoints that {@code refresh(S)} walks through when it needs a new stub. */
    protected final Set<Location> locations = new CopyOnWriteArraySet<>();
    /** Retry budget supplied at construction time. */
    protected final int retryTimes;
    // Try-lock flag: only the thread that flips it to true performs a refresh.
    private final AtomicBoolean refresh = new AtomicBoolean();
    // Written by close() and read by exec(), which may run on different threads;
    // volatile makes the write visible to readers.
    private volatile boolean closed = false;

    /**
     * Creates a connector from a comma-separated {@code host:port} list, using the
     * default retry budget {@link #RETRY_TIMES}.
     *
     * @param locations comma-separated {@code host:port} entries
     * @deprecated the {@code (String, int)} overload is the non-deprecated equivalent.
     */
    @Deprecated
    public ServiceConnector(String locations) {
        this(locations, RETRY_TIMES);
    }

    /**
     * Creates a connector for the given endpoints, using the default retry budget
     * {@link #RETRY_TIMES}.
     *
     * @param locations endpoints to copy into this connector; must not be {@code null}
     * @deprecated the {@code (Set, int)} overload is the non-deprecated equivalent.
     */
    @Deprecated
    public ServiceConnector(Set<Location> locations) {
        this(locations, RETRY_TIMES);
    }

    /**
     * Creates a connector from a comma-separated {@code host:port} list.
     *
     * <p>A {@code null} or blank string yields a connector with no endpoints. Blank
     * entries (for example from a trailing comma) are skipped, and whitespace around
     * entries, hosts and ports is ignored.
     *
     * @param locations  comma-separated {@code host:port} entries; may be {@code null}
     * @param retryTimes retry budget stored in {@link #retryTimes}
     * @throws IllegalArgumentException if an entry has no {@code :port} part or the port
     *                                  is not a valid integer
     */
    public ServiceConnector(String locations, int retryTimes) {
        this(parseLocations(locations), retryTimes);
    }

    /** Parses a comma-separated {@code host:port} list; {@code null} or blank input yields an empty set. */
    private static Set<Location> parseLocations(String locations) {
        if (locations == null) {
            return Collections.emptySet();
        }
        return Arrays.stream(locations.split(","))
            .map(String::trim)
            // "a:1," or "" would otherwise produce an entry without a port.
            .filter(entry -> !entry.isEmpty())
            .map(ServiceConnector::parseLocation)
            .collect(Collectors.toSet());
    }

    /** Converts a single non-blank {@code host:port} entry into a {@link Location}. */
    private static Location parseLocation(String entry) {
        String[] parts = entry.split(":");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Invalid location [" + entry + "], expected host:port.");
        }
        return new Location(parts[0].trim(), Integer.parseInt(parts[1].trim()));
    }

    /**
     * Creates a connector for the given endpoints with an explicit retry budget.
     *
     * @param locations  endpoints copied into this connector's own set; must not be {@code null}
     * @param retryTimes retry budget stored in {@link #retryTimes}
     */
    public ServiceConnector(Set<Location> locations, int retryTimes) {
        this.retryTimes = retryTimes;
        this.locations.addAll(locations);
    }

    /**
     * Marks this connector as closed. Subsequent calls to the core {@code exec} method
     * fail with a {@code DingoClientException}. No other state is touched here.
     */
    public void close() {
        closed = true;
    }

    /**
     * Returns the stub currently held by {@link #stubRef}.
     *
     * @return the current stub, or {@code null} if none is installed
     */
    public S getStub() {
        return stubRef.get();
    }

    /**
     * Wraps a raw service result in a {@link Response}, extracting its error by
     * reflectively calling the result class's declared {@code getError} method.
     * The reflective lookup is cached per result class.
     *
     * @param res the raw result; must not be {@code null}
     * @return the result paired with its error
     */
    <R> Response<R> toResponse(Object res) {
        ResponseBuilder builder = responseBuilders.computeIfAbsent(
            res.getClass(),
            NoBreakFunctions.wrap(cls -> new ResponseBuilder(cls.getDeclaredMethod("getError", new Class[0])))
        );
        return builder.build(res);
    }

    /**
     * Unwraps the payload of a {@link Response}.
     *
     * @param response the wrapped response; the core {@code exec} method may hand back
     *                 {@code null} here (its IGNORE branch returns {@code null})
     * @return the result of {@code Optional.mapOrNull(response, Response::getResponse)};
     *         presumably {@code null} for a {@code null} wrapper — confirm against that helper
     */
    protected <R> R cleanResponse(Response<R> response) {
        R payload = (R)Optional.mapOrNull(response, Response::getResponse);
        return payload;
    }

    /**
     * Executes {@code function} against the current stub with the default error strategy
     * and this connector's configured retry budget.
     *
     * <p>The retry budget is {@link #retryTimes}, i.e. the value given to the constructor
     * (30 for the deprecated constructors), rather than a hard-coded 30 that ignored it.
     *
     * <p>{@code StackTraces.stack(2)} is evaluated directly in this method because its
     * result is used as the task name; presumably it is sensitive to call depth.
     *
     * @param function the call to run against the stub
     * @return the unwrapped result, as produced by {@code cleanResponse}
     */
    public <R> R exec(Function<S, R> function) {
        Response<R> response = this.exec(StackTraces.stack(2), function, this.retryTimes, ErrorCodeUtils.errorToStrategyFunc, this::toResponse);
        return this.cleanResponse(response);
    }

    /**
     * Executes {@code function} against the current stub with the default error strategy
     * and an explicit retry budget.
     *
     * @param function   the call to run against the stub
     * @param retryTimes maximum number of attempts for this call
     * @return the unwrapped result, as produced by {@code cleanResponse}
     */
    public <R> R exec(Function<S, R> function, int retryTimes) {
        // Evaluated in this frame on purpose: the value is used as the task name.
        String caller = StackTraces.stack(2);
        Response<R> response = this.exec(caller, function, retryTimes, ErrorCodeUtils.errorToStrategyFunc, this::toResponse);
        return this.cleanResponse(response);
    }

    /**
     * Executes {@code function} against the current stub with a caller-supplied error
     * strategy and this connector's configured retry budget.
     *
     * <p>The retry budget is {@link #retryTimes}, i.e. the value given to the constructor
     * (30 for the deprecated constructors), rather than a hard-coded 30 that ignored it.
     *
     * <p>{@code StackTraces.stack(2)} is evaluated directly in this method because its
     * result is used as the task name; presumably it is sensitive to call depth.
     *
     * @param function   the call to run against the stub
     * @param errChecker maps a non-zero error code to the strategy to apply
     * @return the unwrapped result, as produced by {@code cleanResponse}
     */
    public <R> R exec(Function<S, R> function, Function<Integer, ErrorCodeUtils.Strategy> errChecker) {
        Response<R> response = this.exec(StackTraces.stack(2), function, this.retryTimes, errChecker, this::toResponse);
        return this.cleanResponse(response);
    }

    /**
     * Executes {@code function} against the current stub with a caller-supplied error
     * strategy and an explicit retry budget.
     *
     * @param function   the call to run against the stub
     * @param retryTimes maximum number of attempts for this call
     * @param errChecker maps a non-zero error code to the strategy to apply
     * @return the unwrapped result, as produced by {@code cleanResponse}
     */
    public <R> R exec(Function<S, R> function, int retryTimes, Function<Integer, ErrorCodeUtils.Strategy> errChecker) {
        // Evaluated in this frame on purpose: the value is used as the task name.
        String caller = StackTraces.stack(2);
        Response<R> response = this.exec(caller, function, retryTimes, errChecker, this::toResponse);
        return this.cleanResponse(response);
    }

    /**
     * Executes {@code function} against the current stub and returns the wrapped
     * response, using a caller-supplied error strategy and response converter.
     *
     * @param function   the call to run against the stub
     * @param retryTimes maximum number of attempts for this call
     * @param errChecker maps a non-zero error code to the strategy to apply
     * @param toResponse converts the raw result into a {@link Response}
     * @return the wrapped response; may be {@code null} (the core loop returns
     *         {@code null} for the IGNORE strategy)
     */
    public <R> Response<R> exec(Function<S, R> function, int retryTimes, Function<Integer, ErrorCodeUtils.Strategy> errChecker, Function<R, Response<R>> toResponse) {
        // Evaluated in this frame on purpose: the value is used as the task name.
        String caller = StackTraces.stack(2);
        return this.exec(caller, function, retryTimes, errChecker, toResponse);
    }

    /**
     * Executes a named task against the current stub and returns the unwrapped result,
     * converting raw results with this connector's own {@code toResponse}.
     *
     * @param name       task name used in log and error messages
     * @param task       the call to run against the stub
     * @param retryTimes maximum number of attempts for this call
     * @param errChecker maps a non-zero error code to the strategy to apply
     * @return the unwrapped result, as produced by {@code cleanResponse}
     */
    public <R> R exec(String name, Function<S, R> task, int retryTimes, Function<Integer, ErrorCodeUtils.Strategy> errChecker) {
        Response<R> response = this.exec(name, task, retryTimes, errChecker, this::toResponse);
        return this.cleanResponse(response);
    }

    /**
     * Core retry loop: runs {@code task} against the current stub until it succeeds, a
     * terminal strategy is reached, or the retry budget is used up.
     *
     * <p>Each attempt proceeds as follows:
     * <ol>
     *   <li>If no stub is installed, wait one second, call {@code refresh} and start the
     *       next attempt.</li>
     *   <li>Run the task, convert the result with {@code toResponse}, and read its error
     *       code. Code {@code 0} returns the response immediately.</li>
     *   <li>Otherwise the code is mapped by {@code errChecker}:
     *     <ul>
     *       <li>{@code RETRY}: wait one second, refresh, try again. The first time error
     *           code 10110 is seen, the remaining budget is reset to 3600 attempts.</li>
     *       <li>{@code FAILED}: throw {@code RequestErrorException}.</li>
     *       <li>{@code REFRESH}: wait one second, refresh, then throw
     *           {@code InvalidRouteTableException}.</li>
     *       <li>{@code IGNORE}: return {@code null}.</li>
     *     </ul>
     *   </li>
     * </ol>
     * Any other exception raised during an attempt is tallied, followed by a one-second
     * wait and a refresh, and the loop continues. This includes the
     * {@code IllegalStateException} raised for an unrecognized strategy.
     *
     * @param name       task name used in log and error messages
     * @param task       the call to run against the stub
     * @param retryTimes maximum number of attempts (may be raised internally, see above)
     * @param errChecker maps a non-zero error code to the strategy to apply
     * @param toResponse converts the raw result into a {@link Response}
     * @return the successful response, or {@code null} when the error was ignored
     * @throws DingoClientException if this connector has been closed, if the strategy is
     *                              {@code FAILED} or {@code REFRESH}, or if all attempts
     *                              are exhausted
     */
    public <R> Response<R> exec(String name, Function<S, R> task, int retryTimes, Function<Integer, ErrorCodeUtils.Strategy> errChecker, Function<R, Response<R>> toResponse) {
        if (this.closed) {
            throw new DingoClientException(-1, "The connector is closed, please check status.");
        }
        S stub = null;
        boolean connected = false;
        boolean specialRetry = true;
        // Distinct failure message -> number of times it occurred; used for the final summary.
        Map<String, Integer> errMsgs = new HashMap<>();
        while (retryTimes-- > 0) {
            try {
                stub = this.getStub();
                if (stub == null) {
                    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
                    this.refresh(stub);
                    continue;
                }
                connected = true;
                Response<R> response = toResponse.apply(task.apply(stub));
                ErrorOuterClass.Error error = response.getError();
                int errCode = error.getErrcodeValue();
                if (errCode == 0) {
                    return response;
                }
                String authority = Optional.mapOrGet(stub.getChannel(), Channel::authority, () -> "");
                errMsgs.merge(authority + ">>" + error.getErrmsg(), 1, Integer::sum);
                // Evaluate the strategy once so the switch and the error message agree.
                ErrorCodeUtils.Strategy strategy = errChecker.apply(errCode);
                switch (strategy) {
                    case RETRY: {
                        this.errorLog(name, authority, error, ErrorCodeUtils.Strategy.RETRY);
                        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
                        // One-time extension of the budget for error code 10110.
                        if (errCode == 10110 && specialRetry) {
                            retryTimes = 3600;
                            specialRetry = false;
                        }
                        this.refresh(stub);
                        continue;
                    }
                    case FAILED: {
                        this.errorLog(name, authority, error, ErrorCodeUtils.Strategy.FAILED);
                        throw new DingoClientException.RequestErrorException(errCode, error.getErrmsg());
                    }
                    case REFRESH: {
                        this.errorLog(name, authority, error, ErrorCodeUtils.Strategy.REFRESH);
                        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
                        this.refresh(stub);
                        throw new DingoClientException.InvalidRouteTableException(error.getErrmsg());
                    }
                    case IGNORE: {
                        this.errorLog(name, authority, error, ErrorCodeUtils.Strategy.IGNORE);
                        return null;
                    }
                    default: {
                        throw new IllegalStateException("Unexpected value: " + strategy);
                    }
                }
            }
            catch (Exception e) {
                // Terminal outcomes chosen by the strategy above must reach the caller untouched.
                if (e instanceof DingoClientException.RequestErrorException || e instanceof DingoClientException.InvalidRouteTableException) {
                    throw e;
                }
                // Exceptions without a message (e.g. many NullPointerExceptions) used to be
                // tallied under a null key and showed up as "[null]" in the final summary;
                // fall back to the exception class name so the cause stays identifiable.
                String reason = e.getMessage() != null ? e.getMessage() : e.getClass().getName();
                if (log.isDebugEnabled()) {
                    log.warn("Exec {} failed: {}.", name, reason);
                }
                errMsgs.merge(reason, 1, Integer::sum);
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
                this.refresh(stub);
            }
        }
        throw this.generateException(name, connected, errMsgs);
    }

    /**
     * Raises the exception that reports an exhausted retry budget. This method never
     * returns normally; the declared return type only lets callers write
     * {@code throw generateException(...)}.
     *
     * @param name      task name to include in the message
     * @param connected whether any attempt obtained a stub; when {@code false} the
     *                  message reports exhausted leader-transform attempts and
     *                  {@code errMsgs} is not used
     * @param errMsgs   failure message mapped to the number of times it occurred
     * @return never returns
     * @throws DingoClientException.ExhaustedRetryException always
     */
    private <R> RuntimeException generateException(String name, boolean connected, Map<String, Integer> errMsgs) {
        if (!connected) {
            throw new DingoClientException.ExhaustedRetryException("Exec " + name + " error, transform leader attempts exhausted.");
        }
        StringBuilder summary = new StringBuilder("task: ").append(name).append("==>>");
        for (Map.Entry<String, Integer> failure : errMsgs.entrySet()) {
            summary.append('[').append(failure.getValue()).append("] times [").append(failure.getKey()).append(']').append(", ");
        }
        throw new DingoClientException.ExhaustedRetryException("Exec attempts exhausted, failed to exec " + name + ", " + summary);
    }

    /**
     * Logs a failed call together with the strategy chosen for it. The message is
     * emitted at WARN level, but only when DEBUG logging is enabled.
     *
     * @param name     task name
     * @param remote   authority of the channel the call was sent on
     * @param error    error returned by the service
     * @param strategy strategy selected for this error
     */
    private void errorLog(String name, String remote, ErrorOuterClass.Error error, ErrorCodeUtils.Strategy strategy) {
        if (!log.isDebugEnabled()) {
            return;
        }
        log.warn("Exec {} failed, remote: [{}], code: [{}], message: {}, strategy: {}.", name, remote, error.getErrcode(), error.getErrmsg(), strategy);
    }

    /**
     * Replaces {@code stub} with a freshly created one, if {@code stub} is still the
     * stub in use and no other thread is already refreshing.
     *
     * <p>Only the thread that flips the {@code refresh} flag proceeds; others return
     * immediately. The current stub is cleared only if it is still {@code stub}, so a
     * stub installed by a concurrent refresh is not discarded. With no configured
     * locations, {@code transformToLeaderChannel(null)} is asked for a channel directly;
     * otherwise each location is tried in turn until one yields a stub.
     *
     * <p>Failures are not propagated: the stub is left unset and callers are expected to
     * retry. The failure is logged (WARN level, only when DEBUG is enabled) together with
     * the exception so the cause is not lost.
     *
     * @param stub the stub the caller observed as current; may be {@code null}
     */
    public void refresh(S stub) {
        if (!this.refresh.compareAndSet(false, true)) {
            return;
        }
        try {
            if (!this.stubRef.compareAndSet(stub, null)) {
                return;
            }
            if (this.locations.isEmpty()) {
                Optional.ofNullable(this.transformToLeaderChannel(null)).map(this::newStub).ifPresent(this.stubRef::set);
                return;
            }
            for (Location location : this.locations) {
                boolean stubInstalled = Optional.of(location).map(this::newChannel).map(NoBreakFunctions.wrap(this::transformToLeaderChannel)).map(this::newStub).ifPresent(this.stubRef::set).isPresent();
                if (stubInstalled) {
                    return;
                }
            }
        }
        catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.warn("Get connection stub failed, will retry...", e);
            }
        }
        finally {
            // Always release the flag, otherwise no later refresh could ever run.
            this.refresh.set(false);
        }
    }

    /**
     * Obtains a channel to {@code location} from {@code ChannelManager}.
     *
     * @param location the endpoint to connect to
     * @return the channel, or {@code null} if obtaining it threw (the failure is logged)
     */
    protected ManagedChannel newChannel(Location location) {
        ManagedChannel channel;
        try {
            channel = ChannelManager.getChannel(location);
        }
        catch (Exception e) {
            log.warn("Connect {} error", location, e);
            channel = null;
        }
        return channel;
    }

    /**
     * Convenience overload that builds a {@link Location} from {@code host} and
     * {@code port} and delegates to {@code newChannel(Location)}.
     *
     * @param host endpoint host
     * @param port endpoint port
     * @return the channel, or {@code null} if it could not be obtained
     */
    protected ManagedChannel newChannel(String host, int port) {
        Location target = new Location(host, port);
        return this.newChannel(target);
    }

    /**
     * Maps a channel to the channel a stub should be created on.
     *
     * <p>Called by {@code refresh}: with {@code null} when no locations are configured,
     * otherwise with the channel obtained for one configured location. Judging by the
     * name, implementations are expected to resolve the current leader — confirm against
     * the concrete subclasses.
     *
     * @param var1 channel to a configured location, or {@code null} when none is configured
     * @return the channel to create the stub on; {@code refresh} wraps the result in an
     *         optional before creating a stub, so presumably {@code null} means
     *         "not available" — confirm
     */
    protected abstract ManagedChannel transformToLeaderChannel(ManagedChannel var1);

    /**
     * Creates the service stub for the given channel.
     *
     * <p>Called by {@code refresh} with the channel returned by
     * {@code transformToLeaderChannel}; the result is stored as the current stub.
     *
     * @param var1 channel to bind the stub to
     * @return the new stub
     */
    protected abstract S newStub(ManagedChannel var1);

    private static class ResponseBuilder<R> {
        private final Method errorGetter;

        public Response<R> build(R response) {
            try {
                return new Response<R>((ErrorOuterClass.Error)this.errorGetter.invoke(response, new Object[0]), response);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public ResponseBuilder(Method errorGetter) {
            this.errorGetter = errorGetter;
        }
    }

    /**
     * Immutable pair of a raw service response and the error extracted from it.
     */
    public static class Response<R> {
        /** Error reported alongside the response. */
        public final ErrorOuterClass.Error error;
        /** The raw response object. */
        public final R response;

        /**
         * @param error    error reported alongside the response
         * @param response the raw response object
         */
        public Response(ErrorOuterClass.Error error, R response) {
            this.error = error;
            this.response = response;
        }

        /** @return the error reported alongside the response */
        public ErrorOuterClass.Error getError() {
            return error;
        }

        /** @return the raw response object */
        public R getResponse() {
            return response;
        }
    }
}

