/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.cluster.gossip;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.scalecube.cluster.ClusterMath;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.gossip.Gossip;
import io.scalecube.cluster.gossip.GossipConfig;
import io.scalecube.cluster.gossip.GossipProtocol;
import io.scalecube.cluster.gossip.GossipRequest;
import io.scalecube.cluster.gossip.GossipState;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.membership.MembershipProtocol;
import io.scalecube.transport.Message;
import io.scalecube.transport.Transport;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.observers.Subscribers;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

/**
 * {@link GossipProtocol} implementation that periodically pushes locally known gossips to a
 * subset of remote cluster members and publishes gossips received from remote members.
 *
 * <p><b>Threading:</b> all mutable state ({@code period}, {@code gossipCounter}, {@code gossips},
 * {@code futures}, {@code remoteMembers}, {@code remoteMembersIndex}) is read and written only from
 * the single-threaded {@link #executor}: membership and transport events are observed on a
 * scheduler backed by that executor, {@link #spread(Message)} hands its work over to it, and the
 * periodic spread task is scheduled on it. This is why plain (non-concurrent) collections are used.
 */
public final class GossipProtocolImpl implements GossipProtocol {
    private static final Logger LOGGER = LoggerFactory.getLogger(GossipProtocolImpl.class);

    /** Qualifier that marks a transport message as a gossip request. */
    public static final String GOSSIP_REQ = "sc/gossip/req";

    private final Transport transport;
    private final MembershipProtocol membership;
    private final GossipConfig config;

    /** Number of gossip periods elapsed; incremented on every run of the spread task. */
    private long period = 0L;
    /** Local sequence used to build unique gossip ids. */
    private long gossipCounter = 0L;
    /** Known gossips (own and received) keyed by gossip id. */
    private final Map<String, GossipState> gossips = Maps.newHashMap();
    /** Futures returned from {@link #spread(Message)}, keyed by gossip id, completed on sweep. */
    private final Map<String, CompletableFuture<String>> futures = Maps.newHashMap();
    /** Remote members as reported by membership added/removed events. */
    private final List<Member> remoteMembers = new ArrayList<>();
    /** Cursor into {@link #remoteMembers}; a negative value forces a shuffle on first selection. */
    private int remoteMembersIndex = -1;

    private Subscriber<Member> onMemberAddedEventSubscriber;
    private Subscriber<Member> onMemberRemovedEventSubscriber;
    private Subscriber<Message> onGossipRequestSubscriber;

    // The explicit <Message> witness is required: a generic call used as a method receiver has no
    // target type, so without it the subject would be inferred as Subject<Object, Object>.
    private final Subject<Message, Message> subject = PublishSubject.<Message>create().toSerialized();

    private final ScheduledExecutorService executor;
    private final Scheduler scheduler;
    private ScheduledFuture<?> spreadGossipTask;

