/*
 * Decompiled with CFR 0.152.
 */
package io.dingodb.sdk.service;

import io.dingodb.sdk.common.DingoClientException;
import io.dingodb.sdk.common.utils.Future;
import io.dingodb.sdk.service.MetaService;
import io.dingodb.sdk.service.Services;
import io.dingodb.sdk.service.WatchService;
import io.dingodb.sdk.service.entity.common.KeyValue;
import io.dingodb.sdk.service.entity.meta.TsoOpType;
import io.dingodb.sdk.service.entity.meta.TsoRequest;
import io.dingodb.sdk.service.entity.meta.TsoTimestamp;
import io.dingodb.sdk.service.entity.version.DeleteRangeRequest;
import io.dingodb.sdk.service.entity.version.Event;
import io.dingodb.sdk.service.entity.version.EventFilterType;
import io.dingodb.sdk.service.entity.version.EventType;
import io.dingodb.sdk.service.entity.version.Kv;
import io.dingodb.sdk.service.entity.version.LeaseGrantRequest;
import io.dingodb.sdk.service.entity.version.LeaseRenewRequest;
import io.dingodb.sdk.service.entity.version.PutRequest;
import io.dingodb.sdk.service.entity.version.PutResponse;
import io.dingodb.sdk.service.entity.version.RangeRequest;
import io.dingodb.sdk.service.entity.version.RangeResponse;
import io.dingodb.sdk.service.entity.version.WatchRequest;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.function.Function;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Lease-backed lock client built on top of the key/value and watch calls inherited from
 * {@link WatchService}.
 *
 * <p>On construction a lease is granted asynchronously; afterwards the lease is renewed
 * periodically and a second periodic task destroys locally held locks when the lease has
 * not been refreshed for longer than {@link #leaseTtl} seconds. Individual locks are created
 * through {@link #newLock()} / {@link #newLock(String)}; each lock is a key under
 * {@code resource|0|<lease>|0|} and the holder is the key with the smallest create revision
 * inside the {@code [resource|0|, resource|1|)} range.
 */
public class LockService
extends WatchService {
    private static final Logger log = LoggerFactory.getLogger(LockService.class);
    /** Runs the initial lease grant plus the periodic renew / expire-check tasks. */
    private final ScheduledExecutorService executors = Executors.newScheduledThreadPool(4);
    private final MetaService tsoService;
    // Written by the executor thread inside grantLease(), read by cancel() on caller threads.
    private volatile ScheduledFuture<?> renewFuture;
    private volatile ScheduledFuture<?> expireFuture;
    /** Lease time-to-live, in seconds (see the millisecond conversions in this class). */
    public final long leaseTtl;
    /** Granted lease id, or {@code -1} while no lease has been granted yet. */
    private volatile long lease = -1L;
    /** Renew period in milliseconds: one third of the TTL, but never below one second. */
    private final int delay;
    public final String resource;
    /** Wall-clock millis of the last successful renew or blocking lock acquisition. */
    public volatile long ttlRefreshTime;
    public final String resourcePrefixBegin;
    public final String resourcePrefixEnd;
    // Both are published by grantLease() on the executor thread and spin-read by callers,
    // so they must be volatile for the spin loops to be guaranteed to observe the write.
    private volatile String resourcePrefixKeyBegin;
    private volatile String resourcePrefixKeyEnd;
    /**
     * Locks acquired through {@link Lock#lock()}. Mutated by locking threads and iterated by
     * the expire-check task, hence a copy-on-write list.
     */
    public List<Lock> ownerLockList = new CopyOnWriteArrayList<>();
    private static final ThreadFactory threadFactory = new ThreadFactory(){
        private final AtomicInteger index = new AtomicInteger(0);

        @Override
        public Thread newThread(@NonNull Runnable runnable) {
            String threadName = String.format("%s-thread-%d", "Lock_watch", this.index.incrementAndGet());
            ThreadGroup group = Thread.currentThread().getThreadGroup();
            Thread thread = new Thread(group, runnable, threadName);
            thread.setDaemon(true);
            thread.setPriority(Thread.NORM_PRIORITY);
            return thread;
        }
    };
    /** Executor on which watch completion callbacks are run. */
    public static final ThreadPoolExecutor LOCK_FUTURE_POOL = new ThreadPoolExecutor(5, 100, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory, new ThreadPoolExecutor.AbortPolicy());

    /** Creates a service for a random resource name with a 30 second lease. */
    public LockService(String servers) {
        this(servers, 30);
    }

    /** Creates a service for {@code resource} with a 30 second lease. */
    public LockService(String resource, String servers) {
        this(resource, servers, 30);
    }

    /** Creates a service for a random resource name with the given lease TTL (seconds). */
    public LockService(String servers, int leaseTtl) {
        this(UUID.randomUUID().toString(), servers, leaseTtl);
    }

    /**
     * Creates a service and starts granting its lease in the background.
     *
     * @param resource name of the locked resource; used as key prefix
     * @param servers  server locations handed to the parent {@link WatchService}
     * @param leaseTtl lease time-to-live in seconds
     */
    public LockService(String resource, String servers, int leaseTtl) {
        super(servers);
        this.tsoService = Services.tsoService(this.locations);
        this.resource = resource;
        this.leaseTtl = leaseTtl;
        this.resourcePrefixBegin = resource + "|0|";
        this.resourcePrefixEnd = resource + "|1|";
        this.delay = Math.max(Math.abs(leaseTtl * 1000) / 3, 1000);
        this.executors.execute(this::grantLease);
    }

    /**
     * Grants the lease (retrying every second until the first grant succeeds), publishes the
     * lease-scoped key prefixes and, on the first call, schedules the renew and expire tasks.
     * When a lease id already exists it is re-granted under the same id and a failure is only
     * logged.
     */
    private synchronized void grantLease() {
        long grantLeaseStart = System.currentTimeMillis();
        do {
            try {
                long ts = this.lease;
                if (ts == -1L) {
                    // First grant: derive the lease id from a freshly generated TSO timestamp.
                    TsoTimestamp tso = this.tsoService.tsoService((TsoRequest)((TsoRequest.TsoRequestBuilder)((TsoRequest.TsoRequestBuilder)TsoRequest.builder().count(1L)).opType(TsoOpType.OP_GEN_TSO)).build()).getStartTimestamp();
                    ts = (tso.getPhysical() << 18) + tso.getLogical();
                }
                this.lease = this.kvService.leaseGrant((LeaseGrantRequest)((LeaseGrantRequest.LeaseGrantRequestBuilder)((LeaseGrantRequest.LeaseGrantRequestBuilder)LeaseGrantRequest.builder().iD(ts)).tTL(this.leaseTtl)).build()).getID();
            }
            catch (Exception e) {
                if (this.lease == -1L) {
                    log.error("Grant lease failed, will retry...", e);
                    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
                    continue;
                }
                log.error("Grant lease again failed.", e);
            }
        } while (this.lease == -1L);
        log.info("grantLease done, lease:{}, resource:{}, cost:{}", this.lease, this.resource, System.currentTimeMillis() - grantLeaseStart);
        this.resourcePrefixKeyBegin = this.resourcePrefixBegin + this.lease() + "|0|";
        this.resourcePrefixKeyEnd = this.resourcePrefixBegin + this.lease() + "|1|";
        if (this.renewFuture == null) {
            this.renewFuture = this.executors.scheduleWithFixedDelay(this::renewLease, this.delay, this.delay, TimeUnit.MILLISECONDS);
            this.expireFuture = this.executors.scheduleWithFixedDelay(this::expireCheck, this.delay, 1000L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Periodic task: when the lease has not been refreshed for more than {@link #leaseTtl}
     * seconds, destroys every tracked lock that is still held and drops it from
     * {@link #ownerLockList}.
     *
     * <p>No exception may escape this method: a scheduled fixed-delay task that throws is
     * never run again by the executor.
     */
    private void expireCheck() {
        try {
            long refreshedAt = this.ttlRefreshTime;
            if (refreshedAt <= 0L) {
                return;
            }
            long sub = System.currentTimeMillis() - refreshedAt;
            if (sub / 1000L <= this.leaseTtl || this.ownerLockList.isEmpty()) {
                return;
            }
            log.info("expire lease ttl. resource:" + this.resource);
            // Work on a snapshot so that locks registered while we iterate are neither
            // skipped-and-dropped nor able to break the iteration.
            List<Lock> expired = new ArrayList<>(this.ownerLockList);
            for (Lock lock : expired) {
                try {
                    if (lock.locked()) {
                        lock.destroy();
                    }
                }
                catch (Exception e) {
                    log.error("Destroy expired lock {} error, resource:{}", lock.resourceKey, this.resource, e);
                }
            }
            this.ownerLockList.removeAll(expired);
        }
        catch (Exception e) {
            log.error("Expire check error, resource:{}", this.resource, e);
        }
    }

    /**
     * Periodic task: renews the lease and records the refresh time; on failure the lease is
     * granted again.
     */
    private void renewLease() {
        if (this.lease == -1L) {
            return;
        }
        long start = System.currentTimeMillis();
        try {
            this.kvService.leaseRenew((LeaseRenewRequest)((LeaseRenewRequest.LeaseRenewRequestBuilder)LeaseRenewRequest.builder().iD(this.lease())).build());
            this.ttlRefreshTime = System.currentTimeMillis();
            long sub = System.currentTimeMillis() - start;
            if (sub / 1000L > this.leaseTtl) {
                log.error("renew rpc cost:{}, resource:{}", sub, this.resource);
            }
        }
        catch (Exception e) {
            log.error("Renew lease {} error, cost:{}, resource:{}, grant again.", this.lease, System.currentTimeMillis() - start, this.resource, e);
            this.grantLease();
        }
    }

    /**
     * Returns the lease id, blocking (polling once per second) until a lease has been granted.
     */
    public long lease() {
        while (this.lease == -1L) {
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
        }
        return this.lease;
    }

    /** Returns the lease-scoped begin key prefix, blocking until it has been published. */
    public String getResourcePrefixKeyBegin() {
        String prefix;
        while ((prefix = this.resourcePrefixKeyBegin) == null) {
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
        }
        return prefix;
    }

    /** Returns the lease-scoped end key prefix, blocking until it has been published. */
    public String getResourcePrefixKeyEnd() {
        String prefix;
        while ((prefix = this.resourcePrefixKeyEnd) == null) {
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
        }
        return prefix;
    }

    /** Lists every lock key currently stored for this resource. */
    public List<Kv> listLock() {
        return this.kvService.kvRange(this.rangeRequest()).getKvs();
    }

    /**
     * Returns the lock entry with the smallest create revision for this resource, or
     * {@code null} when there is none.
     */
    public Kv currentLock() {
        return this.listLock().stream().filter(Objects::nonNull).min(Comparator.comparingLong(Kv::getCreateRevision)).orElse(null);
    }

    /**
     * Best-effort removal of every lock key written under this service's lease. Failures are
     * logged and never propagated. Does nothing when no lease has been granted yet.
     */
    public void close() {
        if (this.resourcePrefixKeyBegin == null || this.resourcePrefixKeyEnd == null) {
            // Nothing can have been written under a lease that does not exist yet.
            log.debug("Close skipped, lease not granted yet, resource:{}", this.resource);
            return;
        }
        try {
            this.kvService.kvDeleteRange(this.deleteAllRangeRequest());
        }
        catch (Exception e) {
            log.warn("Delete lock keys on close error, resource:{}", this.resource, e);
        }
    }

    /** Cancels the periodic renew and expire-check tasks if they have been scheduled. */
    public void cancel() {
        try {
            ScheduledFuture<?> renew = this.renewFuture;
            if (renew != null && !renew.isCancelled()) {
                renew.cancel(true);
            }
            ScheduledFuture<?> expire = this.expireFuture;
            if (expire != null && !expire.isCancelled()) {
                expire.cancel(true);
            }
        }
        catch (Exception e) {
            log.error("cancel lock service error", e);
        }
    }

    /**
     * Writes {@code key}/{@code value} under the current lease.
     *
     * @return the written entry; its create revision is taken from the previous entry when
     *         one existed, otherwise from the response header revision
     */
    public Kv put(long ts, String key, String value) {
        PutRequest request = this.putRequest(key, value);
        PutResponse putResponse = this.kvService.kvPut(ts, request);
        long modRevision = putResponse.getHeader().getRevision();
        long createRevision = putResponse.getPrevKv() != null
            ? putResponse.getPrevKv().getCreateRevision()
            : modRevision;
        return ((Kv.KvBuilder)((Kv.KvBuilder)((Kv.KvBuilder)Kv.builder().kv(request.getKeyValue())).createRevision(createRevision)).modRevision(modRevision)).build();
    }

    /** Deletes the single entry stored under {@code key}. */
    public void delete(long ts, String key) {
        this.kvService.kvDeleteRange(ts, this.deleteRangeRequest(key));
    }

    /** Creates a lock with an empty value; blocks until the lease has been granted. */
    public Lock newLock() {
        log.debug("Create new lock with empty value, lease [{}].", this.lease());
        return new Lock("");
    }

    /** Creates a lock carrying {@code value}; blocks until the lease has been granted. */
    public Lock newLock(String value) {
        log.debug("Create new lock with [{}], lease [{}].", value, this.lease());
        return new Lock(value);
    }

    /** Creates a lock with an empty value and a callback invoked when the lock is destroyed. */
    @Deprecated
    public Lock newLock(Consumer<Lock> onReset) {
        log.debug("Create new lock with empty value, lease [{}].", this.lease());
        return new Lock(onReset);
    }

    /**
     * Strips the {@link CompletionException} wrappers that {@link CompletableFuture} puts
     * around an exception thrown by an asynchronous supplier, so callers can test the real
     * failure type.
     */
    private static Throwable unwrapCompletion(Throwable error) {
        Throwable cause = error;
        while (cause instanceof CompletionException && cause.getCause() != null) {
            cause = cause.getCause();
        }
        return cause;
    }

    /**
     * Watches {@code kv} asynchronously and runs {@code task} once a DELETE or NOT_EXISTS
     * event is observed. The watch is re-armed after any other outcome, except when it failed
     * with a {@link DingoClientException}, which ends the watch.
     */
    public void watchLock(Kv kv, Runnable task) {
        CompletableFuture.supplyAsync(() -> this.kvService.watch(this.watchRequest(kv.getKv().getKey(), kv.getModRevision()))).whenCompleteAsync((r, e) -> {
            if (e != null) {
                // The supplier's exception arrives wrapped in a CompletionException, so the
                // type check has to be made against the unwrapped cause.
                if (!(unwrapCompletion(e) instanceof DingoClientException)) {
                    this.watchLock(kv, task);
                    return;
                }
                log.error("Watch locked error, or watch retry time great than lease ttl.", e);
                return;
            }
            if (r.getEvents() == null) {
                this.watchLock(kv, task);
            } else if (r.getEvents().stream().map(Event::getType).anyMatch(type -> type == EventType.DELETE || type == EventType.NOT_EXISTS)) {
                task.run();
            } else {
                this.watchLock(kv, task);
            }
        }, (Executor)LOCK_FUTURE_POOL);
    }

    /**
     * Watches every operation on {@code kv} and reports each watch result to {@code function}
     * as {@code "normal"}, {@code "transferLeader"} (no events in the response) or
     * {@code "keyNone"} (DELETE / NOT_EXISTS seen), re-arming the watch afterwards. A failure
     * that is a {@link DingoClientException} ends the watch; any other failure resets the
     * version service and retries.
     */
    @Override
    public void watchAllOpEvent(Kv kv, Function<String, String> function) {
        CompletableFuture.supplyAsync(() -> this.kvService.watch(this.watchAllOpRequest(kv.getKv().getKey(), kv.getModRevision()))).whenCompleteAsync((r, e) -> {
            if (e != null) {
                log.error("Watch locked error, or watch retry time great than lease ttl.", e);
                // See watchLock: the failure type is only visible after unwrapping.
                if (!(unwrapCompletion(e) instanceof DingoClientException)) {
                    this.resetVerService();
                    this.watchAllOpEvent(kv, function);
                }
                return;
            }
            String typeStr = "normal";
            if (r.getEvents() == null) {
                typeStr = "transferLeader";
            } else if (r.getEvents().stream().map(Event::getType).anyMatch(type -> type == EventType.DELETE || type == EventType.NOT_EXISTS)) {
                typeStr = "keyNone";
            }
            function.apply(typeStr);
            this.watchAllOpEvent(kv, function);
        }, (Executor)LOCK_FUTURE_POOL);
    }

    /** Builds a put bound to the current lease; a null or empty value sets ignoreValue. */
    private PutRequest putRequest(String resourceKey, String value) {
        return ((PutRequest.PutRequestBuilder)((PutRequest.PutRequestBuilder)((PutRequest.PutRequestBuilder)((PutRequest.PutRequestBuilder)PutRequest.builder().lease(this.lease())).ignoreValue(value == null || value.isEmpty())).keyValue((KeyValue)((KeyValue.KeyValueBuilder)((KeyValue.KeyValueBuilder)KeyValue.builder().key(resourceKey.getBytes(StandardCharsets.UTF_8))).value(value == null ? null : value.getBytes(StandardCharsets.UTF_8))).build())).needPrevKv(true)).build();
    }

    /** Range over every lock key of this resource, regardless of lease. */
    private RangeRequest rangeRequest() {
        return ((RangeRequest.RangeRequestBuilder)((RangeRequest.RangeRequestBuilder)RangeRequest.builder().key(this.resourcePrefixBegin.getBytes(StandardCharsets.UTF_8))).rangeEnd(this.resourcePrefixEnd.getBytes(StandardCharsets.UTF_8))).build();
    }

    /** Delete request for the single key {@code resourceKey}. */
    private DeleteRangeRequest deleteRangeRequest(String resourceKey) {
        return ((DeleteRangeRequest.DeleteRangeRequestBuilder)DeleteRangeRequest.builder().key(resourceKey.getBytes(StandardCharsets.UTF_8))).build();
    }

    /** Delete request covering every key written under this service's lease prefix. */
    private DeleteRangeRequest deleteAllRangeRequest() {
        return ((DeleteRangeRequest.DeleteRangeRequestBuilder)((DeleteRangeRequest.DeleteRangeRequestBuilder)DeleteRangeRequest.builder().key(this.resourcePrefixKeyBegin.getBytes(StandardCharsets.UTF_8))).rangeEnd(this.resourcePrefixKeyEnd.getBytes(StandardCharsets.UTF_8))).build();
    }

    private WatchRequest watchRequest(String resourceKey, long revision) {
        return this.watchRequest(resourceKey.getBytes(StandardCharsets.UTF_8), revision);
    }

    /** One-time watch on {@code resourceKey} from {@code revision} with the NOPUT filter. */
    private WatchRequest watchRequest(byte[] resourceKey, long revision) {
        return ((WatchRequest.WatchRequestBuilder)WatchRequest.builder().requestUnion((WatchRequest.RequestUnionNest)((Object)((WatchRequest.RequestUnionNest.OneTimeRequest.OneTimeRequestBuilder)((WatchRequest.RequestUnionNest.OneTimeRequest.OneTimeRequestBuilder)((WatchRequest.RequestUnionNest.OneTimeRequest.OneTimeRequestBuilder)((WatchRequest.RequestUnionNest.OneTimeRequest.OneTimeRequestBuilder)WatchRequest.RequestUnionNest.OneTimeRequest.builder().key(resourceKey)).needPrevKv(true)).startRevision(revision)).filters(Collections.singletonList(EventFilterType.NOPUT))).build()))).build();
    }

    /** One-time watch on {@code resourceKey} from {@code revision} without any event filter. */
    private WatchRequest watchAllOpRequest(byte[] resourceKey, long revision) {
        return ((WatchRequest.WatchRequestBuilder)WatchRequest.builder().requestUnion((WatchRequest.RequestUnionNest)((Object)((WatchRequest.RequestUnionNest.OneTimeRequest.OneTimeRequestBuilder)((WatchRequest.RequestUnionNest.OneTimeRequest.OneTimeRequestBuilder)((WatchRequest.RequestUnionNest.OneTimeRequest.OneTimeRequestBuilder)WatchRequest.RequestUnionNest.OneTimeRequest.builder().key(resourceKey)).needPrevKv(true)).startRevision(revision)).build()))).build();
    }

    /**
     * A single lock on the enclosing service's resource. The lock is represented by one key
     * ({@link #resourceKey}); it is held when that key has the smallest create revision among
     * all lock keys of the resource. {@link #locked} counts nested acquisitions.
     */
    public class Lock
    implements java.util.concurrent.locks.Lock {
        public final String lockId = UUID.randomUUID().toString();
        public final String resourceKey = LockService.this.getResourcePrefixKeyBegin() + this.lockId;
        public final String resourceValue;
        private final Consumer<Lock> onReset;
        /** Completed once the lock has been destroyed; exposed via {@link #watchDestroy()}. */
        private final CompletableFuture<Void> destroyFuture = new CompletableFuture<>();
        private int locked = 0;
        private long revision;

        @Deprecated
        public Lock(Consumer<Lock> onReset) {
            this.onReset = onReset;
            this.resourceValue = "";
        }

        public Lock(String value) {
            this.onReset = null;
            this.resourceValue = value;
        }

        /**
         * Marks a held lock as destroyed and deletes its key asynchronously, retrying the
         * whole destroy when the delete fails. No-op when the lock is not held.
         */
        private synchronized void destroy() {
            log.info("destroy start, locked:{}, destroy done:{}", this.locked, this.destroyFuture.isDone());
            if (this.locked == 0) {
                log.info("destroy locked return, resource:{}", LockService.this.resource);
                return;
            }
            if (!this.destroyFuture.isDone()) {
                log.info("destroy complete, resource:{}", LockService.this.resource);
                this.destroyFuture.complete(null);
            }
            CompletableFuture.runAsync(() -> LockService.this.kvService.kvDeleteRange(LockService.this.deleteRangeRequest(this.resourceKey))).whenComplete((r, e) -> {
                if (this.onReset != null) {
                    this.onReset.accept(this);
                }
                if (e != null) {
                    log.error("Delete {} error when reset.", this.resourceKey, e);
                    this.destroy();
                } else {
                    log.info("destroy kv delete range done, resource:{}", LockService.this.resource);
                }
            });
        }

        /**
         * Re-entrant acquisition check: when the lock is held and not destroyed, bumps the
         * hold count and returns {@code true}; otherwise returns {@code false}.
         */
        private boolean locked() {
            if (this.locked > 0) {
                if (this.destroyFuture.isDone()) {
                    return false;
                }
                ++this.locked;
                return true;
            }
            return false;
        }

        /** Returns a future that completes when this lock is destroyed. */
        public Future watchDestroy() {
            return new Future(this.destroyFuture);
        }

        /**
         * Checks whether {@code revision} is the smallest create revision in
         * {@code rangeResponse}; if so records it, and (unless already destroyed) takes the
         * lock and starts watching its key.
         *
         * @throws RuntimeException when the range response contains no entries
         */
        private boolean isLockRevision(long revision, RangeResponse rangeResponse) {
            if (rangeResponse.getKvs().isEmpty()) {
                throw new RuntimeException("Put " + this.resourceKey + " success, but range is empty.");
            }
            Kv current = rangeResponse.getKvs().stream().filter(Objects::nonNull).min(Comparator.comparingLong(Kv::getCreateRevision)).get();
            if (current.getCreateRevision() != revision) {
                return false;
            }
            this.revision = revision;
            if (log.isDebugEnabled()) {
                log.debug("Lock {} success use {} revision, current locks: {}.", this.resourceKey, revision, rangeResponse.getKvs());
            }
            if (!this.destroyFuture.isDone()) {
                ++this.locked;
                LockService.this.watchLock(current, this::destroy);
            }
            return true;
        }

        /** Create revision of our key: the previous entry's when one existed, else the header revision. */
        private long getCreateRevision(PutResponse response) {
            if (response.getPrevKv() == null) {
                return response.getHeader().getRevision();
            }
            return response.getPrevKv().getCreateRevision();
        }

        /**
         * Blocks until the lock is acquired. Writes this lock's key, then waits (by watching
         * the closest earlier key) until its create revision is the smallest one. Errors are
         * logged and the attempt is repeated.
         *
         * @throws RuntimeException when the lock was destroyed before it could be taken
         */
        @Override
        public synchronized void lock() {
            if (this.locked()) {
                if (!LockService.this.ownerLockList.contains(this)) {
                    LockService.this.ownerLockList.add(this);
                }
                return;
            }
            while (true) {
                long start = System.currentTimeMillis();
                try {
                    PutResponse response = LockService.this.kvService.kvPut(LockService.this.putRequest(this.resourceKey, this.resourceValue));
                    long revision = this.getCreateRevision(response);
                    RangeResponse rangeResponse = LockService.this.kvService.kvRange(LockService.this.rangeRequest());
                    if (this.isLockRevision(revision, rangeResponse)) {
                        break;
                    }
                    Kv previous = rangeResponse.getKvs().stream().filter(Objects::nonNull).filter(__ -> __.getCreateRevision() < revision).max(Comparator.comparingLong(Kv::getCreateRevision)).orElseThrow(() -> new RuntimeException("Put " + this.resourceKey + " success, but no previous."));
                    if (log.isDebugEnabled()) {
                        log.debug("Lock {} wait...", this.resourceKey);
                    }
                    try {
                        LockService.this.kvService.watch(LockService.this.watchRequest(previous.getKv().getKey(), previous.getCreateRevision()));
                        if (this.isLockRevision(revision, LockService.this.kvService.kvRange(LockService.this.rangeRequest()))) {
                            break;
                        }
                    }
                    catch (Exception watchError) {
                        // Not fatal: the outer loop re-runs the whole acquisition attempt.
                        log.warn("Lock {} wait on previous lock error, id: {}, will retry.", this.resourceKey, this.lockId, watchError);
                    }
                }
                catch (Exception e) {
                    log.error("Lock {} error, id: {}, cost:{}", this.resourceKey, this.lockId, System.currentTimeMillis() - start, e);
                }
            }
            if (this.destroyFuture.isDone()) {
                throw new RuntimeException("Destroyed!");
            }
            LockService.this.ttlRefreshTime = System.currentTimeMillis();
            if (!LockService.this.ownerLockList.contains(this)) {
                LockService.this.ownerLockList.add(this);
            }
        }

        @Override
        public synchronized void lockInterruptibly() throws InterruptedException {
            throw new UnsupportedOperationException();
        }

        /**
         * Tries to take the lock without waiting. Returns {@code false} immediately while no
         * lease has been granted. When the attempt fails the lock key is removed again.
         */
        @Override
        public synchronized boolean tryLock() {
            if (this.locked()) {
                return true;
            }
            if (LockService.this.lease == -1L) {
                return false;
            }
            try {
                PutResponse response = LockService.this.kvService.kvPut(LockService.this.putRequest(this.resourceKey, this.resourceValue));
                long revision = this.getCreateRevision(response);
                // Null entries are skipped, like every other holder lookup in this class.
                Optional<Kv> current = LockService.this.kvService.kvRange(LockService.this.rangeRequest()).getKvs().stream().filter(Objects::nonNull).min(Comparator.comparingLong(Kv::getCreateRevision));
                if (current.isPresent() && current.get().getCreateRevision() == revision) {
                    this.revision = revision;
                    ++this.locked;
                    LockService.this.watchLock(current.get(), this::destroy);
                    return true;
                }
            }
            catch (Exception e) {
                log.error("Try lock error.", e);
            }
            LockService.this.kvService.kvDeleteRange(LockService.this.deleteRangeRequest(this.resourceKey));
            return false;
        }

        /**
         * Tries to take the lock, polling until it is acquired or {@code time} has elapsed.
         * At least one acquisition attempt is made even when {@code time <= 0}. The poll
         * interval is one {@code unit}, clamped to the range [1 ms, 1 s] and to the remaining
         * time. When the attempt fails or is interrupted the lock key is removed again.
         *
         * @param time maximum time to wait
         * @param unit unit of {@code time}
         * @return {@code true} when the lock was acquired, {@code false} on timeout or error
         * @throws InterruptedException when the calling thread is interrupted while waiting
         */
        @Override
        public synchronized boolean tryLock(long time, @NonNull TimeUnit unit) throws InterruptedException {
            if (this.locked()) {
                return true;
            }
            long startNanos = System.nanoTime();
            long timeoutNanos = unit.toNanos(time);
            long pollNanos = Math.min(Math.max(unit.toNanos(1L), TimeUnit.MILLISECONDS.toNanos(1L)), TimeUnit.SECONDS.toNanos(1L));
            try {
                PutResponse response = LockService.this.kvService.kvPut(LockService.this.putRequest(this.resourceKey, this.resourceValue));
                long revision = this.getCreateRevision(response);
                while (true) {
                    RangeResponse rangeResponse = LockService.this.kvService.kvRange(LockService.this.rangeRequest());
                    Kv current = rangeResponse.getKvs().stream().filter(Objects::nonNull).min(Comparator.comparingLong(Kv::getCreateRevision)).orElseThrow(() -> new RuntimeException("Put " + this.resourceKey + " success, but range is empty."));
                    if (current.getCreateRevision() == revision) {
                        if (this.destroyFuture.isDone()) {
                            throw new RuntimeException("Destroyed!");
                        }
                        if (log.isDebugEnabled()) {
                            log.debug("Lock {} success use {} revision.", this.resourceKey, revision);
                        }
                        this.revision = revision;
                        ++this.locked;
                        LockService.this.watchLock(current, this::destroy);
                        return true;
                    }
                    // Elapsed-time arithmetic (rather than an absolute deadline) stays correct
                    // even when timeoutNanos saturates at Long.MAX_VALUE.
                    long remainingNanos = timeoutNanos - (System.nanoTime() - startNanos);
                    if (remainingNanos <= 0L) {
                        break;
                    }
                    LockSupport.parkNanos(Math.min(pollNanos, remainingNanos));
                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }
                }
            }
            catch (InterruptedException interruptedException) {
                LockService.this.kvService.kvDeleteRange(LockService.this.deleteRangeRequest(this.resourceKey));
                throw interruptedException;
            }
            catch (Exception e) {
                log.error("Try lock error.", e);
            }
            LockService.this.kvService.kvDeleteRange(LockService.this.deleteRangeRequest(this.resourceKey));
            return false;
        }

        /**
         * Releases one hold on the lock; the lock key is deleted when the last hold is
         * released. No-op when the lock is not held.
         */
        @Override
        public synchronized void unlock() {
            if (this.locked == 0) {
                return;
            }
            if (--this.locked == 0) {
                LockService.this.kvService.kvDeleteRange(LockService.this.deleteRangeRequest(this.resourceKey));
            }
        }

        @Override
        public synchronized Condition newCondition() {
            throw new UnsupportedOperationException();
        }

        /** Current hold count. */
        public int getLocked() {
            return this.locked;
        }

        /** Create revision recorded when the lock was last acquired. */
        public long getRevision() {
            return this.revision;
        }
    }
}

