/*
 * Decompiled with CFR 0.152.
 */
package io.dingodb.sdk.service.connector;

import io.dingodb.coordinator.Coordinator;
import io.dingodb.error.ErrorOuterClass;
import io.dingodb.sdk.common.DingoClientException;
import io.dingodb.sdk.common.Location;
import io.dingodb.sdk.common.utils.ErrorCodeUtils;
import io.dingodb.sdk.common.utils.Optional;
import io.dingodb.sdk.common.utils.Parameters;
import io.dingodb.sdk.service.connector.CoordinatorServiceConnector;
import io.dingodb.sdk.service.connector.ServiceConnector;
import io.dingodb.version.Version;
import io.dingodb.version.VersionServiceGrpc;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

public class VersionServiceConnector
extends ServiceConnector<VersionServiceGrpc.VersionServiceBlockingStub> {
    private final ScheduledExecutorService executors = Executors.newScheduledThreadPool(1);
    private final CoordinatorServiceConnector coordinatorServiceConnector;
    public final int leaseTtl;
    private final AtomicLong lease = new AtomicLong(-1L);

    public VersionServiceConnector(String locations, int leaseTtl) {
        super(Collections.emptySet());
        Parameters.check(Integer.valueOf(leaseTtl), ttl -> ttl >= 1, "Lease ttl must great than 1.");
        this.coordinatorServiceConnector = new CoordinatorServiceConnector(locations);
        this.leaseTtl = leaseTtl;
        this.initRenew();
    }

    public VersionServiceConnector(Set<Location> locations, int leaseTtl) {
        super(Collections.emptySet());
        Parameters.check(Integer.valueOf(leaseTtl), ttl -> ttl >= 1, "Lease ttl must great than 1.");
        this.coordinatorServiceConnector = new CoordinatorServiceConnector(locations);
        this.leaseTtl = leaseTtl;
        this.initRenew();
    }

    private void initRenew() {
        int delay = Math.max(Math.abs(this.leaseTtl * 1000) / 3, 1000);
        this.executors.scheduleWithFixedDelay(this::renewLease, this.leaseTtl, delay, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        super.close();
        this.executors.shutdown();
    }

    public long getLease() {
        return this.lease.get();
    }

    @Override
    public <R> ServiceConnector.Response<R> exec(Function<VersionServiceGrpc.VersionServiceBlockingStub, R> function, int retryTimes, Function<Integer, ErrorCodeUtils.Strategy> errChecker, Function<R, ServiceConnector.Response<R>> toResponse) {
        return super.exec(function, this.leaseTtl, (Integer __) -> ErrorCodeUtils.Strategy.RETRY, toResponse);
    }

    @Override
    protected ManagedChannel transformToLeaderChannel(ManagedChannel channel) {
        return Optional.ofNullable(this.coordinatorServiceConnector.getCoordinatorMap()).map(Coordinator.GetCoordinatorMapResponse::getKvLeaderLocation).filter(__ -> !__.getHost().isEmpty()).map(__ -> this.newChannel(__.getHost(), __.getPort())).orNull();
    }

    @Override
    protected VersionServiceGrpc.VersionServiceBlockingStub newStub(ManagedChannel channel) {
        VersionServiceGrpc.VersionServiceBlockingStub stub = VersionServiceGrpc.newBlockingStub((Channel)channel);
        if (stub.leaseQuery(Version.LeaseQueryRequest.newBuilder().setID(this.lease.get()).build()).getError().getErrcode() == ErrorOuterClass.Errno.OK) {
            return stub;
        }
        Version.LeaseGrantResponse response = stub.leaseGrant(Version.LeaseGrantRequest.newBuilder().setID(System.currentTimeMillis()).setTTL(this.leaseTtl).build());
        if (response.getError().getErrcode().getNumber() == 0) {
            this.lease.compareAndSet(-1L, response.getID());
            return stub;
        }
        throw new DingoClientException(response.getError().getErrcode().getNumber(), response.getError().getErrmsg());
    }

    private void renewLease() {
        this.exec("renew-lease", stub -> stub.leaseRenew(Version.LeaseRenewRequest.newBuilder().setID(this.getLease()).build()), Integer.MAX_VALUE, __ -> ErrorCodeUtils.Strategy.RETRY, this::toResponse);
    }

    public CoordinatorServiceConnector getCoordinatorServiceConnector() {
        return this.coordinatorServiceConnector;
    }
}

