/*
 * Decompiled with CFR 0.152.
 */
package com.questdb.net.ha;

import com.questdb.model.Quote;
import com.questdb.net.ha.JournalClient;
import com.questdb.net.ha.JournalServer;
import com.questdb.net.ha.config.ClientConfig;
import com.questdb.net.ha.config.ServerConfig;
import com.questdb.std.Rnd;
import com.questdb.std.ex.JournalException;
import com.questdb.store.Journal;
import com.questdb.store.JournalKey;
import com.questdb.store.JournalListener;
import com.questdb.store.JournalWriter;
import com.questdb.store.factory.ReaderFactory;
import com.questdb.store.factory.WriterFactory;
import com.questdb.test.tools.AbstractTest;
import com.questdb.test.tools.TestData;
import com.questdb.test.tools.TestUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

/**
 * Replication scenario tests: a {@link JournalServer} publishes the "remote"
 * journal and a {@link JournalClient} connecting to "localhost" subscribes to
 * it, writing into the "local" journal.
 *
 * <p>The class is annotated with {@link Ignore}, so JUnit skips every test in it.
 */
@Ignore
public class ScenarioTest extends AbstractTest {
    /** Server settings shared by all tests: heartbeat value of 300 and multicast switched off. */
    private final ServerConfig serverConfig = new ServerConfig(){
        {
            this.setHeartbeatFrequency(TimeUnit.MILLISECONDS.toMillis(300L));
            this.setEnableMultiCast(false);
        }
    };

    /** Client settings shared by all tests; constructed with "localhost". */
    private final ClientConfig clientConfig = new ClientConfig("localhost");

    /**
     * Feeds the published "remote" journal with shuffled quotes in time-sorted
     * batches of ten (via {@code mergeAppend}) and verifies that the subscribed
     * "local" journal ends up equal to "remote".
     *
     * <p>The server and client are halted in a {@code finally} block so that a
     * failed wait or a failed batch does not leave them running.
     *
     * @throws Exception if any journal, server or client operation fails
     */
    @Test
    public void testLagTrickle() throws Exception {
        try (JournalWriter<Quote> origin = this.getFactory().writer(Quote.class, "origin")) {
            TestData.appendQuoteData2(origin);
            // NOTE(review): the literal 3 is a JournalKey constructor argument whose
            // meaning is not visible in this file - confirm against JournalKey.
            try (JournalWriter<Quote> randomOrigin = this.getFactory().writer(new JournalKey<>(Quote.class, "origin-rnd", 3, 0, false))) {
                randomOrigin.append(origin.query().all().asResultSet().shuffle(new Rnd()));
                try (JournalWriter<Quote> remote = this.getFactory().writer(Quote.class, "remote");
                     Journal<Quote> remoteReader = this.getFactory().reader(Quote.class, "remote")) {
                    // Open and immediately close a writer for "local" before a reader is opened on it.
                    this.getFactory().writer(Quote.class, "local").close();
                    try (final Journal<Quote> local = this.getFactory().reader(Quote.class, "local")) {
                        Assert.assertEquals(0L, local.size());
                        JournalServer server = new JournalServer(this.serverConfig, (ReaderFactory)this.getFactory());
                        JournalClient client = new JournalClient(this.clientConfig, (WriterFactory)this.getFactory());
                        server.publish(remote);
                        server.start();
                        final AtomicInteger errors = new AtomicInteger();
                        final CountDownLatch ready = new CountDownLatch(1);
                        client.subscribe(Quote.class, "remote", "local", new JournalListener(){

                            @Override
                            public void onCommit() {
                                try {
                                    // Release the waiting test once "local" refreshes to 33 rows.
                                    if (local.refresh() && local.size() == 33L) {
                                        ready.countDown();
                                    }
                                }
                                catch (JournalException e) {
                                    errors.incrementAndGet();
                                    e.printStackTrace();
                                }
                            }

                            @Override
                            public void onEvent(int event) {
                                // NOTE(review): 6 is the only event code not counted as an error;
                                // its symbolic name is not visible here - confirm and replace
                                // with the named constant.
                                if (event != 6) {
                                    errors.incrementAndGet();
                                }
                            }
                        });
                        client.start();
                        try {
                            for (int n = 0; n < 400; n += 10) {
                                this.lagIteration(randomOrigin, remote, n, n + 10);
                            }
                            Assert.assertTrue(ready.await(10L, TimeUnit.SECONDS));
                        }
                        finally {
                            // Always stop both endpoints, even when the wait or a batch fails.
                            try {
                                server.halt();
                            }
                            finally {
                                client.halt();
                            }
                        }
                        local.refresh();
                        remoteReader.refresh();
                        TestUtils.assertEquals(remoteReader, local);
                        Assert.assertEquals(0L, errors.get());
                    }
                }
            }
        }
    }

