/*
 * Decompiled with CFR 0.152.
 */
package com.questdb.cairo;

import com.questdb.cairo.AbstractCairoTest;
import com.questdb.cairo.Engine;
import com.questdb.cairo.PartitionBy;
import com.questdb.cairo.TableReader;
import com.questdb.cairo.TableReaderIncrementalRecordCursor;
import com.questdb.cairo.TableWriter;
import com.questdb.cairo.sql.CairoEngine;
import com.questdb.cairo.sql.Record;
import com.questdb.griffin.SqlCompiler;
import com.questdb.griffin.engine.functions.bind.BindVariableService;
import com.questdb.std.BinarySequence;
import com.questdb.std.Rnd;
import com.questdb.std.Unsafe;
import com.questdb.test.tools.TestUtils;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;

/**
 * Exercises {@link TableReaderIncrementalRecordCursor}, a cursor that is polled via
 * {@code reload()} to pick up rows committed by a {@link TableWriter}.
 *
 * <p>Two scenarios are covered:
 * <ul>
 *   <li>{@code testBy*} / {@code testNonPartitioned}: a writer thread and a reader thread run
 *       concurrently; the reader busy-polls the cursor and verifies every row it sees.</li>
 *   <li>{@code testBusyPollBy*}: a single thread appends, polls, appends again, truncates and
 *       appends once more, verifying what the cursor returns after each step.</li>
 * </ul>
 */
public class BusyPollTest extends AbstractCairoTest {
    /** Name of the table created by every test in this class. */
    private static final String TABLE_NAME = "xyz";
    /** Common part of the DDL statement; the partitioning mode is appended by each test. */
    private static final String CREATE_TABLE_PREFIX =
            "create table xyz (sequence INT, event BINARY, ts LONG, stamp TIMESTAMP) timestamp(stamp) partition by ";
    /** Bytes written to the BINARY column per row by the concurrent writer thread. */
    private static final int CONCURRENT_BLOB_SIZE = 128;
    /** Bytes written to the BINARY column per row by {@link #appendRecords}. */
    private static final int APPEND_BLOB_SIZE = 1024;
    /** Upper bound on how long one thread waits for its peer at the start barrier. */
    private static final long BARRIER_TIMEOUT_SECONDS = 60L;
    /** Upper bound on how long the test waits for both worker threads to finish. */
    private static final long COMPLETION_TIMEOUT_SECONDS = 600L;

    private static final Engine engine = new Engine(configuration);
    private static final SqlCompiler compiler = new SqlCompiler(engine, configuration);
    private static final BindVariableService bindVariableService = new BindVariableService();

    @Test
    public void testBusyPollByDay() throws Exception {
        // NOTE(review): 0 is presumably PartitionBy.DAY (constant inlined by the decompiler) - confirm.
        this.testBusyPollFromMidTable(0, 3000000000L);
    }

    @Test
    public void testBusyPollByMonth() throws Exception {
        // NOTE(review): 1 is presumably PartitionBy.MONTH - confirm.
        this.testBusyPollFromMidTable(1, 50000000000L);
    }

    @Test
    public void testBusyPollByNone() throws Exception {
        // NOTE(review): 3 is presumably PartitionBy.NONE - confirm.
        this.testBusyPollFromMidTable(3, 10000L);
    }

    @Test
    public void testBusyPollByYear() throws Exception {
        // NOTE(review): 2 is presumably PartitionBy.YEAR - confirm.
        this.testBusyPollFromMidTable(2, 182500000000L);
    }

    @Test
    public void testByDay() throws Exception {
        this.testBusyPoll(10000000L, 300000, CREATE_TABLE_PREFIX + "DAY");
    }

    @Test
    public void testByMonth() throws Exception {
        this.testBusyPoll(40000000L, 300000, CREATE_TABLE_PREFIX + "MONTH");
    }

    @Test
    public void testByYear() throws Exception {
        this.testBusyPoll(480000000L, 300000, CREATE_TABLE_PREFIX + "YEAR");
    }

    @Test
    public void testNonPartitioned() throws Exception {
        this.testBusyPoll(10000L, 3000000, CREATE_TABLE_PREFIX + "NONE");
    }

