/*
 * Decompiled with CFR 0.152.
 */
package com.questdb.cutlass.line.udp;

import com.questdb.cairo.AbstractCairoTest;
import com.questdb.cairo.CairoConfiguration;
import com.questdb.cairo.CairoException;
import com.questdb.cairo.CairoTestUtils;
import com.questdb.cairo.DefaultCairoConfiguration;
import com.questdb.cairo.RecordCursorPrinter;
import com.questdb.cairo.TableModel;
import com.questdb.cairo.TableReader;
import com.questdb.cairo.TableWriter;
import com.questdb.cairo.pool.WriterPool;
import com.questdb.cairo.sql.RecordCursor;
import com.questdb.cutlass.line.udp.GenericLineProtoReceiver;
import com.questdb.cutlass.line.udp.LineProtoSender;
import com.questdb.cutlass.line.udp.LinuxLineProtoReceiver;
import com.questdb.cutlass.line.udp.ReceiverConfiguration;
import com.questdb.cutlass.line.udp.ReceiverFactory;
import com.questdb.mp.Job;
import com.questdb.mp.Worker;
import com.questdb.std.Misc;
import com.questdb.std.NetFacade;
import com.questdb.std.NetFacadeImpl;
import com.questdb.std.NetworkFacadeImpl;
import com.questdb.std.ObjHashSet;
import com.questdb.std.Os;
import com.questdb.std.str.CharSink;
import com.questdb.std.str.StringSink;
import com.questdb.test.tools.TestUtils;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.junit.Assert;
import org.junit.Test;

