/*
 * Decompiled with CFR 0.152.
 */
package com.questdb.mp;

import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.mp.Barrier;
import com.questdb.mp.BlockingWaitStrategy;
import com.questdb.mp.Event;
import com.questdb.mp.FanOut;
import com.questdb.mp.MCSequence;
import com.questdb.mp.RingQueue;
import com.questdb.mp.SCSequence;
import com.questdb.mp.SPSequence;
import com.questdb.mp.Sequence;
import com.questdb.mp.WaitStrategy;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.LockSupport;
import org.junit.Assert;
import org.junit.Test;

public class ConcurrentTest {
    private static final Log LOG = LogFactory.getLog(ConcurrentTest.class);

    @Test
    public void testFanOutChain() {
        LOG.info().$((CharSequence)"testFanOutChain").$();
        int cycle = 1024;
        SPSequence a = new SPSequence(cycle);
        SCSequence b = new SCSequence();
        SCSequence c = new SCSequence();
        SCSequence d = new SCSequence();
        SCSequence e = new SCSequence();
        a.then((Barrier)FanOut.to((Barrier)FanOut.to((Barrier)d).and((Barrier)e)).and(b.then((Barrier)c))).then((Barrier)a);
    }

    @Test
    public void testOneToManyBusy() throws Exception {
        LOG.info().$((CharSequence)"testOneToManyBusy").$();
        int cycle = 1024;
        int size = 1024 * cycle;
        RingQueue queue = new RingQueue(Event.FACTORY, cycle);
        SPSequence pubSeq = new SPSequence(cycle);
        MCSequence subSeq = new MCSequence(cycle);
        pubSeq.then((Barrier)subSeq).then((Barrier)pubSeq);
        CyclicBarrier barrier = new CyclicBarrier(3);
        CountDownLatch latch = new CountDownLatch(2);
        BusyConsumer[] consumers = new BusyConsumer[]{new BusyConsumer(size, (Sequence)subSeq, (RingQueue<Event>)queue, barrier, latch), new BusyConsumer(size, (Sequence)subSeq, (RingQueue<Event>)queue, barrier, latch)};
        consumers[0].start();
        consumers[1].start();
        barrier.await();
        int i = 0;
        while (true) {
            long cursor;
            if ((cursor = pubSeq.next()) < 0L) {
                continue;
            }
            ((Event)queue.get((long)cursor)).value = i++;
            pubSeq.done(cursor);
            if (i == size) break;
        }
        ConcurrentTest.publishEOE((RingQueue<Event>)queue, (Sequence)pubSeq);
        ConcurrentTest.publishEOE((RingQueue<Event>)queue, (Sequence)pubSeq);
        latch.await();
        int[] buf = new int[size];
        System.arraycopy(consumers[0].buf, 0, buf, 0, consumers[0].finalIndex);
        System.arraycopy(consumers[1].buf, 0, buf, consumers[0].finalIndex, consumers[1].finalIndex);
        Arrays.sort(buf);
        for (i = 0; i < buf.length; ++i) {
            Assert.assertEquals((long)i, (long)buf[i]);
        }
    }

    @Test
    public void testOneToManyWaiting() throws Exception {
        LOG.info().$((CharSequence)"testOneToManyWaiting").$();
        int cycle = 1024;
        int size = 1024 * cycle;
        RingQueue queue = new RingQueue(Event.FACTORY, cycle);
        SPSequence pubSeq = new SPSequence(cycle);
        MCSequence subSeq = new MCSequence(cycle, (WaitStrategy)new BlockingWaitStrategy());
        pubSeq.then((Barrier)subSeq).then((Barrier)pubSeq);
        CyclicBarrier barrier = new CyclicBarrier(3);
        CountDownLatch latch = new CountDownLatch(2);
        WaitingConsumer[] consumers = new WaitingConsumer[]{new WaitingConsumer(size, (Sequence)subSeq, (RingQueue<Event>)queue, barrier, latch), new WaitingConsumer(size, (Sequence)subSeq, (RingQueue<Event>)queue, barrier, latch)};
        consumers[0].start();
        consumers[1].start();
        barrier.await();
        int i = 0;
        do {
            long cursor = pubSeq.nextBully();
            ((Event)queue.get((long)cursor)).value = i++;
            pubSeq.done(cursor);
        } while (i != size);
        ConcurrentTest.publishEOE((RingQueue<Event>)queue, (Sequence)pubSeq);
        ConcurrentTest.publishEOE((RingQueue<Event>)queue, (Sequence)pubSeq);
        latch.await();
        int[] buf = new int[size];
        System.arraycopy(consumers[0].buf, 0, buf, 0, consumers[0].finalIndex);
        System.arraycopy(consumers[1].buf, 0, buf, consumers[0].finalIndex, consumers[1].finalIndex);
        Arrays.sort(buf);
        for (i = 0; i < buf.length; ++i) {
            Assert.assertEquals((long)i, (long)buf[i]);
        }
    }

