package reactor.pipe.example.twitter;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.fn.Consumer;
import reactor.kafka.KafkaSubscriber;
import reactor.pipe.Firehose;
import reactor.pipe.key.Key;

/* loaded from: input_file:reactor/pipe/example/twitter/ActivityDashboard.class */
public class ActivityDashboard {
    private static final Logger LOGGER = LoggerFactory.getLogger(ActivityDashboard.class);
    private ObjectMapper objectMapper = new ObjectMapper();

    public ActivityDashboard() {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        newFixedThreadPool.submit(producerRunnable());
        newFixedThreadPool.submit(consumerRunnable());
    }

    protected Runnable producerRunnable() {
        final Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "workstation:9092");
        properties.setProperty("key.serializer", "reactor.pipe.example.twitter.KeySerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new Runnable() { // from class: reactor.pipe.example.twitter.ActivityDashboard.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    KafkaSubscriber kafkaSubscriber = new KafkaSubscriber(properties, "events", key -> {
                        return 0;
                    }, new Consumer<Throwable>() { // from class: reactor.pipe.example.twitter.ActivityDashboard.1.1
                        public void accept(Throwable th) {
                            ActivityDashboard.LOGGER.error("Received an error", th);
                        }
                    });
                    Firehose firehose = new Firehose();
                    firehose.makePublisher(Key.wrap("kafka")).subscribe(kafkaSubscriber);
                    while (true) {
                        ActivityDashboard.LOGGER.info("Ping sent...");
                        firehose.notify(Key.wrap("kafka"), "ping");
                        try {
                            Thread.sleep(1000L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                } catch (Exception e2) {
                    ActivityDashboard.LOGGER.error("", e2);
                }
            }
        };
    }

    protected Runnable consumerRunnable() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "workstation:9092");
        properties.setProperty("zookeeper.connect", "workstation:2181");
        properties.setProperty("group.id", "testGroup");
        properties.setProperty("auto.offset.reset", "largest");
        properties.setProperty("enable.auto.commit", "true");
        properties.setProperty("auto.commit.interval.ms", "1000");
        properties.setProperty("session.timeout.ms", "1000");
        return new Runnable() { // from class: reactor.pipe.example.twitter.ActivityDashboard.2
            @Override // java.lang.Runnable
            public void run() {
            }
        };
    }

    public static void main(String[] strArr) {
        new ActivityDashboard();
    }
}
