/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.cluster.membership;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.scalecube.cluster.ClusterMath;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.fdetector.FailureDetector;
import io.scalecube.cluster.fdetector.FailureDetectorEvent;
import io.scalecube.cluster.gossip.GossipProtocol;
import io.scalecube.cluster.membership.IdGenerator;
import io.scalecube.cluster.membership.MemberStatus;
import io.scalecube.cluster.membership.MembershipConfig;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.membership.MembershipProtocol;
import io.scalecube.cluster.membership.MembershipRecord;
import io.scalecube.cluster.membership.SyncData;
import io.scalecube.transport.Address;
import io.scalecube.transport.Message;
import io.scalecube.transport.Transport;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.observers.Subscribers;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

public final class MembershipProtocolImpl
implements MembershipProtocol {
    private static final Logger LOGGER = LoggerFactory.getLogger(MembershipProtocolImpl.class);
    public static final String SYNC = "sc/membership/sync";
    public static final String SYNC_ACK = "sc/membership/syncAck";
    public static final String MEMBERSHIP_GOSSIP = "sc/membership/gossip";
    private final AtomicReference<Member> memberRef;
    private final Transport transport;
    private final MembershipConfig config;
    private final List<Address> seedMembers;
    private FailureDetector failureDetector;
    private GossipProtocol gossipProtocol;
    private final Map<String, MembershipRecord> membershipTable = new HashMap<String, MembershipRecord>();
    private final Subject<MembershipEvent, MembershipEvent> subject = PublishSubject.create().toSerialized();
    private Subscriber<Message> onSyncRequestSubscriber;
    private Subscriber<Message> onSyncAckResponseSubscriber;
    private Subscriber<FailureDetectorEvent> onFdEventSubscriber;
    private Subscriber<Message> onGossipRequestSubscriber;
    private final Scheduler scheduler;
    private final ScheduledExecutorService executor;
    private final Map<String, ScheduledFuture<?>> suspicionTimeoutTasks = new HashMap();
    private ScheduledFuture<?> syncTask;

    public MembershipProtocolImpl(Transport transport, MembershipConfig config) {
        this.transport = transport;
        this.config = config;
        Address address = MembershipProtocolImpl.memberAddress(transport, config);
        Member member = new Member(IdGenerator.generateId(), address, config.getMetadata());
        this.memberRef = new AtomicReference<Member>(member);
        String nameFormat = "sc-membership-" + Integer.toString(address.port());
        this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat(nameFormat).setDaemon(true).build());
        this.scheduler = Schedulers.from(this.executor);
        this.seedMembers = this.cleanUpSeedMembers(config.getSeedMembers());
    }

    protected static Address memberAddress(Transport transport, MembershipConfig config) {
        Address memberAddress = transport.address();
        if (config.getMemberHost() != null) {
            int memberPort = config.getMemberPort() != null ? config.getMemberPort().intValue() : memberAddress.port();
            memberAddress = Address.create(config.getMemberHost(), memberPort);
        }
        return memberAddress;
    }

    private List<Address> cleanUpSeedMembers(Collection<Address> seedMembers) {
        HashSet<Address> seedMembersSet = new HashSet<Address>(seedMembers);
        seedMembersSet.remove(this.member().address());
        return Collections.unmodifiableList(new ArrayList<Address>(seedMembersSet));
    }

    public void setFailureDetector(FailureDetector failureDetector) {
        this.failureDetector = failureDetector;
    }

    public void setGossipProtocol(GossipProtocol gossipProtocol) {
        this.gossipProtocol = gossipProtocol;
    }

    FailureDetector getFailureDetector() {
        return this.failureDetector;
    }

    GossipProtocol getGossipProtocol() {
        return this.gossipProtocol;
    }

    Transport getTransport() {
        return this.transport;
    }

    List<MembershipRecord> getMembershipRecords() {
        return ImmutableList.copyOf(this.membershipTable.values());
    }

    @Override
    public Observable<MembershipEvent> listen() {
        return this.subject.onBackpressureBuffer().asObservable();
    }

    @Override
    public Member member() {
        return this.memberRef.get();
    }

    @Override
    public void updateMetadata(Map<String, String> metadata) {
        this.executor.execute(() -> this.onUpdateMetadata(metadata));
    }

    @Override
    public void updateMetadataProperty(String key, String value) {
        this.executor.execute(() -> this.onUpdateMetadataProperty(key, value));
    }

    public CompletableFuture<String> leave() {
        CompletableFuture<String> future = new CompletableFuture<String>();
        this.executor.execute(() -> {
            CompletableFuture<String> leaveFuture = this.onLeave();
            leaveFuture.whenComplete((gossipId, error) -> future.complete((String)gossipId));
        });
        return future;
    }

    public CompletableFuture<Void> start() {
        Member member = this.memberRef.get();
        MembershipRecord localMemberRecord = new MembershipRecord(member, MemberStatus.ALIVE, 0);
        this.membershipTable.put(member.id(), localMemberRecord);
        this.onSyncRequestSubscriber = Subscribers.create(this::onSync, this::onError);
        this.transport.listen().observeOn(this.scheduler).filter(msg -> SYNC.equals(msg.qualifier())).filter(this::checkSyncGroup).subscribe(this.onSyncRequestSubscriber);
        this.onSyncAckResponseSubscriber = Subscribers.create(this::onSyncAck, this::onError);
        this.transport.listen().observeOn(this.scheduler).filter(msg -> SYNC_ACK.equals(msg.qualifier())).filter(msg -> msg.correlationId() == null).filter(this::checkSyncGroup).subscribe(this.onSyncAckResponseSubscriber);
        this.onFdEventSubscriber = Subscribers.create(this::onFailureDetectorEvent, this::onError);
        this.failureDetector.listen().observeOn(this.scheduler).subscribe(this.onFdEventSubscriber);
        this.onGossipRequestSubscriber = Subscribers.create(this::onMembershipGossip, this::onError);
        this.gossipProtocol.listen().observeOn(this.scheduler).filter(msg -> MEMBERSHIP_GOSSIP.equals(msg.qualifier())).subscribe(this.onGossipRequestSubscriber);
        return this.doInitialSync();
    }

    private void onError(Throwable throwable) {
        LOGGER.error("Received unexpected error: ", throwable);
    }

    public void stop() {
        if (this.onSyncRequestSubscriber != null) {
            this.onSyncRequestSubscriber.unsubscribe();
        }
        if (this.onFdEventSubscriber != null) {
            this.onFdEventSubscriber.unsubscribe();
        }
        if (this.onGossipRequestSubscriber != null) {
            this.onGossipRequestSubscriber.unsubscribe();
        }
        if (this.onSyncAckResponseSubscriber != null) {
            this.onSyncAckResponseSubscriber.unsubscribe();
        }
        if (this.syncTask != null) {
            this.syncTask.cancel(true);
        }
        for (String memberId : this.suspicionTimeoutTasks.keySet()) {
            ScheduledFuture<?> future = this.suspicionTimeoutTasks.get(memberId);
            if (future == null) continue;
            future.cancel(true);
        }
        this.suspicionTimeoutTasks.clear();
        this.executor.shutdown();
        this.subject.onCompleted();
    }

    private CompletableFuture<Void> doInitialSync() {
        LOGGER.debug("Making initial Sync to all seed members: {}", (Object)this.seedMembers);
        if (this.seedMembers.isEmpty()) {
            this.schedulePeriodicSync();
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> syncResponseFuture = new CompletableFuture<Void>();
        String cid = this.memberRef.get().id();
        this.transport.listen().observeOn(this.scheduler).filter(msg -> SYNC_ACK.equals(msg.qualifier())).filter(msg -> cid.equals(msg.correlationId())).filter(this::checkSyncGroup).take(1).timeout((long)this.config.getSyncTimeout(), TimeUnit.MILLISECONDS, this.scheduler).subscribe(message -> {
            SyncData syncData = (SyncData)message.data();
            LOGGER.info("Joined cluster '{}': {}", (Object)syncData.getSyncGroup(), (Object)syncData.getMembership());
            this.onSyncAck((Message)message, true);
            this.schedulePeriodicSync();
            syncResponseFuture.complete(null);
        }, throwable -> {
            LOGGER.info("Timeout getting initial SyncAck from seed members: {}", (Object)this.seedMembers);
            this.schedulePeriodicSync();
            syncResponseFuture.complete(null);
        });
        Message syncMsg = this.prepareSyncDataMsg(SYNC, cid);
        this.seedMembers.forEach(address -> this.transport.send((Address)address, syncMsg));
        return syncResponseFuture;
    }

    private void doSync() {
        try {
            Address syncMember = this.selectSyncAddress();
            if (syncMember == null) {
                return;
            }
            Message syncMsg = this.prepareSyncDataMsg(SYNC, null);
            this.transport.send(syncMember, syncMsg);
            LOGGER.debug("Send Sync to {}: {}", (Object)syncMember, (Object)syncMsg);
        }
        catch (Exception cause) {
            LOGGER.error("Unhandled exception: {}", (Object)cause, (Object)cause);
        }
    }

    private void onUpdateMetadataProperty(String key, String value) {
        Member curMember = this.memberRef.get();
        HashMap<String, String> metadata = new HashMap<String, String>(curMember.metadata());
        metadata.put(key, value);
        this.onUpdateMetadata(metadata);
    }

    private void onUpdateMetadata(Map<String, String> metadata) {
        Member curMember = this.memberRef.get();
        String memberId = curMember.id();
        Member newMember = new Member(memberId, curMember.address(), metadata);
        this.memberRef.set(newMember);
        MembershipRecord curRecord = this.membershipTable.get(memberId);
        MembershipRecord newRecord = new MembershipRecord(newMember, MemberStatus.ALIVE, curRecord.incarnation() + 1);
        this.membershipTable.put(memberId, newRecord);
        this.subject.onNext(MembershipEvent.createUpdated(curMember, newMember));
        this.spreadMembershipGossip(newRecord);
    }

    private void onSyncAck(Message syncAckMsg) {
        this.onSyncAck(syncAckMsg, false);
    }

    private void onSyncAck(Message syncAckMsg, boolean initial) {
        LOGGER.debug("Received SyncAck: {}", (Object)syncAckMsg);
        this.syncMembership((SyncData)syncAckMsg.data(), initial);
    }

    private void onSync(Message syncMsg) {
        LOGGER.debug("Received Sync: {}", (Object)syncMsg);
        this.syncMembership((SyncData)syncMsg.data(), false);
        Message syncAckMsg = this.prepareSyncDataMsg(SYNC_ACK, syncMsg.correlationId());
        this.transport.send(syncMsg.sender(), syncAckMsg);
    }

    private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) {
        MembershipRecord r0 = this.membershipTable.get(fdEvent.member().id());
        if (r0 == null) {
            return;
        }
        if (r0.status() == fdEvent.status()) {
            return;
        }
        LOGGER.debug("Received status change on failure detector event: {}", (Object)fdEvent);
        if (fdEvent.status() == MemberStatus.ALIVE) {
            Message syncMsg = this.prepareSyncDataMsg(SYNC, null);
            this.transport.send(fdEvent.member().address(), syncMsg);
        } else {
            MembershipRecord r1 = new MembershipRecord(r0.member(), fdEvent.status(), r0.incarnation());
            this.updateMembership(r1, MembershipUpdateReason.FAILURE_DETECTOR_EVENT);
        }
    }

    private void onMembershipGossip(Message message) {
        MembershipRecord record = (MembershipRecord)message.data();
        LOGGER.debug("Received membership gossip: {}", (Object)record);
        this.updateMembership(record, MembershipUpdateReason.MEMBERSHIP_GOSSIP);
    }

    private Address selectSyncAddress() {
        return !this.seedMembers.isEmpty() ? this.seedMembers.get(ThreadLocalRandom.current().nextInt(this.seedMembers.size())) : null;
    }

    private boolean checkSyncGroup(Message message) {
        SyncData data = (SyncData)message.data();
        return this.config.getSyncGroup().equals(data.getSyncGroup());
    }

    private void schedulePeriodicSync() {
        int syncInterval = this.config.getSyncInterval();
        this.syncTask = this.executor.scheduleWithFixedDelay(this::doSync, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
    }

    private Message prepareSyncDataMsg(String qualifier, String cid) {
        ArrayList<MembershipRecord> membershipRecords = new ArrayList<MembershipRecord>(this.membershipTable.values());
        SyncData syncData = new SyncData(membershipRecords, this.config.getSyncGroup());
        return Message.withData(syncData).qualifier(qualifier).correlationId(cid).build();
    }

    private void syncMembership(SyncData syncData, boolean initial) {
        for (MembershipRecord r1 : syncData.getMembership()) {
            MembershipRecord r0;
            if (r1.equals(r0 = this.membershipTable.get(r1.id()))) continue;
            MembershipUpdateReason reason = initial ? MembershipUpdateReason.INITIAL_SYNC : MembershipUpdateReason.SYNC;
            this.updateMembership(r1, reason);
        }
    }

    private void updateMembership(MembershipRecord r1, MembershipUpdateReason reason) {
        Preconditions.checkArgument(r1 != null, "Membership record can't be null");
        MembershipRecord r0 = this.membershipTable.get(r1.id());
        if (!r1.isOverrides(r0)) {
            return;
        }
        Member localMember = this.memberRef.get();
        if (r1.member().id().equals(localMember.id())) {
            int currentIncarnation = Math.max(r0.incarnation(), r1.incarnation());
            MembershipRecord r2 = new MembershipRecord(localMember, r0.status(), currentIncarnation + 1);
            this.membershipTable.put(localMember.id(), r2);
            LOGGER.debug("Local membership record r0={}, but received r1={}, spread r2={}", r0, r1, r2);
            this.spreadMembershipGossip(r2);
            return;
        }
        if (r1.isDead()) {
            this.membershipTable.remove(r1.id());
        } else {
            this.membershipTable.put(r1.id(), r1);
        }
        if (r1.isSuspect()) {
            this.scheduleSuspicionTimeoutTask(r1);
        } else {
            this.cancelSuspicionTimeoutTask(r1.id());
        }
        if (r1.isDead()) {
            this.subject.onNext(MembershipEvent.createRemoved(r1.member()));
        } else if (r0 == null && r1.isAlive()) {
            this.subject.onNext(MembershipEvent.createAdded(r1.member()));
        } else if (r0 != null && !r0.member().equals(r1.member())) {
            this.subject.onNext(MembershipEvent.createUpdated(r0.member(), r1.member()));
        }
        if (reason != MembershipUpdateReason.MEMBERSHIP_GOSSIP && reason != MembershipUpdateReason.INITIAL_SYNC) {
            this.spreadMembershipGossip(r1);
        }
    }

    private void cancelSuspicionTimeoutTask(String memberId) {
        ScheduledFuture<?> future = this.suspicionTimeoutTasks.remove(memberId);
        if (future != null) {
            future.cancel(true);
        }
    }

    private void scheduleSuspicionTimeoutTask(MembershipRecord record) {
        long suspicionTimeout = ClusterMath.suspicionTimeout(this.config.getSuspicionMult(), this.membershipTable.size(), this.config.getPingInterval());
        this.suspicionTimeoutTasks.computeIfAbsent(record.id(), id -> this.executor.schedule(() -> this.onSuspicionTimeout((String)id), suspicionTimeout, TimeUnit.MILLISECONDS));
    }

    private void onSuspicionTimeout(String memberId) {
        this.suspicionTimeoutTasks.remove(memberId);
        MembershipRecord record = this.membershipTable.get(memberId);
        if (record != null) {
            LOGGER.debug("Declare SUSPECTED member as DEAD by timeout: {}", (Object)record);
            MembershipRecord deadRecord = new MembershipRecord(record.member(), MemberStatus.DEAD, record.incarnation());
            this.updateMembership(deadRecord, MembershipUpdateReason.SUSPICION_TIMEOUT);
        }
    }

    private CompletableFuture<String> onLeave() {
        Member curMember = this.memberRef.get();
        String memberId = curMember.id();
        MembershipRecord curRecord = this.membershipTable.get(memberId);
        MembershipRecord newRecord = new MembershipRecord(this.member(), MemberStatus.DEAD, curRecord.incarnation() + 1);
        this.membershipTable.put(memberId, newRecord);
        return this.spreadMembershipGossip(newRecord);
    }

    private CompletableFuture<String> spreadMembershipGossip(MembershipRecord record) {
        Message membershipMsg = Message.withData(record).qualifier(MEMBERSHIP_GOSSIP).build();
        return this.gossipProtocol.spread(membershipMsg);
    }

    private static enum MembershipUpdateReason {
        FAILURE_DETECTOR_EVENT,
        MEMBERSHIP_GOSSIP,
        SYNC,
        INITIAL_SYNC,
        SUSPICION_TIMEOUT;

    }
}