    @Test
    public void testOneToOneBusy() throws Exception {
        LOG.info().$((CharSequence)"testOneToOneBusy").$();
        int cycle = 1024;
        int size = 1024 * cycle;
        RingQueue queue = new RingQueue(Event.FACTORY, cycle);
        SPSequence pubSeq = new SPSequence(cycle);
        SCSequence subSeq = new SCSequence((WaitStrategy)new BlockingWaitStrategy());
        pubSeq.then((Barrier)subSeq).then((Barrier)pubSeq);
        CyclicBarrier barrier = new CyclicBarrier(2);
        CountDownLatch latch = new CountDownLatch(1);
        BusyConsumer consumer = new BusyConsumer(size, (Sequence)subSeq, (RingQueue<Event>)queue, barrier, latch);
        consumer.start();
        barrier.await();
        int i = 0;
        while (true) {
            long cursor;
            if ((cursor = pubSeq.next()) < 0L) {
                continue;
            }
            ((Event)queue.get((long)cursor)).value = i++;
            pubSeq.done(cursor);
            if (i == size) break;
        }
        ConcurrentTest.publishEOE((RingQueue<Event>)queue, (Sequence)pubSeq);
        latch.await();
        int[] buf = consumer.buf;
        for (i = 0; i < buf.length; ++i) {
            Assert.assertEquals((long)i, (long)buf[i]);
        }
    }

    @Test
    public void testOneToOneWaiting() throws Exception {
        LOG.info().$((CharSequence)"testOneToOneWaiting").$();
        int cycle = 1024;
        int size = 1024 * cycle;
        RingQueue queue = new RingQueue(Event.FACTORY, cycle);
        SPSequence pubSeq = new SPSequence(cycle);
        SCSequence subSeq = new SCSequence((WaitStrategy)new BlockingWaitStrategy());
        pubSeq.then((Barrier)subSeq).then((Barrier)pubSeq);
        CyclicBarrier barrier = new CyclicBarrier(2);
        CountDownLatch latch = new CountDownLatch(1);
        WaitingConsumer consumer = new WaitingConsumer(size, (Sequence)subSeq, (RingQueue<Event>)queue, barrier, latch);
        consumer.start();
        barrier.await();
        int i = 0;
        do {
            long cursor = pubSeq.nextBully();
            ((Event)queue.get((long)cursor)).value = i++;
            pubSeq.done(cursor);
        } while (i != size);
        ConcurrentTest.publishEOE((RingQueue<Event>)queue, (Sequence)pubSeq);
        latch.await();
        int[] buf = consumer.buf;
        for (i = 0; i < buf.length; ++i) {
            Assert.assertEquals((long)i, (long)buf[i]);
        }
    }

    @Test
    public void testOneToParallelMany() throws Exception {
        LOG.info().$((CharSequence)"testOneToParallelMany").$();
        int cycle = 1024;
        int size = 1024 * cycle;
        RingQueue queue = new RingQueue(Event.FACTORY, cycle);
        SPSequence pubSeq = new SPSequence(cycle);
        SCSequence sub1 = new SCSequence();
        SCSequence sub2 = new SCSequence();
        pubSeq.then((Barrier)FanOut.to((Barrier)sub1).and((Barrier)sub2)).then((Barrier)pubSeq);
        CyclicBarrier barrier = new CyclicBarrier(3);
        CountDownLatch latch = new CountDownLatch(2);
        BusyConsumer[] consumers = new BusyConsumer[]{new BusyConsumer(size, (Sequence)sub1, (RingQueue<Event>)queue, barrier, latch), new BusyConsumer(size, (Sequence)sub2, (RingQueue<Event>)queue, barrier, latch)};
        consumers[0].start();
        consumers[1].start();
        barrier.await();
        int i = 0;
        while (true) {
            long cursor;
            if ((cursor = pubSeq.next()) < 0L) {
                LockSupport.parkNanos(1L);
                continue;
            }
            ((Event)queue.get((long)cursor)).value = i++;
            pubSeq.done(cursor);
            if (i == size) break;
        }
        ConcurrentTest.publishEOE((RingQueue<Event>)queue, (Sequence)pubSeq);
        ConcurrentTest.publishEOE((RingQueue<Event>)queue, (Sequence)pubSeq);
        latch.await();
        for (int k = 0; k < 2; ++k) {
            for (i = 0; i < consumers[k].buf.length; ++i) {
                Assert.assertEquals((long)i, (long)consumers[k].buf[i]);
            }
        }
    }