    /**
     * Appends three consecutive ten-row slices of "origin" to the published
     * "remote" journal and, after each slice, checks the head rows of the
     * subscribed "local" journal against the expected text.
     *
     * @throws Exception if any journal, server or client operation fails
     */
    @Test
    public void testSingleJournalTrickle() throws Exception {
        JournalServer server = new JournalServer(this.serverConfig, (ReaderFactory)this.getFactory());
        JournalClient client = new JournalClient(this.clientConfig, (WriterFactory)this.getFactory());
        try (JournalWriter<Quote> origin = this.getFactory().writer(Quote.class, "origin")) {
            TestData.appendQuoteData1(origin);
            Assert.assertEquals(100L, origin.size());
            try (JournalWriter<Quote> remote = this.getFactory().writer(Quote.class, "remote")) {
                // Open and immediately close a writer for "local" before a reader is opened on it.
                this.getFactory().writer(Quote.class, "local").close();
                try (Journal<Quote> local = this.getFactory().reader(Quote.class, "local")) {
                    Assert.assertEquals(0L, local.size());
                    server.publish(remote);
                    server.start();
                    client.subscribe(Quote.class, "remote", "local");
                    client.start();
                    try {
                        ScenarioTest.iteration(
                                "2013-02-10T10:03:20.000Z\tALDW\t0.32885755937534\t0.5741201360255567\t1836077773\t693649102\tFast trading\tSK\n2013-02-10T10:06:40.000Z\tAMD\t0.16781047061245025\t0.4831627617900026\t1423050407\t141794980\tFast trading\tGR\n2013-02-10T10:07:30.000Z\tHSBA.L\t0.04724340267969518\t0.5988337212476811\t178180342\t1522085049\tFast trading\tSK\n",
                                origin, remote, local, 0, 10);
                        ScenarioTest.iteration(
                                "2013-02-10T10:15:50.000Z\tALDW\t0.7976166367363274\t0.06448758069572669\t1436005581\t1897226585\tFast trading\tGR\n2013-02-10T10:15:00.000Z\tAMD\t0.6789043827286667\t0.771921575501964\t580589771\t1159590077\tFast trading\tLXE\n2013-02-10T10:14:10.000Z\tHSBA.L\t0.984512894941384\t0.2664006899723862\t1288300070\t838312365\tFast trading\tLXE\n",
                                origin, remote, local, 10, 20);
                        ScenarioTest.iteration(
                                "2013-02-10T10:24:10.000Z\tALDW\t0.26008876203627374\t0.04354393444455451\t25334630\t1835685418\tFast trading\tGR\n2013-02-10T10:23:20.000Z\tAMD\t0.9757637204046299\t0.7654386171943978\t23937995\t992860510\tFast trading\tLXE\n2013-02-10T10:21:40.000Z\tHSBA.L\t0.5630111081489209\t0.4222995146933318\t1534594684\t1153925552\tFast trading\tLN\n",
                                origin, remote, local, 20, 30);
                    }
                    finally {
                        client.halt();
                        server.halt();
                    }
                }
            }
        }
    }

    /**
     * Appends rows {@code [lo, hi)} of {@code origin} to {@code remote}, commits,
     * then polls {@code local} every 100 ms until it reports a refresh and
     * compares its head rows with {@code expected}.
     *
     * @param expected expected text for the head rows of {@code local}
     * @param origin   journal the rows are taken from
     * @param remote   writer the rows are appended to
     * @param local    journal expected to receive the rows
     * @param lo       first bound passed to {@code subset}
     * @param hi       second bound passed to {@code subset}
     * @throws Exception if a journal operation fails or the wait is interrupted
     */
    private static void iteration(String expected, Journal<Quote> origin, JournalWriter<Quote> remote, Journal<Quote> local, int lo, int hi) throws Exception {
        remote.append(origin.query().all().asResultSet().subset(lo, hi));
        remote.commit();
        int attempts = 0;
        do {
            Thread.sleep(100L);
            // Give up on the 12th sleep without a successful refresh.
            if (attempts++ > 10) {
                Assert.fail("Refresh is too slow!");
            }
        } while (!local.refresh());
        TestUtils.assertEquals(expected, local.query().head().withKeys(new String[0]).asResultSet());
    }

    /**
     * Collects rows {@code [lo, hi)} of {@code origin}, sorted by "timestamp",
     * into a list, merge-appends that list to {@code remote} and commits.
     *
     * @param origin journal the rows are taken from
     * @param remote writer receiving the batch
     * @param lo     first bound passed to {@code subset}
     * @param hi     second bound passed to {@code subset}
     * @throws JournalException if reading, appending or committing fails
     */
    private void lagIteration(final Journal<Quote> origin, JournalWriter<Quote> remote, final int lo, final int hi) throws JournalException {
        List<Quote> batch = new ArrayList<Quote>();
        for (Quote q : origin.query().all().asResultSet().subset(lo, hi).sort(new String[]{"timestamp"})) {
            batch.add(q);
        }
        remote.mergeAppend(batch);
        remote.commit();
    }
}