    /**
     * Creates the protocol instance and its dedicated single-threaded daemon executor. Nothing is
     * subscribed or scheduled until {@link #start()} is called.
     *
     * @param transport transport used to send and receive gossip requests; must not be null
     * @param membership membership protocol providing the local member and membership events;
     *     must not be null
     * @param config gossip settings (interval, fanout, repeat multiplier); must not be null
     * @throws IllegalArgumentException if any argument is null
     */
    public GossipProtocolImpl(Transport transport, MembershipProtocol membership, GossipConfig config) {
        Preconditions.checkArgument(transport != null);
        Preconditions.checkArgument(membership != null);
        Preconditions.checkArgument(config != null);
        this.transport = transport;
        this.membership = membership;
        this.config = config;
        String nameFormat = "sc-gossip-" + membership.member().address().port();
        this.executor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat(nameFormat).setDaemon(true).build());
        this.scheduler = Schedulers.from(this.executor);
    }

    /** Returns the transport this protocol sends gossip requests through. */
    Transport getTransport() {
        return this.transport;
    }

    /** Returns the local member as currently reported by the membership protocol. */
    Member getMember() {
        return this.membership.member();
    }

    /**
     * Subscribes to membership added/removed events and incoming gossip requests (all observed on
     * the protocol's executor thread) and schedules the periodic spread task with the configured
     * gossip interval, in milliseconds, as both initial delay and delay between runs.
     */
    @Override
    public void start() {
        this.onMemberAddedEventSubscriber = Subscribers.create(this.remoteMembers::add, this::onError);
        this.membership.listen()
            .observeOn(this.scheduler)
            .filter(MembershipEvent::isAdded)
            .map(MembershipEvent::member)
            .subscribe(this.onMemberAddedEventSubscriber);

        this.onMemberRemovedEventSubscriber = Subscribers.create(this.remoteMembers::remove, this::onError);
        this.membership.listen()
            .observeOn(this.scheduler)
            .filter(MembershipEvent::isRemoved)
            .map(MembershipEvent::member)
            .subscribe(this.onMemberRemovedEventSubscriber);

        this.onGossipRequestSubscriber = Subscribers.create(this::onGossipReq, this::onError);
        this.transport.listen()
            .observeOn(this.scheduler)
            .filter(this::isGossipReq)
            .subscribe(this.onGossipRequestSubscriber);

        long gossipInterval = this.config.getGossipInterval();
        this.spreadGossipTask = this.executor.scheduleWithFixedDelay(
            this::doSpreadGossip, gossipInterval, gossipInterval, TimeUnit.MILLISECONDS);
    }

    /** Error handler shared by all subscribers; logs the error and does nothing else. */
    private void onError(Throwable throwable) {
        LOGGER.error("Received unexpected error: ", throwable);
    }

    /**
     * Unsubscribes from all event streams, cancels the periodic spread task, shuts the executor
     * down and completes the stream returned by {@link #listen()}. Safe to call even if
     * {@link #start()} was never invoked.
     *
     * <p>NOTE(review): futures returned by {@link #spread(Message)} that have not been swept yet
     * are not completed here.
     */
    @Override
    public void stop() {
        if (this.onMemberAddedEventSubscriber != null) {
            this.onMemberAddedEventSubscriber.unsubscribe();
        }
        if (this.onMemberRemovedEventSubscriber != null) {
            this.onMemberRemovedEventSubscriber.unsubscribe();
        }
        if (this.onGossipRequestSubscriber != null) {
            this.onGossipRequestSubscriber.unsubscribe();
        }
        if (this.spreadGossipTask != null) {
            this.spreadGossipTask.cancel(true);
        }
        this.executor.shutdown();
        this.subject.onCompleted();
    }

    /**
     * Registers the message as a new gossip to be disseminated on subsequent gossip periods.
     * Registration happens asynchronously on the protocol's executor thread.
     *
     * @param message message to spread
     * @return future completed with the gossip id once the gossip is swept from local state
     */
    @Override
    public CompletableFuture<String> spread(Message message) {
        CompletableFuture<String> future = new CompletableFuture<>();
        this.executor.execute(() -> this.futures.put(this.onSpreadGossip(message), future));
        return future;
    }

    /**
     * Returns the stream of messages carried by gossips received from remote members. Each gossip
     * id is emitted at most once while its state is retained locally.
     */
    @Override
    public Observable<Message> listen() {
        return this.subject.onBackpressureBuffer().asObservable();
    }

    /**
     * One gossip period: advances the period counter and, if any gossips are known, sends them to
     * the selected members and then sweeps expired gossips. Exceptions are logged rather than
     * propagated so that the fixed-delay schedule is not cancelled by a failing run.
     */
    private void doSpreadGossip() {
        ++this.period;
        if (this.gossips.isEmpty()) {
            return;
        }
        try {
            this.selectGossipMembers().forEach(this::spreadGossipsTo);
            this.sweepGossips();
        } catch (Exception cause) {
            LOGGER.error("Exception on sending GossipReq[{}] exception: {}", this.period, cause.getMessage(), cause);
        }
    }

    /**
     * Wraps the message into a new gossip stamped with the current period and stores it.
     *
     * @return id of the created gossip
     */
    private String onSpreadGossip(Message message) {
        Gossip gossip = new Gossip(this.generateGossipId(), message);
        GossipState gossipState = new GossipState(gossip, this.period);
        this.gossips.put(gossip.gossipId(), gossipState);
        return gossip.gossipId();
    }

    /**
     * Handles an incoming gossip request: gossips not seen before are stored and their message is
     * published to listeners; for every gossip the sender is recorded as infected so the gossip is
     * not sent back to it.
     */
    private void onGossipReq(Message message) {
        GossipRequest gossipRequest = (GossipRequest) message.data();
        for (Gossip gossip : gossipRequest.gossips()) {
            GossipState gossipState = this.gossips.get(gossip.gossipId());
            if (gossipState == null) {
                // First time this gossip is seen: remember it and deliver it exactly once.
                gossipState = new GossipState(gossip, this.period);
                this.gossips.put(gossip.gossipId(), gossipState);
                this.subject.onNext(gossip.message());
            }
            gossipState.addToInfected(gossipRequest.from());
        }
    }

    /** Tells whether the message's qualifier is {@link #GOSSIP_REQ}. */
    private boolean isGossipReq(Message message) {
        return GOSSIP_REQ.equals(message.qualifier());
    }

    /** Builds the next gossip id as {@code <local member id>-<counter>} and bumps the counter. */
    private String generateGossipId() {
        return this.membership.member().id() + "-" + this.gossipCounter++;
    }

    /** Sends one gossip request to the member with every gossip it still needs, if any. */
    private void spreadGossipsTo(Member member) {
        List<Gossip> gossipsToSend = this.selectGossipsToSend(member);
        if (gossipsToSend.isEmpty()) {
            return;
        }
        Message gossipReqMsg = this.buildGossipRequestMessage(gossipsToSend);
        this.transport.send(member.address(), gossipReqMsg);
    }

    /**
     * Selects gossips that are still within their spread window (as computed by
     * {@link ClusterMath#gossipPeriodsToSpread} from the repeat multiplier and the number of remote
     * members plus one) and that the given member is not yet recorded as infected with.
     */
    private List<Gossip> selectGossipsToSend(Member member) {
        int periodsToSpread =
            ClusterMath.gossipPeriodsToSpread(this.config.getGossipRepeatMult(), this.remoteMembers.size() + 1);
        return this.gossips.values().stream()
            .filter(gossipState -> gossipState.infectionPeriod() + periodsToSpread >= this.period)
            .filter(gossipState -> !gossipState.isInfected(member.id()))
            .map(GossipState::gossip)
            .collect(Collectors.toList());
    }

    /**
     * Picks the members to gossip to in the current period. With fewer remote members than the
     * configured fanout, all of them are returned. Otherwise the member list is walked in windows
     * of {@code fanout} elements and reshuffled whenever the next window would run past its end.
     *
     * <p>The returned list is the live member list or a view of it, not a copy; it must be
     * consumed on the executor thread before the member list is modified.
     */
    private List<Member> selectGossipMembers() {
        int gossipFanout = this.config.getGossipFanout();
        if (this.remoteMembers.size() < gossipFanout) {
            return this.remoteMembers;
        }
        if (this.remoteMembersIndex < 0 || this.remoteMembersIndex + gossipFanout > this.remoteMembers.size()) {
            Collections.shuffle(this.remoteMembers);
            this.remoteMembersIndex = 0;
        }
        List<Member> selectedMembers = gossipFanout == 1
            ? Collections.singletonList(this.remoteMembers.get(this.remoteMembersIndex))
            : this.remoteMembers.subList(this.remoteMembersIndex, this.remoteMembersIndex + gossipFanout);
        this.remoteMembersIndex += gossipFanout;
        return selectedMembers;
    }

    /** Wraps the gossips and the local member id into a message qualified as {@link #GOSSIP_REQ}. */
    private Message buildGossipRequestMessage(List<Gossip> gossipsToSend) {
        GossipRequest gossipReqData = new GossipRequest(gossipsToSend, this.membership.member().id());
        return Message.withData(gossipReqData).qualifier(GOSSIP_REQ).build();
    }

    /**
     * Removes gossips whose infection period is older than the sweep window (as computed by
     * {@link ClusterMath#gossipPeriodsToSweep}) and completes the matching {@link #spread(Message)}
     * future, if one exists, with the gossip id.
     */
    private void sweepGossips() {
        int periodsToSweep =
            ClusterMath.gossipPeriodsToSweep(this.config.getGossipRepeatMult(), this.remoteMembers.size() + 1);
        // Collect first, remove afterwards: the map must not be mutated while it is being streamed.
        Set<GossipState> gossipsToRemove = this.gossips.values().stream()
            .filter(gossipState -> this.period > gossipState.infectionPeriod() + periodsToSweep)
            .collect(Collectors.toSet());
        if (gossipsToRemove.isEmpty()) {
            return;
        }
        LOGGER.debug("Sweep gossips: {}", gossipsToRemove);
        for (GossipState gossipState : gossipsToRemove) {
            String gossipId = gossipState.gossip().gossipId();
            this.gossips.remove(gossipId);
            CompletableFuture<String> future = this.futures.remove(gossipId);
            if (future != null) {
                future.complete(gossipId);
            }
        }
    }
}

