/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.coordination;

import io.atomix.catalyst.util.Listener;
import io.atomix.coordination.state.TopicCommands;
import io.atomix.coordination.state.TopicState;
import io.atomix.copycat.client.RaftClient;
import io.atomix.resource.AbstractResource;
import io.atomix.resource.Consistency;
import io.atomix.resource.ResourceInfo;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
 * Topic resource: messages are published by submitting {@link TopicCommands.Publish}
 * commands, and incoming {@code "message"} session events are fanned out to the
 * locally registered listeners.
 *
 * <p>The local listener set is guarded by this instance's monitor. Listener callbacks
 * are invoked on a snapshot of the set and outside the monitor, so a callback may
 * safely subscribe or close listeners on this topic while it is running.
 *
 * @param <T> the type of message carried by the topic
 */
@ResourceInfo(stateMachine=TopicState.class)
public class DistributedTopic<T>
extends AbstractResource {
    /** Locally registered message consumers; guarded by {@code this}. */
    private final Set<Consumer<T>> listeners = new HashSet<Consumer<T>>();

    /**
     * Creates the topic and registers a handler for the session's {@code "message"} event.
     *
     * @param client the client whose session delivers message events and which is
     *               passed on to the superclass constructor
     */
    public DistributedTopic(RaftClient client) {
        super(client);
        client.session().onEvent("message", event -> this.dispatch(event));
    }

    /**
     * Delivers one received event to every listener registered at the time of the call.
     *
     * @param event the raw event payload received from the session
     */
    private void dispatch(Object event) {
        // The payload's static type is not visible at the registration site; it is
        // assumed to be the topic's message type (presumably whatever was passed to
        // publish() — verify against the state machine).
        @SuppressWarnings("unchecked")
        final T message = (T) event;

        // Iterate over a copy: a callback that closes its own handle (or subscribes a
        // new listener) would otherwise modify the set mid-iteration and trigger a
        // ConcurrentModificationException.
        final Set<Consumer<T>> snapshot;
        synchronized (this) {
            snapshot = new HashSet<Consumer<T>>(this.listeners);
        }
        for (Consumer<T> consumer : snapshot) {
            consumer.accept(message);
        }
    }

    /**
     * Applies the given consistency level via the superclass and returns this topic
     * so that calls can be chained.
     *
     * @param consistency the consistency level to apply
     * @return this topic
     */
    public DistributedTopic<T> with(Consistency consistency) {
        super.with(consistency);
        return this;
    }

    /**
     * Shorthand for {@code with(Consistency.ATOMIC)}.
     *
     * @return this topic
     */
    public DistributedTopic<T> sync() {
        return this.with(Consistency.ATOMIC);
    }

    /**
     * Shorthand for {@code with(Consistency.SEQUENTIAL)}.
     *
     * @return this topic
     */
    public DistributedTopic<T> async() {
        return this.with(Consistency.SEQUENTIAL);
    }

    /**
     * Publishes a message by submitting a {@link TopicCommands.Publish} command.
     *
     * @param message the message to publish
     * @return the future returned by submitting the command
     */
    public CompletableFuture<Void> publish(T message) {
        return this.submit(new TopicCommands.Publish<T>(message));
    }

    /**
     * Registers a local listener for messages on this topic.
     *
     * <p>If other listeners are already registered, the listener is added locally and an
     * already-completed future is returned. Otherwise a {@link TopicCommands.Listen}
     * command is submitted and the returned future completes once that command does.
     *
     * <p>Listeners are held in a set, so registering the same consumer instance twice
     * results in a single registration that either returned handle can remove.
     *
     * @param listener the consumer to call for each received message
     * @return a future yielding a handle whose {@code close()} removes the listener
     * @throws NullPointerException if {@code listener} is {@code null}
     */
    public synchronized CompletableFuture<Listener<T>> subscribe(Consumer<T> listener) {
        if (listener == null) {
            // Fail fast instead of storing null and failing later during dispatch.
            throw new NullPointerException("listener cannot be null");
        }
        if (!this.listeners.isEmpty()) {
            this.listeners.add(listener);
            return CompletableFuture.completedFuture(new TopicListener(listener));
        }
        // First local listener: a Listen command has to be submitted as well.
        // NOTE(review): if that command fails, the listener stays registered locally
        // and later subscribers will skip the Listen submission — confirm whether a
        // rollback is wanted here.
        this.listeners.add(listener);
        return this.submit(new TopicCommands.Listen()).thenApply(v -> new TopicListener(listener));
    }

    /**
     * Handle returned from {@link DistributedTopic#subscribe(Consumer)}; forwards
     * messages to the wrapped consumer and unregisters it on {@link #close()}.
     */
    private class TopicListener
    implements Listener<T> {
        private final Consumer<T> listener;

        private TopicListener(Consumer<T> listener) {
            this.listener = listener;
        }

        /**
         * Passes the message straight to the wrapped consumer.
         *
         * @param message the message to deliver
         */
        public void accept(T message) {
            this.listener.accept(message);
        }

        /**
         * Removes the wrapped consumer from the topic. When that removal leaves the
         * topic without local listeners, a {@link TopicCommands.Unlisten} command is
         * submitted; the future returned by that submission is not observed.
         *
         * <p>Closing a handle whose consumer is no longer registered does nothing.
         */
        public void close() {
            synchronized (DistributedTopic.this) {
                // Submit Unlisten only when this call actually removed the listener;
                // otherwise a repeated close() would submit the command again.
                boolean removed = DistributedTopic.this.listeners.remove(this.listener);
                if (removed && DistributedTopic.this.listeners.isEmpty()) {
                    DistributedTopic.this.submit(new TopicCommands.Unlisten());
                }
            }
        }
    }
}