    /**
     * Appends {@code n} rows to {@code writer}, committing after every row.
     *
     * <p>Column 0 receives the loop index (it restarts at 0 for every call, {@code start} is not
     * added), column 1 receives {@link #APPEND_BLOB_SIZE} random bytes and column 2 receives the
     * descending value {@code start + n - i}.
     *
     * @param start              offset added to the descending value stored in column 2
     * @param n                  number of rows to append
     * @param timestampIncrement amount added to the row timestamp after each row
     * @param writer             destination table writer
     * @param ts                 timestamp of the first appended row
     * @param addr               address of a scratch buffer of at least {@link #APPEND_BLOB_SIZE} bytes
     * @param rnd                source of the random blob bytes
     */
    private void appendRecords(int start, int n, long timestampIncrement, TableWriter writer, long ts, long addr, Rnd rnd) {
        for (int i = 0; i < n; i++) {
            TableWriter.Row row = writer.newRow(ts);
            row.putInt(0, i);
            for (int k = 0; k < APPEND_BLOB_SIZE; k++) {
                Unsafe.getUnsafe().putByte(addr + k, rnd.nextByte());
            }
            row.putBin(1, addr, APPEND_BLOB_SIZE);
            row.putLong(2, start + n - i);
            row.append();
            writer.commit();
            ts += timestampIncrement;
        }
    }

    /**
     * Creates the table, then runs a writer thread and a busy-polling reader thread side by side
     * and fails if either thread reports an error or does not finish in time.
     *
     * @param timestampIncrement amount added to the row timestamp after each written row
     * @param n                  number of rows the writer produces and the reader expects
     * @param createStatement    DDL statement that creates table {@code xyz}
     */
    private void testBusyPoll(long timestampIncrement, int n, String createStatement) throws Exception {
        TestUtils.assertMemoryLeak(() -> {
            compiler.compile(createStatement, bindVariableService);
            final AtomicInteger errorCount = new AtomicInteger();
            final CyclicBarrier barrier = new CyclicBarrier(2);
            final CountDownLatch latch = new CountDownLatch(2);
            try {
                new Thread(() -> runWriter(timestampIncrement, n, barrier, latch, errorCount)).start();
                new Thread(() -> runReader(n, barrier, latch, errorCount)).start();
                Assert.assertTrue(latch.await(COMPLETION_TIMEOUT_SECONDS, TimeUnit.SECONDS));
                Assert.assertEquals(0, errorCount.get());
            } finally {
                engine.releaseAllReaders();
                engine.releaseAllWriters();
            }
        });
    }