    @Test
    public void testOneToParallelSubscriber() throws Exception {
        LOG.info().$((CharSequence)"testOneToParallelSubscriber").$();
        int cycle = 1024;
        int size = 1024 * cycle;
        RingQueue queue = new RingQueue(Event.FACTORY, cycle);
        SPSequence pubSeq = new SPSequence(cycle);
        SCSequence sub1 = new SCSequence();
        SCSequence sub2 = new SCSequence();
        FanOut fanOut = FanOut.to((Barrier)sub1).and((Barrier)sub2);
        pubSeq.then((Barrier)fanOut).then((Barrier)pubSeq);
        CyclicBarrier barrier = new CyclicBarrier(4);
        CountDownLatch latch = new CountDownLatch(3);
        BusyConsumer[] consumers = new BusyConsumer[]{new BusyConsumer(size, (Sequence)sub1, (RingQueue<Event>)queue, barrier, latch), new BusyConsumer(size, (Sequence)sub2, (RingQueue<Event>)queue, barrier, latch)};
        BusySubscriber subscriber = new BusySubscriber((RingQueue<Event>)queue, barrier, latch, fanOut, (Sequence)pubSeq);
        subscriber.start();
        consumers[0].start();
        consumers[1].start();
        barrier.await();
        int i = 0;
        while (true) {
            long cursor;
            if ((cursor = pubSeq.next()) < 0L) {
                continue;
            }
            ((Event)queue.get((long)cursor)).value = i++;
            pubSeq.done(cursor);
            if (i == size) break;
        }
        ConcurrentTest.publishEOE((RingQueue<Event>)queue, (Sequence)pubSeq);
        ConcurrentTest.publishEOE((RingQueue<Event>)queue, (Sequence)pubSeq);
        latch.await();
        for (int k = 0; k < 2; ++k) {
            for (i = 0; i < consumers[k].buf.length; ++i) {
                Assert.assertEquals((long)i, (long)consumers[k].buf[i]);
            }
        }
        for (i = 0; i < subscriber.buf.length; ++i) {
            Assert.assertTrue((subscriber.buf[i] > 0 ? 1 : 0) != 0);
        }
    }

    static void publishEOE(RingQueue<Event> queue, Sequence sequence) {
        long cursor = sequence.nextBully();
        ((Event)queue.get((long)cursor)).value = Integer.MIN_VALUE;
        sequence.done(cursor);
    }

    private static class WaitingConsumer
    extends Thread {
        private final Sequence sequence;
        private final int[] buf;
        private final RingQueue<Event> queue;
        private final CyclicBarrier barrier;
        private final CountDownLatch latch;
        private volatile int finalIndex = 0;

        WaitingConsumer(int cycle, Sequence sequence, RingQueue<Event> queue, CyclicBarrier barrier, CountDownLatch latch) {
            this.sequence = sequence;
            this.buf = new int[cycle];
            this.queue = queue;
            this.barrier = barrier;
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                this.barrier.await();
                int p = 0;
                while (true) {
                    long cursor = this.sequence.waitForNext();
                    int v = ((Event)this.queue.get((long)cursor)).value;
                    this.sequence.done(cursor);
                    if (v == Integer.MIN_VALUE) break;
                    this.buf[p++] = v;
                }
                this.finalIndex = p;
                this.latch.countDown();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private static class BusySubscriber
    extends Thread {
        private final int[] buf = new int[20];
        private final RingQueue<Event> queue;
        private final CyclicBarrier barrier;
        private final CountDownLatch latch;
        private final FanOut fanOut;
        private final Sequence publisher;

        BusySubscriber(RingQueue<Event> queue, CyclicBarrier barrier, CountDownLatch latch, FanOut fanOut, Sequence publisher) {
            this.queue = queue;
            this.barrier = barrier;
            this.latch = latch;
            this.fanOut = fanOut;
            this.publisher = publisher;
        }

        @Override
        public void run() {
            try {
                this.barrier.await();
                Thread.sleep(10L);
                SCSequence sequence = new SCSequence(this.publisher.current());
                this.fanOut.and((Barrier)sequence);
                int p = 0;
                while (p < this.buf.length) {
                    long cursor = sequence.next();
                    if (cursor < 0L) {
                        LockSupport.parkNanos(1L);
                        continue;
                    }
                    int v = ((Event)this.queue.get((long)cursor)).value;
                    sequence.done(cursor);
                    if (v == Integer.MIN_VALUE) break;
                    this.buf[p++] = v;
                }
                this.fanOut.remove((Barrier)sequence);
                this.latch.countDown();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private static class BusyConsumer
    extends Thread {
        private final Sequence sequence;
        private final int[] buf;
        private final RingQueue<Event> queue;
        private final CyclicBarrier barrier;
        private final CountDownLatch latch;
        private volatile int finalIndex = 0;

        BusyConsumer(int cycle, Sequence sequence, RingQueue<Event> queue, CyclicBarrier barrier, CountDownLatch latch) {
            this.sequence = sequence;
            this.buf = new int[cycle];
            this.queue = queue;
            this.barrier = barrier;
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                this.barrier.await();
                int p = 0;
                while (true) {
                    long cursor;
                    if ((cursor = this.sequence.next()) < 0L) {
                        LockSupport.parkNanos(1L);
                        continue;
                    }
                    int v = ((Event)this.queue.get((long)cursor)).value;
                    this.sequence.done(cursor);
                    if (v == Integer.MIN_VALUE) break;
                    this.buf[p++] = v;
                }
                this.finalIndex = p;
                this.latch.countDown();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

