/*
 * Decompiled with CFR 0.152.
 */
package com.questdb.net.ha;

import com.questdb.net.ha.bridge.JournalEventBridge;
import com.questdb.net.ha.bridge.JournalEventHandler;
import com.questdb.net.ha.bridge.JournalEventProcessor;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link JournalEventBridge}: publishing with no consumers attached, and
 * publishing from several threads while several {@link JournalEventProcessor}s consume.
 */
public class JournalEventBridgeTest {
    /**
     * Publishes 10,000 events to a bridge that has no consumers attached.
     * There are no assertions; the test passes when every {@code publish} call returns
     * without throwing.
     */
    @Test
    public void testStartStop() {
        JournalEventBridge bridge = new JournalEventBridge(2L, TimeUnit.SECONDS);
        for (int i = 0; i < 10000; ++i) {
            bridge.publish(10, System.currentTimeMillis());
        }
    }

    /**
     * Runs two publisher tasks and three consumer tasks concurrently against one bridge.
     * <p>
     * Publisher {@code i} publishes {@code batchSize} events carrying journal index
     * {@code i}. Consumer {@code i} counts only the events whose journal index equals
     * {@code i}. The test therefore expects consumers 0 and 1 to count {@code batchSize}
     * events each and consumer 2 (which has no matching publisher) to count none.
     *
     * @throws Exception if waiting on the latch is interrupted or a publisher future fails
     */
    @Test
    public void testTwoPublishersThreeConsumers() throws Exception {
        final int batchSize = 1000;
        final JournalEventBridge bridge = new JournalEventBridge(50L, TimeUnit.MILLISECONDS);
        final Future<?>[] publishers = new Future<?>[2];
        final Handler[] consumers = new Handler[3];
        final int parties = publishers.length + consumers.length;
        // The barrier releases all tasks at once; the latch tells the test thread when all are done.
        final CyclicBarrier barrier = new CyclicBarrier(parties);
        final CountDownLatch latch = new CountDownLatch(parties);
        final ExecutorService service = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < publishers.length; ++i) {
                final int index = i;
                publishers[i] = service.submit(() -> {
                    int count = 0;
                    try {
                        barrier.await();
                        for (int k = 0; k < batchSize; ++k) {
                            long ts = System.nanoTime();
                            bridge.publish(index, ts);
                            ++count;
                        }
                    }
                    catch (Exception e) {
                        // A failure here leaves count short, which the assertion on the future detects.
                        e.printStackTrace();
                    }
                    finally {
                        latch.countDown();
                    }
                    return count;
                });
            }
            for (int i = 0; i < consumers.length; ++i) {
                final JournalEventProcessor processor = new JournalEventProcessor(bridge);
                final Handler handler = new Handler(i);
                consumers[i] = handler;
                service.submit(() -> {
                    try {
                        barrier.await();
                        // Keep consuming until process(...) returns false.
                        // NOTE(review): presumably that happens once the bridge timeout elapses
                        // with no new events - confirm against JournalEventProcessor.
                        while (processor.process(handler, true)) {
                        }
                    }
                    catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                    finally {
                        latch.countDown();
                    }
                });
            }
            latch.await();
            for (Future<?> f : publishers) {
                Assert.assertEquals(Integer.valueOf(batchSize), f.get());
            }
            Assert.assertEquals(batchSize, consumers[0].getCounter());
            Assert.assertEquals(batchSize, consumers[1].getCounter());
            Assert.assertEquals(0, consumers[2].getCounter());
        }
        finally {
            // Release the pool's worker threads whether the test passed or failed.
            service.shutdown();
        }
    }

    /**
     * Event handler that counts the events whose journal index matches the index it was
     * created with. The counter is a plain field: it is incremented by the consumer task
     * and read by the test thread only after that task has counted down the latch.
     */
    private static class Handler
    implements JournalEventHandler {
        private final int index;
        private int counter;

        private Handler(int index) {
            this.index = index;
        }

        /**
         * @return number of handled events whose journal index equalled this handler's index
         */
        public int getCounter() {
            return this.counter;
        }

        /**
         * Increments the counter when the event belongs to this handler's journal index;
         * events for other indexes are ignored.
         *
         * @param journalIndex journal index carried by the event
         */
        public void handle(int journalIndex) {
            if (journalIndex == this.index) {
                ++this.counter;
            }
        }
    }
}