    /**
     * Writer side of the concurrent test: appends up to {@code n} rows, committing each one.
     * Any failure is printed and counted in {@code errorCount}; {@code latch} is always counted down.
     */
    private static void runWriter(long timestampIncrement, int n, CyclicBarrier barrier, CountDownLatch latch, AtomicInteger errorCount) {
        try (TableWriter writer = engine.getWriter(TABLE_NAME)) {
            // Timed wait: if the reader thread failed before reaching the barrier we must not block forever.
            barrier.await(BARRIER_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            long ts = 0L;
            final long addr = Unsafe.malloc(CONCURRENT_BLOB_SIZE);
            try {
                final Rnd rnd = new Rnd();
                // Stop early once the reader has reported a failure; the test is lost anyway.
                for (int i = 0; i < n && errorCount.get() == 0; i++) {
                    TableWriter.Row row = writer.newRow(ts);
                    row.putInt(0, i);
                    for (int k = 0; k < CONCURRENT_BLOB_SIZE; k++) {
                        Unsafe.getUnsafe().putByte(addr + k, rnd.nextByte());
                    }
                    row.putBin(1, addr, CONCURRENT_BLOB_SIZE);
                    row.putLong(2, rnd.nextLong());
                    row.append();
                    writer.commit();
                    ts += timestampIncrement;
                }
            } finally {
                Unsafe.free(addr, CONCURRENT_BLOB_SIZE);
            }
        } catch (Throwable e) {
            e.printStackTrace();
            errorCount.incrementAndGet();
        } finally {
            latch.countDown();
        }
    }

    /**
     * Reader side of the concurrent test: busy-polls the incremental cursor until {@code n} rows
     * have been verified or the writer has reported an error.
     *
     * <p>Expected values are regenerated from a second default-constructed {@link Rnd}, drawing
     * values in the same order as the writer (blob bytes, then one long per row); the test relies
     * on both instances producing the same sequence.
     */
    private static void runReader(int n, CyclicBarrier barrier, CountDownLatch latch, AtomicInteger errorCount) {
        // NOTE(review): -1L is presumably the "any table version" marker - confirm.
        try (TableReader reader = engine.getReader(TABLE_NAME, -1L)) {
            final Rnd rnd = new Rnd();
            final TableReaderIncrementalRecordCursor cursor = new TableReaderIncrementalRecordCursor();
            cursor.of(reader);
            final Record record = cursor.getRecord();
            // Timed wait: if the writer thread failed before reaching the barrier we must not block forever.
            barrier.await(BARRIER_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            int count = 0;
            // Without the errorCount check a dead writer would leave this loop spinning indefinitely.
            while (count < n && errorCount.get() == 0) {
                if (!cursor.reload()) {
                    continue;
                }
                while (cursor.hasNext()) {
                    Assert.assertEquals(count, record.getInt(0));
                    BinarySequence binarySequence = record.getBin(1);
                    for (int i = 0; i < CONCURRENT_BLOB_SIZE; i++) {
                        Assert.assertEquals(rnd.nextByte(), binarySequence.byteAt(i));
                    }
                    Assert.assertEquals(rnd.nextLong(), record.getLong(2));
                    count++;
                }
            }
        } catch (Throwable e) {
            e.printStackTrace();
            errorCount.incrementAndGet();
        } finally {
            latch.countDown();
        }
    }

    /**
     * Single-threaded scenario: the cursor is opened on a table that already holds rows, then
     * polled after a further append, after a truncate, and after a final append.
     *
     * @param partitionBy        partitioning code rendered into the DDL via {@code PartitionBy.toString}
     * @param timestampIncrement amount added to the row timestamp after each appended row
     */
    private void testBusyPollFromMidTable(int partitionBy, long timestampIncrement) throws Exception {
        final int n = 1000;
        TestUtils.assertMemoryLeak(() -> {
            try {
                compiler.compile(CREATE_TABLE_PREFIX + PartitionBy.toString(partitionBy), null);
                try (TableWriter writer = engine.getWriter(TABLE_NAME)) {
                    final long addr = Unsafe.malloc(APPEND_BLOB_SIZE);
                    try {
                        final Rnd rnd = new Rnd();
                        appendRecords(0, n, timestampIncrement, writer, 0L, addr, rnd);
                        // Timestamp at which every subsequent batch starts.
                        final long ts = n * timestampIncrement;
                        try (TableReader reader = engine.getReader(TABLE_NAME, -1L);
                             TableReaderIncrementalRecordCursor cursor = new TableReaderIncrementalRecordCursor()) {
                            cursor.of(reader);
                            final Record record = cursor.getRecord();

                            // First poll: the rows appended before the cursor was opened.
                            Assert.assertTrue(cursor.reload());
                            Assert.assertEquals(n, drainAndAssertCountdown(cursor, record, n));

                            // Nothing new was committed, so there is nothing to reload.
                            Assert.assertFalse(cursor.reload());
                            Assert.assertFalse(cursor.hasNext());

                            // Second batch: only the newly appended rows are expected.
                            appendRecords(n, n, timestampIncrement, writer, ts, addr, rnd);
                            Assert.assertTrue(cursor.reload());
                            Assert.assertEquals(n, drainAndAssertCountdown(cursor, record, 2 * n));

                            // Truncate is reported as a change but leaves no rows to read.
                            writer.truncate();
                            Assert.assertTrue(cursor.reload());
                            Assert.assertFalse(cursor.hasNext());

                            // Rows appended after the truncate are picked up again.
                            appendRecords(2 * n, n / 2, timestampIncrement, writer, ts, addr, rnd);
                            Assert.assertTrue(cursor.reload());
                            Assert.assertEquals(n / 2, drainAndAssertCountdown(cursor, record, 2 * n + n / 2));
                        }
                    } finally {
                        Unsafe.free(addr, APPEND_BLOB_SIZE);
                    }
                }
            } finally {
                engine.releaseAllReaders();
                engine.releaseAllWriters();
            }
        });
    }

    /**
     * Reads the cursor until {@code hasNext()} returns false, asserting that column 2 of the
     * k-th row read (k starting at 0) equals {@code expectedFirst - k}.
     *
     * @param cursor        cursor to drain
     * @param record        record obtained from {@code cursor.getRecord()}
     * @param expectedFirst value expected in column 2 of the first row read
     * @return number of rows read
     */
    private static int drainAndAssertCountdown(TableReaderIncrementalRecordCursor cursor, Record record, long expectedFirst) {
        int count = 0;
        while (cursor.hasNext()) {
            Assert.assertEquals(expectedFirst - count, record.getLong(2));
            count++;
        }
        return count;
    }
}

