/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.services.discovery;

import io.scalecube.cluster.Cluster;
import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.Member;
import io.scalecube.services.ServiceEndpoint;
import io.scalecube.services.discovery.ClusterMetadataDecoder;
import io.scalecube.services.discovery.api.DiscoveryConfig;
import io.scalecube.services.discovery.api.DiscoveryEvent;
import io.scalecube.services.discovery.api.ServiceDiscovery;
import io.scalecube.services.registry.api.ServiceRegistry;
import io.scalecube.transport.Address;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

public class ScalecubeServiceDiscovery
implements ServiceDiscovery {
    public static final String SERVICE_METADATA = "service";
    private ClusterConfig.Builder clusterConfig = ClusterConfig.builder();
    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);
    private ServiceRegistry serviceRegistry;
    private Cluster cluster;
    private ServiceEndpoint endpoint;
    private final DirectProcessor<DiscoveryEvent> subject = DirectProcessor.create();
    private final FluxSink<DiscoveryEvent> sink = this.subject.serialize().sink();

    @Override
    public Address address() {
        return this.cluster.address();
    }

    @Override
    public ServiceEndpoint endpoint() {
        return this.endpoint;
    }

    @Override
    public Mono<ServiceDiscovery> start(DiscoveryConfig config) {
        this.configure(config);
        this.clusterConfig.addMetadata(this.serviceRegistry.listServiceEndpoints().stream().collect(Collectors.toMap(ClusterMetadataDecoder::encodeMetadata, service -> SERVICE_METADATA)));
        CompletionStage promise = Cluster.join(this.clusterConfig.build()).whenComplete((success, error) -> {
            if (error == null) {
                this.cluster = success;
                this.init(this.cluster);
            }
        });
        return Mono.fromFuture(promise).map(mapper -> this);
    }

    @Override
    public Flux<DiscoveryEvent> listen() {
        return Flux.fromIterable(this.serviceRegistry.listServiceEndpoints()).map(DiscoveryEvent::registered).concatWith(this.subject);
    }

    @Override
    public Mono<Void> shutdown() {
        return Mono.defer(() -> {
            this.sink.complete();
            return Optional.ofNullable(this.cluster).map(cluster1 -> Mono.fromFuture(cluster1.shutdown())).orElse(Mono.empty());
        });
    }

    private void configure(DiscoveryConfig config) {
        this.serviceRegistry = config.serviceRegistry();
        this.endpoint = config.endpoint();
        if (config.seeds() != null) {
            this.clusterConfig.seedMembers(config.seeds());
        }
        if (config.port() != null) {
            this.clusterConfig.port(config.port());
        }
        if (config.tags() != null) {
            this.clusterConfig.metadata(config.tags());
        }
    }

    private void init(Cluster cluster) {
        this.loadClusterServices(cluster);
        this.listenCluster(cluster);
    }

    private void listenCluster(Cluster cluster) {
        cluster.listenMembership().subscribe(event -> {
            if (event.isAdded()) {
                this.loadMemberServices(DiscoveryType.ADDED, event.member());
            } else if (event.isRemoved()) {
                this.loadMemberServices(DiscoveryType.REMOVED, event.member());
            }
        });
    }

    private void loadClusterServices(Cluster cluster) {
        cluster.otherMembers().forEach(member -> this.loadMemberServices(DiscoveryType.DISCOVERED, (Member)member));
    }

    private void loadMemberServices(DiscoveryType type, Member member) {
        member.metadata().entrySet().stream().filter(entry -> SERVICE_METADATA.equals(entry.getValue())).forEach(entry -> {
            ServiceEndpoint serviceEndpoint = ClusterMetadataDecoder.decodeMetadata((String)entry.getKey());
            if (serviceEndpoint == null) {
                return;
            }
            LOGGER.debug("Member: {} is {} : {}", new Object[]{member, type, serviceEndpoint});
            if ((type.equals((Object)DiscoveryType.ADDED) || type.equals((Object)DiscoveryType.DISCOVERED)) && this.serviceRegistry.registerService(serviceEndpoint)) {
                LOGGER.info("Service Reference was ADDED since new Member has joined the cluster {} : {}", (Object)member, (Object)serviceEndpoint);
                DiscoveryEvent registrationEvent = DiscoveryEvent.registered(serviceEndpoint);
                LOGGER.debug("Publish registered: " + registrationEvent);
                this.sink.next(registrationEvent);
            } else if (type.equals((Object)DiscoveryType.REMOVED) && this.serviceRegistry.unregisterService(serviceEndpoint.id()) != null) {
                LOGGER.info("Service Reference was REMOVED since Member have left the cluster {} : {}", (Object)member, (Object)serviceEndpoint);
                DiscoveryEvent registrationEvent = DiscoveryEvent.unregistered(serviceEndpoint);
                LOGGER.debug("Publish unregistered: " + registrationEvent);
                this.sink.next(registrationEvent);
            }
        });
    }

    private static enum DiscoveryType {
        ADDED,
        REMOVED,
        DISCOVERED;

    }
}

