/*
 * Decompiled with CFR 0.152.
 */
package com.questdb.net.ha;

import com.questdb.model.Quote;
import com.questdb.model.Trade;
import com.questdb.net.ha.JournalServer;
import com.questdb.net.ha.JournalServerAgent;
import com.questdb.net.ha.MockByteChannel;
import com.questdb.net.ha.comsumer.HugeBufferConsumer;
import com.questdb.net.ha.comsumer.JournalDeltaConsumer;
import com.questdb.net.ha.config.ServerConfig;
import com.questdb.net.ha.model.IndexedJournal;
import com.questdb.net.ha.model.IndexedJournalKey;
import com.questdb.net.ha.producer.JournalClientStateProducer;
import com.questdb.net.ha.protocol.CommandConsumer;
import com.questdb.net.ha.protocol.CommandProducer;
import com.questdb.net.ha.protocol.commands.CharSequenceResponseConsumer;
import com.questdb.net.ha.protocol.commands.IntResponseConsumer;
import com.questdb.net.ha.protocol.commands.SetKeyRequestProducer;
import com.questdb.std.Chars;
import com.questdb.store.Journal;
import com.questdb.store.JournalWriter;
import com.questdb.store.factory.ReaderFactory;
import com.questdb.test.tools.AbstractTest;
import com.questdb.test.tools.TestUtils;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class JournalServerAgentTest
extends AbstractTest {
    @Rule
    public final TemporaryFolder temp = new TemporaryFolder();
    private final CommandProducer commandProducer = new CommandProducer();
    private final CommandConsumer commandConsumer = new CommandConsumer();
    private final SetKeyRequestProducer setKeyRequestProducer = new SetKeyRequestProducer();
    private final CharSequenceResponseConsumer charSequenceResponseConsumer = new CharSequenceResponseConsumer();
    private final JournalClientStateProducer journalClientStateProducer = new JournalClientStateProducer();
    private final IntResponseConsumer intResponseConsumer = new IntResponseConsumer();
    private MockByteChannel channel;
    private JournalWriter<Quote> quoteWriter;
    private JournalWriter<Trade> tradeWriter;
    private JournalServer server;
    private JournalServerAgent agent;
    private HugeBufferConsumer hugeBufferConsumer;

    @Before
    public void setUp() throws Exception {
        this.channel = new MockByteChannel();
        this.quoteWriter = this.getFactory().writer(Quote.class);
        this.tradeWriter = this.getFactory().writer(Trade.class);
        ServerConfig config = new ServerConfig(){
            {
                this.setHeartbeatFrequency(100L);
                this.setEnableMultiCast(false);
            }
        };
        this.server = new JournalServer(config, (ReaderFactory)this.getFactory());
        this.server.publish(this.quoteWriter);
        this.agent = new JournalServerAgent(this.server, (SocketAddress)new InetSocketAddress(7075), null);
        this.hugeBufferConsumer = new HugeBufferConsumer(this.temp.newFile());
    }

    @Override
    @After
    public void tearDown() {
        this.quoteWriter.close();
        this.tradeWriter.close();
        this.server.halt();
        this.agent.close();
        this.hugeBufferConsumer.free();
    }

    @Test
    public void testIncrementalInteraction() throws Exception {
        try (JournalWriter origin = this.getFactory().writer(Quote.class, "origin");){
            TestUtils.generateQuoteData((JournalWriter<Quote>)origin, 200);
            this.server.start();
            try (JournalWriter quoteClientWriter = this.getFactory().writer(Quote.class, "client");){
                JournalDeltaConsumer quoteDeltaConsumer = new JournalDeltaConsumer(quoteClientWriter);
                this.commandProducer.write((WritableByteChannel)this.channel, (byte)1);
                this.setKeyRequestProducer.write((WritableByteChannel)this.channel, (Object)new IndexedJournalKey(0, this.quoteWriter.getMetadata().getKey()));
                this.agent.process((ByteChannel)this.channel);
                this.charSequenceResponseConsumer.read((ReadableByteChannel)this.channel);
                TestUtils.assertEquals((CharSequence)"OK", (CharSequence)this.charSequenceResponseConsumer.getValue());
                this.hugeBufferConsumer.read((ReadableByteChannel)this.channel);
                this.commandProducer.write((WritableByteChannel)this.channel, (byte)2);
                this.journalClientStateProducer.write((WritableByteChannel)this.channel, (Object)new IndexedJournal(0, (Journal)quoteClientWriter));
                this.agent.process((ByteChannel)this.channel);
                this.charSequenceResponseConsumer.read((ReadableByteChannel)this.channel);
                TestUtils.assertEquals((CharSequence)"OK", (CharSequence)this.charSequenceResponseConsumer.getValue());
                this.quoteWriter.append(origin.query().all().asResultSet().subset(0, 100));
                this.quoteWriter.commit();
                this.commandProducer.write((WritableByteChannel)this.channel, (byte)3);
                this.agent.process((ByteChannel)this.channel);
                this.commandConsumer.read((ReadableByteChannel)this.channel);
                Assert.assertEquals((long)4L, (long)this.commandConsumer.getCommand());
                Assert.assertEquals((long)0L, (long)this.intResponseConsumer.getValue((ReadableByteChannel)this.channel));
                quoteDeltaConsumer.read((ReadableByteChannel)this.channel);
                Assert.assertEquals((long)100L, (long)quoteClientWriter.size());
                this.commandConsumer.read((ReadableByteChannel)this.channel);
                Assert.assertEquals((long)5L, (long)this.commandConsumer.getCommand());
                this.quoteWriter.append(origin.query().all().asResultSet().subset(100, 200));
                this.quoteWriter.commit();
                this.commandProducer.write((WritableByteChannel)this.channel, (byte)2);
                this.journalClientStateProducer.write((WritableByteChannel)this.channel, (Object)new IndexedJournal(0, (Journal)quoteClientWriter));
                this.agent.process((ByteChannel)this.channel);
                this.charSequenceResponseConsumer.read((ReadableByteChannel)this.channel);
                TestUtils.assertEquals((CharSequence)"OK", (CharSequence)this.charSequenceResponseConsumer.getValue());
                this.commandProducer.write((WritableByteChannel)this.channel, (byte)3);
                this.agent.process((ByteChannel)this.channel);
                this.commandConsumer.read((ReadableByteChannel)this.channel);
                Assert.assertEquals((long)4L, (long)this.commandConsumer.getCommand());
                Assert.assertEquals((long)0L, (long)this.intResponseConsumer.getValue((ReadableByteChannel)this.channel));
                quoteDeltaConsumer.read((ReadableByteChannel)this.channel);
                Assert.assertEquals((long)200L, (long)quoteClientWriter.size());
                this.commandConsumer.read((ReadableByteChannel)this.channel);
                Assert.assertEquals((long)5L, (long)this.commandConsumer.getCommand());
            }
        }
    }

    @Test
    public void testJournalIndexCorrectness() throws Exception {
        this.server.publish(this.tradeWriter);
        this.server.start();
        try (JournalWriter quoteClientWriter = this.getFactory().writer(Quote.class, "client");){
            this.commandProducer.write((WritableByteChannel)this.channel, (byte)1);
            this.setKeyRequestProducer.write((WritableByteChannel)this.channel, (Object)new IndexedJournalKey(0, this.quoteWriter.getMetadata().getKey()));
            this.agent.process((ByteChannel)this.channel);
            this.charSequenceResponseConsumer.read((ReadableByteChannel)this.channel);
            TestUtils.assertEquals((CharSequence)"OK", (CharSequence)this.charSequenceResponseConsumer.getValue());
            this.hugeBufferConsumer.read((ReadableByteChannel)this.channel);
            this.commandProducer.write((WritableByteChannel)this.channel, (byte)2);
            this.journalClientStateProducer.write((WritableByteChannel)this.channel, (Object)new IndexedJournal(1, (Journal)quoteClientWriter));
            this.agent.process((ByteChannel)this.channel);
            this.charSequenceResponseConsumer.read((ReadableByteChannel)this.channel);
            TestUtils.assertEquals((CharSequence)"Journal index does not match key request", (CharSequence)this.charSequenceResponseConsumer.getValue());
            this.commandProducer.write((WritableByteChannel)this.channel, (byte)2);
            this.journalClientStateProducer.write((WritableByteChannel)this.channel, (Object)new IndexedJournal(0, (Journal)quoteClientWriter));
            this.agent.process((ByteChannel)this.channel);
            this.charSequenceResponseConsumer.read((ReadableByteChannel)this.channel);
            TestUtils.assertEquals((CharSequence)"OK", (CharSequence)this.charSequenceResponseConsumer.getValue());
        }
    }

    @Test
    public void testSetKeyRequestResponse() throws Exception {
        this.commandProducer.write((WritableByteChannel)this.channel, (byte)1);
        this.setKeyRequestProducer.write((WritableByteChannel)this.channel, (Object)new IndexedJournalKey(0, this.quoteWriter.getMetadata().getKey()));
        this.agent.process((ByteChannel)this.channel);
        this.charSequenceResponseConsumer.read((ReadableByteChannel)this.channel);
        TestUtils.assertEquals((CharSequence)"OK", (CharSequence)this.charSequenceResponseConsumer.getValue());
        this.hugeBufferConsumer.read((ReadableByteChannel)this.channel);
        this.commandProducer.write((WritableByteChannel)this.channel, (byte)1);
        this.setKeyRequestProducer.write((WritableByteChannel)this.channel, (Object)new IndexedJournalKey(0, this.tradeWriter.getMetadata().getKey()));
        this.agent.process((ByteChannel)this.channel);
        this.charSequenceResponseConsumer.read((ReadableByteChannel)this.channel);
        Assert.assertTrue((boolean)Chars.startsWith((CharSequence)((CharSequence)this.charSequenceResponseConsumer.getValue()), (CharSequence)"Requested key not exported"));
    }
}

