/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.cluster;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import io.scalecube.cluster.Cluster;
import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.fdetector.FailureDetectorImpl;
import io.scalecube.cluster.gossip.GossipProtocolImpl;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.membership.MembershipProtocolImpl;
import io.scalecube.transport.Address;
import io.scalecube.transport.Message;
import io.scalecube.transport.NetworkEmulator;
import io.scalecube.transport.Transport;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

/**
 * Default {@link Cluster} implementation that wires together the transport, membership,
 * gossip and failure-detector components and keeps a local view of the known members.
 *
 * <p>The protocol components are created by {@link #join0()}; until the future returned by
 * that method completes, the component fields are unset and the other methods of this class
 * must not be called.
 */
final class ClusterImpl implements Cluster {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClusterImpl.class);

    /** Qualifiers of internal protocol messages that are hidden from {@link #listen()}. */
    private static final Set<String> SYSTEM_MESSAGES = ImmutableSet.of(
            "sc/fdetector/ping",
            "sc/fdetector/pingReq",
            "sc/fdetector/pingAck",
            "sc/membership/sync",
            "sc/membership/syncAck",
            "sc/gossip/req");

    /** Qualifiers of internal gossips that are hidden from {@link #listenGossips()}. */
    private static final Set<String> SYSTEM_GOSSIPS = ImmutableSet.of("sc/membership/gossip");

    private final ClusterConfig config;

    /** Known members keyed by member id. */
    private final ConcurrentMap<String, Member> members = new ConcurrentHashMap<>();

    /** Secondary index: member address to member id. */
    private final ConcurrentMap<Address, String> memberAddressIndex = new ConcurrentHashMap<>();

    // Assigned in join0() once the transport has been bound.
    private Transport transport;
    private FailureDetectorImpl failureDetector;
    private GossipProtocolImpl gossip;
    private MembershipProtocolImpl membership;
    private Observable<Message> messageObservable;
    private Observable<Message> gossipObservable;

    /**
     * Creates a cluster instance that is not yet started.
     *
     * @param config cluster configuration; must not be {@code null}
     * @throws NullPointerException if {@code config} is {@code null}
     */
    public ClusterImpl(ClusterConfig config) {
        this.config = Preconditions.checkNotNull(config);
    }

    /**
     * Binds the transport, creates and starts the protocol components and starts membership.
     *
     * @return a future that yields this cluster once the future returned by the membership
     *     protocol's {@code start()} has completed
     */
    public CompletableFuture<Cluster> join0() {
        CompletableFuture<Transport> transportFuture = Transport.bind(this.config.getTransportConfig());
        return transportFuture
                .thenCompose(boundTransport -> {
                    this.transport = boundTransport;
                    // User-facing stream: everything except internal protocol traffic.
                    this.messageObservable = this.transport.listen()
                            .filter(msg -> !SYSTEM_MESSAGES.contains(msg.qualifier()));

                    this.membership = new MembershipProtocolImpl(this.transport, this.config);
                    this.gossip = new GossipProtocolImpl(this.transport, this.membership, this.config);
                    this.failureDetector = new FailureDetectorImpl(this.transport, this.membership, this.config);
                    this.membership.setFailureDetector(this.failureDetector);
                    this.membership.setGossipProtocol(this.gossip);

                    // The local member is registered directly rather than via a membership event.
                    this.onMemberAdded(this.membership.member());

                    // Keep the local member view in sync with membership events.
                    this.membership.listen()
                            .filter(MembershipEvent::isAdded)
                            .map(MembershipEvent::member)
                            .subscribe(this::onMemberAdded, this::onError);
                    this.membership.listen()
                            .filter(MembershipEvent::isRemoved)
                            .map(MembershipEvent::member)
                            .subscribe(this::onMemberRemoved, this::onError);
                    this.membership.listen()
                            .filter(MembershipEvent::isUpdated)
                            .map(MembershipEvent::member)
                            .subscribe(this::onMemberUpdated, this::onError);

                    this.failureDetector.start();
                    this.gossip.start();
                    // User-facing gossip stream: everything except internal membership gossips.
                    this.gossipObservable = this.gossip.listen()
                            .filter(msg -> !SYSTEM_GOSSIPS.contains(msg.qualifier()));
                    return this.membership.start();
                })
                .thenApply(ignored -> this);
    }

    /** Logs an error raised by one of the membership event subscriptions. */
    private void onError(Throwable throwable) {
        LOGGER.error("Received unexpected error: ", throwable);
    }

    /** Records a member in both the id map and the address index. */
    private void onMemberAdded(Member member) {
        this.memberAddressIndex.put(member.address(), member.id());
        this.members.put(member.id(), member);
    }

    /** Drops a member from the id map and from the address index. */
    private void onMemberRemoved(Member member) {
        this.members.remove(member.id());
        // Conditional remove: only drop the index entry if it still points at this member,
        // so an entry already re-registered for another member id on the same address survives.
        this.memberAddressIndex.remove(member.address(), member.id());
    }

    /** Replaces the stored record of a member with its updated version. */
    private void onMemberUpdated(Member member) {
        this.members.put(member.id(), member);
    }

    /** Returns the address of the local member. */
    @Override
    public Address address() {
        return this.member().address();
    }

    /** Sends {@code message} to the address of {@code member} via the transport. */
    @Override
    public void send(Member member, Message message) {
        this.transport.send(member.address(), message);
    }

    /** Sends {@code message} to {@code address} via the transport. */
    @Override
    public void send(Address address, Message message) {
        this.transport.send(address, message);
    }

    /**
     * Sends {@code message} to the address of {@code member}, handing {@code promise}
     * to the transport.
     */
    @Override
    public void send(Member member, Message message, CompletableFuture<Void> promise) {
        this.transport.send(member.address(), message, promise);
    }

    /** Sends {@code message} to {@code address}, handing {@code promise} to the transport. */
    @Override
    public void send(Address address, Message message, CompletableFuture<Void> promise) {
        this.transport.send(address, message, promise);
    }

    /** Returns the stream of incoming transport messages, excluding system messages. */
    @Override
    public Observable<Message> listen() {
        return this.messageObservable;
    }

    /** Delegates to the gossip protocol's {@code spread} and returns its future. */
    @Override
    public CompletableFuture<String> spreadGossip(Message message) {
        return this.gossip.spread(message);
    }

    /** Returns the stream of received gossips, excluding system gossips. */
    @Override
    public Observable<Message> listenGossips() {
        return this.gossipObservable;
    }

    /** Returns an unmodifiable view of the members currently held in the local member map. */
    @Override
    public Collection<Member> members() {
        return Collections.unmodifiableCollection(this.members.values());
    }

    /** Returns the local member as reported by the membership protocol. */
    @Override
    public Member member() {
        return this.membership.member();
    }

    /** Looks up a member by id; empty if no such member is recorded. */
    @Override
    public Optional<Member> member(String id) {
        return Optional.ofNullable(this.members.get(id));
    }

    /** Looks up a member by address via the address index; empty if none is recorded. */
    @Override
    public Optional<Member> member(Address address) {
        // Optional.map yields empty when the id is no longer present in the member map.
        return Optional.ofNullable(this.memberAddressIndex.get(address)).map(this.members::get);
    }

    /** Returns an unmodifiable snapshot of the recorded members with the local member removed. */
    @Override
    public Collection<Member> otherMembers() {
        Collection<Member> others = new ArrayList<>(this.members.values());
        others.remove(this.membership.member());
        return Collections.unmodifiableCollection(others);
    }

    /** Delegates to the membership protocol's {@code updateMetadata}. */
    @Override
    public void updateMetadata(Map<String, String> metadata) {
        this.membership.updateMetadata(metadata);
    }

    /** Delegates to the membership protocol's {@code updateMetadataProperty}. */
    @Override
    public void updateMetadataProperty(String key, String value) {
        this.membership.updateMetadataProperty(key, value);
    }

    /** Returns the membership event stream of the membership protocol. */
    @Override
    public Observable<MembershipEvent> listenMembership() {
        return this.membership.listen();
    }

    /**
     * Calls the membership protocol's {@code leave()} and, once that completes (successfully
     * or not), stops the protocol components and the transport.
     *
     * @return the future handed to {@code transport.stop(...)}
     */
    @Override
    public CompletableFuture<Void> shutdown() {
        LOGGER.info("Cluster member {} is shutting down...", this.membership.member());
        CompletableFuture<Void> transportStoppedFuture = new CompletableFuture<>();
        this.membership.leave().whenComplete((gossipId, error) -> {
            if (error != null) {
                // A failed leave must not block shutdown; record it and carry on.
                LOGGER.warn("Cluster member {} failed to notify about his leaving", this.membership.member(), error);
            }
            LOGGER.info("Cluster member notified about his leaving and shutting down... {}", this.membership.member());
            try {
                this.membership.stop();
                this.gossip.stop();
                this.failureDetector.stop();
            } finally {
                // Always stop the transport, even if a protocol stop() threw; otherwise
                // transport.stop(...) would be skipped and the returned future left pending.
                this.transport.stop(transportStoppedFuture);
            }
            LOGGER.info("Cluster member has shut down... {}", this.membership.member());
        });
        return transportStoppedFuture;
    }

    /** Returns the network emulator of the underlying transport. */
    @Override
    @Nonnull
    public NetworkEmulator networkEmulator() {
        return this.transport.networkEmulator();
    }

    /** Returns whether the underlying transport reports itself as stopped. */
    @Override
    public boolean isShutdown() {
        return this.transport.isStopped();
    }
}