public class LinuxLineProtoReceiverTest
extends AbstractCairoTest {
    private static final ReceiverFactory LINUX_FACTORY = LinuxLineProtoReceiver::new;
    private static final ReceiverFactory GENERIC_FACTORY = GenericLineProtoReceiver::new;

    @Test
    public void testGenericCannotBindSocket() throws Exception {
        this.assertCannotBindSocket(GENERIC_FACTORY);
    }

    @Test
    public void testGenericCannotJoin() throws Exception {
        this.assertCannotJoin(GENERIC_FACTORY);
    }

    @Test
    public void testGenericCannotOpenSocket() throws Exception {
        this.assertCannotOpenSocket(GENERIC_FACTORY);
    }

    @Test
    public void testGenericCannotSetReceiveBuffer() throws Exception {
        this.assertCannotSetReceiveBuffer(GENERIC_FACTORY);
    }

    @Test
    public void testGenericFrequentCommit() throws Exception {
        this.assertFrequentCommit(GENERIC_FACTORY);
    }

    @Test
    public void testGenericSimpleReceive() throws Exception {
        this.assertReceive(new TestReceiverConfiguration(), GENERIC_FACTORY);
    }

    @Test
    public void testLinuxCannotBindSocket() throws Exception {
        if (Os.type != 2) {
            return;
        }
        this.assertCannotBindSocket(LINUX_FACTORY);
    }

    @Test
    public void testLinuxCannotJoin() throws Exception {
        if (Os.type != 2) {
            return;
        }
        this.assertCannotJoin(LINUX_FACTORY);
    }

    @Test
    public void testLinuxCannotOpenSocket() throws Exception {
        if (Os.type != 2) {
            return;
        }
        this.assertCannotOpenSocket(LINUX_FACTORY);
    }

    @Test
    public void testLinuxCannotSetReceiveBuffer() throws Exception {
        if (Os.type != 2) {
            return;
        }
        this.assertCannotSetReceiveBuffer(LINUX_FACTORY);
    }

    @Test
    public void testLinuxFrequentCommit() throws Exception {
        if (Os.type != 2) {
            return;
        }
        this.assertFrequentCommit(LINUX_FACTORY);
    }

    @Test
    public void testLinuxSimpleReceive() throws Exception {
        if (Os.type != 2) {
            return;
        }
        this.assertReceive(new TestReceiverConfiguration(), LINUX_FACTORY);
    }

    private void assertCannotBindSocket(ReceiverFactory factory) throws Exception {
        TestUtils.assertMemoryLeak(() -> {
            NetFacadeImpl nf = new NetFacadeImpl(){

                public boolean bindUdp(long fd, CharSequence IPv4Address, int port) {
                    return false;
                }
            };
            TestReceiverConfiguration receiverCfg = new TestReceiverConfiguration((NetFacade)nf){
                final /* synthetic */ NetFacade val$nf;
                {
                    this.val$nf = netFacade;
                }

                @Override
                public NetFacade getNetFacade() {
                    return this.val$nf;
                }
            };
            this.assertConstructorFail(receiverCfg, factory);
        });
    }

    private void assertCannotJoin(ReceiverFactory factory) throws Exception {
        TestUtils.assertMemoryLeak(() -> {
            NetFacadeImpl nf = new NetFacadeImpl(){

                public boolean join(long fd, CharSequence bindIPv4Address, CharSequence groupIPv4Address) {
                    return false;
                }
            };
            TestReceiverConfiguration receiverCfg = new TestReceiverConfiguration((NetFacade)nf){
                final /* synthetic */ NetFacade val$nf;
                {
                    this.val$nf = netFacade;
                }

                @Override
                public NetFacade getNetFacade() {
                    return this.val$nf;
                }
            };
            this.assertConstructorFail(receiverCfg, factory);
        });
    }

    private void assertCannotOpenSocket(ReceiverFactory factory) throws Exception {
        TestUtils.assertMemoryLeak(() -> {
            NetFacadeImpl nf = new NetFacadeImpl(){

                public long socketUdp() {
                    return -1L;
                }
            };
            TestReceiverConfiguration receiverCfg = new TestReceiverConfiguration((NetFacade)nf){
                final /* synthetic */ NetFacade val$nf;
                {
                    this.val$nf = netFacade;
                }

                @Override
                public NetFacade getNetFacade() {
                    return this.val$nf;
                }
            };
            this.assertConstructorFail(receiverCfg, factory);
        });
    }

    private void assertCannotSetReceiveBuffer(ReceiverFactory factory) throws Exception {
        NetFacadeImpl nf = new NetFacadeImpl(){

            public int setRcvBuf(long fd, int size) {
                return -1;
            }
        };
        TestReceiverConfiguration configuration = new TestReceiverConfiguration((NetFacade)nf){
            final /* synthetic */ NetFacade val$nf;
            {
                this.val$nf = netFacade;
            }

            @Override
            public NetFacade getNetFacade() {
                return this.val$nf;
            }

            @Override
            public int getReceiveBufferSize() {
                return 2048;
            }
        };
        this.assertReceive(configuration, factory);
    }

    private void assertConstructorFail(ReceiverConfiguration receiverCfg, ReceiverFactory factory) {
        DefaultCairoConfiguration cairoCfg = new DefaultCairoConfiguration(root);
        try (WriterPool pool = new WriterPool((CairoConfiguration)cairoCfg, null);){
            try {
                factory.createReceiver(receiverCfg, (CairoConfiguration)cairoCfg, pool);
                Assert.fail();
            }
            catch (CairoException cairoException) {
                // empty catch block
            }
        }
    }

    private void assertFrequentCommit(ReceiverFactory factory) throws Exception {
        TestReceiverConfiguration configuration = new TestReceiverConfiguration(){

            @Override
            public int getCommitRate() {
                return 0;
            }
        };
        this.assertReceive(configuration, factory);
    }

    private void assertReceive(ReceiverConfiguration receiverCfg, ReceiverFactory factory) throws Exception {
        TestUtils.assertMemoryLeak(() -> {
            String expected = "colour\tshape\tsize\ttimestamp\nblue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\nblue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\nblue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\nblue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\nblue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\nblue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\nblue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\nblue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\nblue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\nblue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\n";
            DefaultCairoConfiguration cairoCfg = new DefaultCairoConfiguration(root);
            try (WriterPool pool = new WriterPool((CairoConfiguration)cairoCfg, null);){
                Job receiver = factory.createReceiver(receiverCfg, (CairoConfiguration)cairoCfg, pool);
                try {
                    CountDownLatch workerHaltLatch = new CountDownLatch(1);
                    try (TableModel model = new TableModel(configuration, "tab", 3).col("colour", 8).col("shape", 8).col("size", 6).timestamp();){
                        CairoTestUtils.create(model);
                    }
                    var9_11 = null;
                    try (TableWriter w = pool.get((CharSequence)"tab");){
                        w.warmUp();
                    }
                    catch (Throwable throwable) {
                        var9_11 = throwable;
                        throw throwable;
                    }
                    ObjHashSet jobs = new ObjHashSet();
                    jobs.add((Object)receiver);
                    Worker worker = new Worker(jobs, workerHaltLatch);
                    worker.start();
                    try (LineProtoSender sender = new LineProtoSender(NetworkFacadeImpl.INSTANCE, receiverCfg.getBindIPv4Address(), receiverCfg.getPort(), 1400);){
                        for (int i = 0; i < 10; ++i) {
                            sender.metric((CharSequence)"tab").tag((CharSequence)"colour", (CharSequence)"blue").tag((CharSequence)"shape", (CharSequence)"square").field((CharSequence)"size", 3.4, 4).$(100000000L);
                        }
                        sender.flush();
                    }
                    var11_18 = null;
                    try (TableReader reader = new TableReader((CairoConfiguration)cairoCfg, (CharSequence)"tab");){
                        int count = 1000000;
                        while (count-- > 0 && reader.size() < 10L) {
                            reader.reload();
                            LockSupport.parkNanos(1L);
                        }
                        Assert.assertTrue((count > 0 ? 1 : 0) != 0);
                        worker.halt();
                        Assert.assertTrue((boolean)workerHaltLatch.await(3L, TimeUnit.SECONDS));
                        StringSink sink = new StringSink();
                        RecordCursorPrinter printer = new RecordCursorPrinter((CharSink)sink);
                        printer.print((RecordCursor)reader.getCursor(), reader.getMetadata(), true);
                        TestUtils.assertEquals((CharSequence)"colour\tshape\tsize\ttimestamp\nblue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\nblue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\nblue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\nblue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\nblue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\nblue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\nblue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\nblue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\nblue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\nblue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\n", (CharSequence)sink);
                    }
                    catch (Throwable throwable) {
                        var11_18 = throwable;
                        throw throwable;
                    }
                }
                finally {
                    Misc.free((Object)receiver);
                }
            }
        });
    }

    private static class TestReceiverConfiguration
    implements ReceiverConfiguration {
        private TestReceiverConfiguration() {
        }

        public int getCommitRate() {
            return 0x100000;
        }

        public CharSequence getBindIPv4Address() {
            return "127.0.0.1";
        }

        public CharSequence getGroupIPv4Address() {
            return "224.1.1.1";
        }

        public int getMsgBufferSize() {
            return 2048;
        }

        public int getMsgCount() {
            return 10000;
        }

        public int getPort() {
            return 4567;
        }

        public int getReceiveBufferSize() {
            return -1;
        }

        public NetFacade getNetFacade() {
            return NetFacadeImpl.INSTANCE;
        }
    }
}

