/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.cluster.fdetector;

import io.scalecube.Preconditions;
import io.scalecube.ThreadFactoryBuilder;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.fdetector.FailureDetector;
import io.scalecube.cluster.fdetector.FailureDetectorConfig;
import io.scalecube.cluster.fdetector.FailureDetectorEvent;
import io.scalecube.cluster.fdetector.PingData;
import io.scalecube.cluster.membership.MemberStatus;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.membership.MembershipProtocol;
import io.scalecube.transport.Message;
import io.scalecube.transport.Transport;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public final class FailureDetectorImpl
implements FailureDetector {
    private static final Logger LOGGER = LoggerFactory.getLogger(FailureDetectorImpl.class);
    public static final String PING = "sc/fdetector/ping";
    public static final String PING_REQ = "sc/fdetector/pingReq";
    public static final String PING_ACK = "sc/fdetector/pingAck";
    private final Transport transport;
    private final MembershipProtocol membership;
    private final FailureDetectorConfig config;
    private long period = 0L;
    private List<Member> pingMembers = new ArrayList<Member>();
    private int pingMemberIndex = 0;
    private final Disposable.Composite actionDisposables = Disposables.composite();
    private final FluxProcessor<FailureDetectorEvent, FailureDetectorEvent> subject = DirectProcessor.create().serialize();
    private final FluxSink<FailureDetectorEvent> sink = this.subject.sink();
    private final ScheduledExecutorService executor;
    private final Scheduler scheduler;
    private ScheduledFuture<?> pingTask;

    public FailureDetectorImpl(Transport transport, MembershipProtocol membership, FailureDetectorConfig config) {
        Preconditions.checkArgument(transport != null);
        Preconditions.checkArgument(membership != null);
        Preconditions.checkArgument(config != null);
        this.transport = transport;
        this.membership = membership;
        this.config = config;
        String nameFormat = "sc-fdetector-" + Integer.toString(membership.member().address().port());
        this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat(nameFormat).setDaemon(true).build());
        this.scheduler = Schedulers.fromExecutorService(this.executor);
    }

    Transport getTransport() {
        return this.transport;
    }

    @Override
    public void start() {
        this.actionDisposables.addAll(Arrays.asList(this.membership.listen().publishOn(this.scheduler).filter(MembershipEvent::isAdded).map(MembershipEvent::member).subscribe(this::onMemberAdded, this::onError), this.membership.listen().publishOn(this.scheduler).filter(MembershipEvent::isRemoved).map(MembershipEvent::member).subscribe(this::onMemberRemoved, this::onError), this.membership.listen().publishOn(this.scheduler).filter(MembershipEvent::isUpdated).subscribe(this::onMemberUpdated, this::onError), this.transport.listen().publishOn(this.scheduler).filter(this::isPing).subscribe(this::onPing, this::onError), this.transport.listen().publishOn(this.scheduler).filter(this::isPingReq).subscribe(this::onPingReq, this::onError), this.transport.listen().publishOn(this.scheduler).filter(this::isTransitPingAck).subscribe(this::onTransitPingAck, this::onError)));
        this.pingTask = this.executor.scheduleWithFixedDelay(this::doPing, this.config.getPingInterval(), this.config.getPingInterval(), TimeUnit.MILLISECONDS);
    }

    @Override
    public void stop() {
        this.actionDisposables.dispose();
        if (this.pingTask != null) {
            this.pingTask.cancel(true);
        }
        this.executor.shutdown();
        this.sink.complete();
    }

    @Override
    public Flux<FailureDetectorEvent> listen() {
        return this.subject.onBackpressureDrop();
    }

    private void doPing() {
        ++this.period;
        Member pingMember = this.selectPingMember();
        if (pingMember == null) {
            return;
        }
        Member localMember = this.membership.member();
        String cid = localMember.id() + "-" + Long.toString(this.period);
        PingData pingData = new PingData(localMember, pingMember);
        Message pingMsg = Message.withData(pingData).qualifier(PING).correlationId(cid).build();
        try {
            LOGGER.trace("Send Ping[{}] to {}", (Object)this.period, (Object)pingMember);
            this.transport.listen().publishOn(this.scheduler).filter(this::isPingAck).filter(message -> cid.equals(message.correlationId())).take(1L).timeout(Duration.ofMillis(this.config.getPingTimeout()), this.scheduler).subscribe(message -> {
                LOGGER.trace("Received PingAck[{}] from {}", (Object)this.period, (Object)pingMember);
                this.publishPingResult(pingMember, MemberStatus.ALIVE);
            }, throwable -> {
                LOGGER.trace("Timeout getting PingAck[{}] from {} within {} ms", this.period, pingMember, this.config.getPingTimeout());
                this.doPingReq(pingMember, cid);
            });
            this.transport.send(pingMember.address(), pingMsg);
        }
        catch (Exception cause) {
            LOGGER.error("Exception on sending Ping[{}] to {}: {}", this.period, pingMember, cause.getMessage(), cause);
        }
    }

    private void doPingReq(Member pingMember, String cid) {
        int timeout = this.config.getPingInterval() - this.config.getPingTimeout();
        if (timeout <= 0) {
            LOGGER.trace("No PingReq[{}] occurred, because no time left (pingInterval={}, pingTimeout={})", this.period, this.config.getPingInterval(), this.config.getPingTimeout());
            this.publishPingResult(pingMember, MemberStatus.SUSPECT);
            return;
        }
        List<Member> pingReqMembers = this.selectPingReqMembers(pingMember);
        if (pingReqMembers.isEmpty()) {
            LOGGER.trace("No PingReq[{}] occurred, because member selection is empty", (Object)this.period);
            this.publishPingResult(pingMember, MemberStatus.SUSPECT);
            return;
        }
        Member localMember = this.membership.member();
        this.transport.listen().publishOn(this.scheduler).filter(this::isPingAck).filter(message -> cid.equals(message.correlationId())).take(1L).timeout(Duration.ofMillis(timeout), this.scheduler).subscribe(message -> {
            LOGGER.trace("Received transit PingAck[{}] from {} to {}", this.period, message.sender(), pingMember);
            this.publishPingResult(pingMember, MemberStatus.ALIVE);
        }, throwable -> {
            LOGGER.trace("Timeout getting transit PingAck[{}] from {} to {} within {} ms", this.period, pingReqMembers, pingMember, timeout);
            this.publishPingResult(pingMember, MemberStatus.SUSPECT);
        });
        PingData pingReqData = new PingData(localMember, pingMember);
        Message pingReqMsg = Message.withData(pingReqData).qualifier(PING_REQ).correlationId(cid).build();
        LOGGER.trace("Send PingReq[{}] to {} for {}", this.period, pingReqMembers, pingMember);
        for (Member pingReqMember : pingReqMembers) {
            this.transport.send(pingReqMember.address(), pingReqMsg);
        }
    }

    private void onMemberAdded(Member member) {
        int size = this.pingMembers.size();
        int index = size > 0 ? ThreadLocalRandom.current().nextInt(size) : 0;
        this.pingMembers.add(index, member);
    }

    private void onMemberRemoved(Member member) {
        this.pingMembers.remove(member);
    }

    private void onMemberUpdated(MembershipEvent membershipEvent) {
        int index = this.pingMembers.indexOf(membershipEvent.oldMember());
        if (index != -1) {
            this.pingMembers.set(index, membershipEvent.newMember());
        }
    }

    private void onPing(Message message) {
        LOGGER.trace("Received Ping: {}", (Object)message);
        PingData data = (PingData)message.data();
        Member localMember = this.membership.member();
        if (!data.getTo().id().equals(localMember.id())) {
            LOGGER.warn("Received Ping to {}, but local member is {}", (Object)data.getTo(), (Object)localMember);
            return;
        }
        String correlationId = message.correlationId();
        Message ackMessage = Message.withData(data).qualifier(PING_ACK).correlationId(correlationId).build();
        LOGGER.trace("Send PingAck to {}", (Object)data.getFrom().address());
        this.transport.send(data.getFrom().address(), ackMessage);
    }

    private void onPingReq(Message message) {
        LOGGER.trace("Received PingReq: {}", (Object)message);
        PingData data = (PingData)message.data();
        Member target = data.getTo();
        Member originalIssuer = data.getFrom();
        String correlationId = message.correlationId();
        PingData pingReqData = new PingData(this.membership.member(), target, originalIssuer);
        Message pingMessage = Message.withData(pingReqData).qualifier(PING).correlationId(correlationId).build();
        LOGGER.trace("Send transit Ping to {}", (Object)target.address());
        this.transport.send(target.address(), pingMessage);
    }

    private void onTransitPingAck(Message message) {
        LOGGER.trace("Received transit PingAck: {}", (Object)message);
        PingData data = (PingData)message.data();
        Member target = data.getOriginalIssuer();
        String correlationId = message.correlationId();
        PingData originalAckData = new PingData(target, data.getTo());
        Message originalAckMessage = Message.withData(originalAckData).qualifier(PING_ACK).correlationId(correlationId).build();
        LOGGER.trace("Resend transit PingAck to {}", (Object)target.address());
        this.transport.send(target.address(), originalAckMessage);
    }

    private void onError(Throwable throwable) {
        LOGGER.error("Received unexpected error: ", throwable);
    }

    private Member selectPingMember() {
        if (this.pingMembers.isEmpty()) {
            return null;
        }
        if (this.pingMemberIndex >= this.pingMembers.size()) {
            this.pingMemberIndex = 0;
            Collections.shuffle(this.pingMembers);
        }
        return this.pingMembers.get(this.pingMemberIndex++);
    }

    private List<Member> selectPingReqMembers(Member pingMember) {
        if (this.config.getPingReqMembers() <= 0) {
            return Collections.emptyList();
        }
        ArrayList<Member> candidates = new ArrayList<Member>(this.pingMembers);
        candidates.remove(pingMember);
        if (candidates.isEmpty()) {
            return Collections.emptyList();
        }
        Collections.shuffle(candidates);
        boolean selectAll = candidates.size() < this.config.getPingReqMembers();
        return selectAll ? candidates : candidates.subList(0, this.config.getPingReqMembers());
    }

    private void publishPingResult(Member member, MemberStatus status) {
        LOGGER.debug("Member {} detected as {}", (Object)member, (Object)status);
        this.sink.next(new FailureDetectorEvent(member, status));
    }

    private boolean isPing(Message message) {
        return PING.equals(message.qualifier());
    }

    private boolean isPingReq(Message message) {
        return PING_REQ.equals(message.qualifier());
    }

    private boolean isPingAck(Message message) {
        return PING_ACK.equals(message.qualifier()) && ((PingData)message.data()).getOriginalIssuer() == null;
    }

    private boolean isTransitPingAck(Message message) {
        return PING_ACK.equals(message.qualifier()) && ((PingData)message.data()).getOriginalIssuer() != null;
    }
}

