/*
 * Decompiled with CFR 0.152.
 */
package com.questdb.net.ha;

import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.model.Quote;
import com.questdb.model.TestEntity;
import com.questdb.net.ha.JournalClient;
import com.questdb.net.ha.JournalServer;
import com.questdb.net.ha.config.ClientConfig;
import com.questdb.net.ha.config.ServerConfig;
import com.questdb.std.ex.JournalException;
import com.questdb.std.time.DateFormatUtils;
import com.questdb.store.Journal;
import com.questdb.store.JournalKey;
import com.questdb.store.JournalListener;
import com.questdb.store.JournalWriter;
import com.questdb.store.factory.ReaderFactory;
import com.questdb.store.factory.WriterFactory;
import com.questdb.store.factory.configuration.JournalConfigurationBuilder;
import com.questdb.store.factory.configuration.MetadataBuilder;
import com.questdb.test.tools.AbstractTest;
import com.questdb.test.tools.TestUtils;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

public class IntegrationTest
extends AbstractTest {
    private static final Log LOG = LogFactory.getLog(IntegrationTest.class);
    private JournalClient client;
    private JournalServer server;

    @Before
    public void setUp() {
        this.server = new JournalServer(new ServerConfig(){
            {
                this.setHeartbeatFrequency(TimeUnit.MILLISECONDS.toMillis(100L));
                this.setEnableMultiCast(false);
            }
        }, (ReaderFactory)this.getFactory());
        this.client = new JournalClient(new ClientConfig("localhost"), (WriterFactory)this.getFactory());
    }

    @Test
    @Ignore
    public void testBadJournalDoesNotResubscribe() {
        Assert.fail();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBadSubscriptionOnTheFlyFollowedByReconnect() throws Exception {
        try (JournalWriter origin = this.getFactory().writer(Quote.class, "origin");){
            int batchSize = 1000;
            int batchCount = 100;
            this.server.publish(origin);
            this.server.start();
            try {
                CountDownLatch terminated = new CountDownLatch(1);
                JournalClient client = new JournalClient(new ClientConfig("localhost"), (WriterFactory)this.getFactory(), null, evt -> {
                    if (evt == 256) {
                        terminated.countDown();
                    }
                });
                client.start();
                final AtomicInteger commits = new AtomicInteger();
                final AtomicInteger errors = new AtomicInteger();
                final CountDownLatch localSubscribed = new CountDownLatch(1);
                final CountDownLatch dataReceived = new CountDownLatch(1);
                try {
                    this.getFactory().writer(Quote.class, "local").close();
                    try (final Journal local = this.getFactory().reader("local");){
                        client.subscribe(Quote.class, "origin", "local", new JournalListener(){

                            public void onCommit() {
                                commits.incrementAndGet();
                                try {
                                    local.refresh();
                                    if (local.size() == 100000L) {
                                        dataReceived.countDown();
                                    }
                                }
                                catch (JournalException e) {
                                    e.printStackTrace();
                                    errors.incrementAndGet();
                                }
                            }

                            public void onEvent(int event) {
                                switch (event) {
                                    case 6: {
                                        localSubscribed.countDown();
                                        break;
                                    }
                                    default: {
                                        errors.incrementAndGet();
                                    }
                                }
                            }
                        });
                        CountDownLatch published = new CountDownLatch(1);
                        CyclicBarrier barrier = new CyclicBarrier(2);
                        AtomicInteger publisherErrors = new AtomicInteger();
                        new Thread(() -> {
                            try {
                                barrier.await();
                                long timestamp = DateFormatUtils.parseDateTime((CharSequence)"2013-09-04T10:00:00.000Z");
                                long increment = 1000L;
                                for (int i = 0; i < 100; ++i) {
                                    TestUtils.generateQuoteData((JournalWriter<Quote>)origin, 1000, timestamp, increment);
                                    timestamp += increment * 1000L;
                                    origin.commit();
                                }
                            }
                            catch (Throwable e) {
                                e.printStackTrace();
                                publisherErrors.incrementAndGet();
                            }
                            published.countDown();
                        }).start();
                        Assert.assertTrue((boolean)localSubscribed.await(10L, TimeUnit.SECONDS));
                        barrier.await();
                        Assert.assertTrue((boolean)published.await(60L, TimeUnit.SECONDS));
                        Assert.assertTrue((boolean)dataReceived.await(60L, TimeUnit.SECONDS));
                        Assert.assertEquals((long)0L, (long)publisherErrors.get());
                        Assert.assertEquals((long)0L, (long)errors.get());
                        Assert.assertTrue((commits.get() > 0 ? 1 : 0) != 0);
                        local.refresh();
                        Assert.assertEquals((long)100000L, (long)local.size());
                    }
                }
                catch (Throwable e) {
                    e.printStackTrace();
                    Assert.fail();
                }
                finally {
                    client.halt();
                }
                Assert.assertTrue((boolean)terminated.await(5L, TimeUnit.SECONDS));
            }
            finally {
                this.server.halt();
            }
        }
    }

    @Test
    public void testClientConnect() throws Exception {
        CountDownLatch error = new CountDownLatch(1);
        this.client = new JournalClient(new ClientConfig("localhost"), (WriterFactory)this.getFactory(), null, evt -> {
            if (evt == 1) {
                error.countDown();
            }
        });
        this.client.start();
        Assert.assertTrue((boolean)error.await(30L, TimeUnit.SECONDS));
    }

    @Test
    public void testClientConnectServerHalt() throws Exception {
        this.server.start();
        this.client.start();
        Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
        this.server.halt();
        Assert.assertEquals((long)0L, (long)this.server.getConnectedClients());
        Assert.assertFalse((boolean)this.server.isRunning());
        Thread.sleep(700L);
        Assert.assertFalse((boolean)this.client.isRunning());
        this.client.halt();
    }

    @Test
    public void testClientDisconnect() throws Exception {
        this.server.start();
        this.client.start();
        Thread.sleep(100L);
        this.client.halt();
        Assert.assertFalse((boolean)this.client.isRunning());
        Thread.sleep(100L);
        Assert.assertEquals((long)0L, (long)this.server.getConnectedClients());
        this.server.halt();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testOutOfSyncClient() throws Exception {
        int size = 10000;
        try (JournalWriter remote = this.getFactory().writer(Quote.class, "remote", 2 * size);){
            this.server.publish(remote);
            this.server.start();
            try {
                final CountDownLatch commitLatch1 = new CountDownLatch(1);
                this.client.subscribe(Quote.class, "remote", "local", 2 * size, new JournalListener(){

                    public void onCommit() {
                        commitLatch1.countDown();
                    }

                    public void onEvent(int event) {
                    }
                });
                this.client.start();
                TestUtils.generateQuoteData((JournalWriter<Quote>)remote, size);
                Assert.assertTrue((boolean)commitLatch1.await(5L, TimeUnit.SECONDS));
                this.client.halt();
                try (Journal local = this.getFactory().reader(Quote.class, "local");){
                    TestUtils.assertDataEquals(remote, local);
                }
                TestUtils.generateQuoteData((JournalWriter<Quote>)remote, 10000, remote.getMaxTimestamp());
                remote.commit();
                var6_8 = null;
                try (JournalWriter localW = this.getFactory().writer(Quote.class, "local");){
                    TestUtils.generateQuoteData((JournalWriter<Quote>)localW, 10000, localW.getMaxTimestamp());
                    localW.commit();
                    TestUtils.generateQuoteData((JournalWriter<Quote>)localW, 10000, localW.getMaxTimestamp());
                    localW.commit();
                }
                catch (Throwable throwable) {
                    var6_8 = throwable;
                    throw throwable;
                }
                final CountDownLatch errorCountDown = new CountDownLatch(1);
                this.client = new JournalClient(new ClientConfig("localhost"), (WriterFactory)this.getFactory());
                this.client.subscribe(Quote.class, "remote", "local", 2 * size, new JournalListener(){

                    public void onCommit() {
                    }

                    public void onEvent(int event) {
                        errorCountDown.countDown();
                    }
                });
                this.client.start();
                Assert.assertTrue((boolean)errorCountDown.await(5L, TimeUnit.SECONDS));
                this.client.halt();
            }
            finally {
                this.server.halt();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Ignore
    public void testOutOfSyncServerSide() throws Exception {
        int size = 10000;
        try (JournalWriter remote = this.getFactory().writer(Quote.class, "remote", 2 * size);){
            this.server.publish(remote);
            this.server.start();
            try {
                AtomicInteger serverErrors = new AtomicInteger();
                final AtomicInteger commits = new AtomicInteger();
                this.client = new JournalClient(new ClientConfig("localhost"), (WriterFactory)this.getFactory(), null, evt -> {
                    if (evt == 258) {
                        serverErrors.incrementAndGet();
                    }
                });
                this.client.subscribe(Quote.class, "remote", "local", 2 * size, new JournalListener(){

                    public void onCommit() {
                        commits.incrementAndGet();
                    }

                    public void onEvent(int event) {
                    }
                });
                this.client.start();
                TestUtils.generateQuoteData((JournalWriter<Quote>)remote, size);
                TestUtils.assertCounter(commits, 1, 1L, TimeUnit.SECONDS);
                this.client.halt();
                try (Journal local = this.getFactory().reader(Quote.class, "local");){
                    TestUtils.assertDataEquals(remote, local);
                }
                TestUtils.generateQuoteData((JournalWriter<Quote>)remote, 10000, remote.getMaxTimestamp());
                remote.commit();
                TestUtils.generateQuoteData((JournalWriter<Quote>)remote, 10000, remote.getMaxTimestamp());
                remote.commit();
                TestUtils.generateQuoteData((JournalWriter<Quote>)remote, 10000, remote.getMaxTimestamp());
                remote.commit();
                var7_9 = null;
                try (JournalWriter localW = this.getFactory().writer(Quote.class, "local");){
                    TestUtils.generateQuoteData((JournalWriter<Quote>)localW, 10000, localW.getMaxTimestamp());
                    localW.commit();
                    TestUtils.generateQuoteData((JournalWriter<Quote>)localW, 10000, localW.getMaxTimestamp());
                    localW.commit();
                }
                catch (Throwable throwable) {
                    var7_9 = throwable;
                    throw throwable;
                }
                final AtomicInteger errorCounter = new AtomicInteger();
                this.client = new JournalClient(new ClientConfig("localhost"), (WriterFactory)this.getFactory(), null, evt -> {
                    if (evt == 258) {
                        serverErrors.incrementAndGet();
                    }
                });
                this.client.subscribe(Quote.class, "remote", "local", 2 * size, new JournalListener(){

                    public void onCommit() {
                        commits.incrementAndGet();
                    }

                    public void onEvent(int event) {
                        errorCounter.incrementAndGet();
                        System.out.println("EV: " + event);
                    }
                });
                this.client.start();
                TestUtils.assertCounter(commits, 1, 1L, TimeUnit.SECONDS);
                TestUtils.assertCounter(errorCounter, 1, 1L, TimeUnit.SECONDS);
                this.client.halt();
                Assert.assertEquals((long)0L, (long)serverErrors.get());
            }
            finally {
                this.server.halt();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testResubscribeAfterBadSubscription() throws Exception {
        int size = 1000;
        try (JournalWriter origin = this.getFactory().writer(Quote.class, "origin");){
            TestUtils.generateQuoteData((JournalWriter<Quote>)origin, size);
            this.server.publish(origin);
            this.server.start();
            try {
                this.getFactory().writer((MetadataBuilder)new JournalConfigurationBuilder().$("local").$int("x").$()).close();
                CountDownLatch terminated = new CountDownLatch(1);
                AtomicInteger serverDied = new AtomicInteger();
                JournalClient client = new JournalClient(new ClientConfig("localhost"), (WriterFactory)this.getFactory(), null, evt -> {
                    switch (evt) {
                        case 256: {
                            terminated.countDown();
                            break;
                        }
                        case 258: {
                            serverDied.incrementAndGet();
                            break;
                        }
                    }
                });
                client.start();
                try {
                    final CountDownLatch incompatible = new CountDownLatch(1);
                    client.subscribe(Quote.class, "origin", "local", new JournalListener(){

                        public void onCommit() {
                        }

                        public void onEvent(int event) {
                            if (event == 2) {
                                incompatible.countDown();
                            }
                        }
                    });
                    Assert.assertTrue((boolean)incompatible.await(500L, TimeUnit.SECONDS));
                    this.getFactory().delete("local");
                    final AtomicInteger errorCount = new AtomicInteger();
                    final CountDownLatch commit = new CountDownLatch(1);
                    client.subscribe(Quote.class, "origin", "local", new JournalListener(){

                        public void onCommit() {
                            commit.countDown();
                        }

                        public void onEvent(int event) {
                            if (event != 6) {
                                errorCount.incrementAndGet();
                            }
                        }
                    });
                    Assert.assertTrue((boolean)commit.await(30L, TimeUnit.SECONDS));
                    Assert.assertEquals((long)0L, (long)errorCount.get());
                }
                finally {
                    client.halt();
                }
                Assert.assertTrue((boolean)terminated.await(5L, TimeUnit.SECONDS));
                Assert.assertEquals((long)0L, (long)serverDied.get());
                try (Journal r = this.getFactory().reader("local");){
                    Assert.assertEquals((long)size, (long)r.size());
                }
            }
            finally {
                this.server.halt();
            }
        }
    }

    @Test
    @Ignore
    public void testResubscribeAfterUnsubscribe() {
        Assert.fail();
    }

    @Test
    public void testServerIdleStartStop() throws Exception {
        try (JournalWriter remote = this.getFactory().writer(Quote.class, "remote");){
            this.server.publish(remote);
            this.server.start();
            this.client.subscribe(Quote.class, "remote", "local");
            this.client.start();
            Thread.sleep(100L);
            this.server.halt();
            Assert.assertFalse((boolean)this.server.isRunning());
        }
    }

    @Test
    public void testServerStartStop() throws Exception {
        this.server.start();
        this.server.halt();
        Assert.assertFalse((boolean)this.server.isRunning());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSingleJournalSync() throws Exception {
        int size = 100000;
        try (JournalWriter remote = this.getFactory().writer(Quote.class, "remote", 2 * size);){
            this.server.publish(remote);
            this.server.start();
            try {
                final CountDownLatch latch = new CountDownLatch(1);
                this.client.subscribe(Quote.class, "remote", "local", 2 * size, new JournalListener(){

                    public void onCommit() {
                        latch.countDown();
                    }

                    public void onEvent(int event) {
                    }
                });
                this.client.start();
                TestUtils.generateQuoteData((JournalWriter<Quote>)remote, size);
                latch.await();
                this.client.halt();
            }
            finally {
                this.server.halt();
            }
            try (Journal local = this.getFactory().reader(Quote.class, "local");){
                TestUtils.assertDataEquals(remote, local);
            }
        }
    }

    @Test
    @Ignore
    public void testSubscribeCopyOnTheFly() {
        Assert.fail();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSubscribeIncompatible() throws Exception {
        int size = 10000;
        try (JournalWriter origin = this.getFactory().writer(Quote.class, "origin");){
            TestUtils.generateQuoteData((JournalWriter<Quote>)origin, size);
            try (JournalWriter remote = this.getFactory().writer(Quote.class, "remote");){
                this.server.publish(remote);
                this.server.start();
                try {
                    remote.append(origin.query().all().asResultSet().subset(0, 1000));
                    remote.commit();
                    this.getFactory().writer((MetadataBuilder)new JournalConfigurationBuilder().$("local").$int("x").$()).close();
                    CountDownLatch terminated = new CountDownLatch(1);
                    JournalClient client = new JournalClient(new ClientConfig("localhost"), (WriterFactory)this.getFactory(), null, evt -> {
                        if (evt == 256) {
                            terminated.countDown();
                        }
                    });
                    client.start();
                    final CountDownLatch incompatible = new CountDownLatch(1);
                    try {
                        client.subscribe(Quote.class, "remote", "local", new JournalListener(){

                            public void onCommit() {
                            }

                            public void onEvent(int event) {
                                if (event == 2) {
                                    incompatible.countDown();
                                }
                            }
                        });
                        Assert.assertTrue((boolean)incompatible.await(500L, TimeUnit.SECONDS));
                        remote.append(origin.query().all().asResultSet().subset(1000, 2000));
                        remote.commit();
                    }
                    finally {
                        client.halt();
                    }
                    Assert.assertTrue((boolean)terminated.await(5L, TimeUnit.SECONDS));
                }
                finally {
                    this.server.halt();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSubscribeIncompatibleWriter() throws Exception {
        int size = 10000;
        try (JournalWriter origin = this.getFactory().writer(Quote.class, "origin");){
            TestUtils.generateQuoteData((JournalWriter<Quote>)origin, size);
            try (JournalWriter remote = this.getFactory().writer(Quote.class, "remote");){
                this.server.publish(remote);
                this.server.start();
                try {
                    remote.append(origin.query().all().asResultSet().subset(0, 1000));
                    remote.commit();
                    try (JournalWriter writer = this.getFactory().writer((MetadataBuilder)new JournalConfigurationBuilder().$("local").$int("x").$());){
                        CountDownLatch terminated = new CountDownLatch(1);
                        AtomicInteger serverErrors = new AtomicInteger();
                        JournalClient client = new JournalClient(new ClientConfig("localhost"), (WriterFactory)this.getFactory(), null, evt -> {
                            if (evt == 256) {
                                terminated.countDown();
                            }
                            if (evt == 258) {
                                serverErrors.incrementAndGet();
                            }
                        });
                        client.start();
                        final CountDownLatch incompatible = new CountDownLatch(1);
                        try {
                            client.subscribe(new JournalKey("remote"), writer, new JournalListener(){

                                public void onCommit() {
                                }

                                public void onEvent(int event) {
                                    if (event == 2) {
                                        incompatible.countDown();
                                    }
                                }
                            });
                            Assert.assertTrue((boolean)incompatible.await(500L, TimeUnit.SECONDS));
                            remote.append(origin.query().all().asResultSet().subset(1000, 2000));
                            remote.commit();
                        }
                        finally {
                            client.halt();
                        }
                        Assert.assertTrue((boolean)terminated.await(5L, TimeUnit.SECONDS));
                        Assert.assertEquals((long)0L, (long)serverErrors.get());
                    }
                }
                finally {
                    this.server.halt();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSubscribeOnTheFly() throws Exception {
        int size = 5000;
        try (JournalWriter origin = this.getFactory().writer(Quote.class, "origin");){
            TestUtils.generateQuoteData((JournalWriter<Quote>)origin, size);
            try (JournalWriter remote1 = this.getFactory().writer(Quote.class, "remote1");
                 JournalWriter remote2 = this.getFactory().writer(Quote.class, "remote2");){
                this.server.publish(remote1);
                this.server.publish(remote2);
                this.server.start();
                try {
                    remote1.append(origin.query().all().asResultSet().subset(0, 1000));
                    remote1.commit();
                    remote2.append(origin.query().all().asResultSet().subset(0, 1000));
                    remote2.commit();
                    final AtomicInteger counter = new AtomicInteger();
                    JournalClient client = new JournalClient(new ClientConfig("localhost"), (WriterFactory)this.getFactory());
                    client.start();
                    try {
                        client.subscribe(Quote.class, "remote1", "local1", new JournalListener(){

                            public void onCommit() {
                                counter.incrementAndGet();
                            }

                            public void onEvent(int event) {
                            }
                        });
                        TestUtils.assertCounter(counter, 1, 2L, TimeUnit.SECONDS);
                        try (Journal r = this.getFactory().reader("local1");){
                            Assert.assertEquals((long)1000L, (long)r.size());
                        }
                        client.subscribe(Quote.class, "remote2", "local2", new JournalListener(){

                            public void onCommit() {
                                counter.incrementAndGet();
                            }

                            public void onEvent(int event) {
                            }
                        });
                        TestUtils.assertCounter(counter, 2, 2L, TimeUnit.SECONDS);
                        r = this.getFactory().reader("local2");
                        var11_17 = null;
                        try {
                            Assert.assertEquals((long)1000L, (long)r.size());
                        }
                        catch (Throwable throwable) {
                            var11_17 = throwable;
                            throw throwable;
                        }
                        finally {
                            if (r != null) {
                                if (var11_17 != null) {
                                    try {
                                        r.close();
                                    }
                                    catch (Throwable throwable) {
                                        var11_17.addSuppressed(throwable);
                                    }
                                } else {
                                    r.close();
                                }
                            }
                        }
                    }
                    finally {
                        client.halt();
                    }
                }
                finally {
                    this.server.halt();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSubscribeTwice() throws Exception {
        int size = 10000;
        try (JournalWriter origin = this.getFactory().writer(Quote.class, "origin");){
            TestUtils.generateQuoteData((JournalWriter<Quote>)origin, size);
            try (JournalWriter remote1 = this.getFactory().writer(Quote.class, "remote1");
                 JournalWriter remote2 = this.getFactory().writer(Quote.class, "remote2");){
                this.server.publish(remote1);
                this.server.publish(remote2);
                this.server.start();
                try {
                    remote1.append(origin.query().all().asResultSet().subset(0, 1000));
                    remote1.commit();
                    remote2.append(origin.query().all().asResultSet().subset(0, 1000));
                    remote2.commit();
                    final AtomicInteger counter = new AtomicInteger();
                    final AtomicInteger errors = new AtomicInteger();
                    JournalClient client = new JournalClient(new ClientConfig("localhost"), (WriterFactory)this.getFactory());
                    client.start();
                    try {
                        client.subscribe(Quote.class, "remote1", "local1", new JournalListener(){

                            public void onCommit() {
                                counter.incrementAndGet();
                            }

                            public void onEvent(int event) {
                                errors.incrementAndGet();
                            }
                        });
                        TestUtils.assertCounter(counter, 1, 2L, TimeUnit.SECONDS);
                        try (Journal r = this.getFactory().reader("local1");){
                            Assert.assertEquals((long)1000L, (long)r.size());
                        }
                        client.subscribe(Quote.class, "remote2", "local1", new JournalListener(){

                            public void onCommit() {
                                counter.incrementAndGet();
                            }

                            public void onEvent(int event) {
                                errors.incrementAndGet();
                            }
                        });
                        TestUtils.assertCounter(counter, 1, 2L, TimeUnit.SECONDS);
                        TestUtils.assertCounter(errors, 1, 2L, TimeUnit.SECONDS);
                        r = this.getFactory().reader("local1");
                        var12_18 = null;
                        try {
                            Assert.assertEquals((long)1000L, (long)r.size());
                        }
                        catch (Throwable throwable) {
                            var12_18 = throwable;
                            throw throwable;
                        }
                        finally {
                            if (r != null) {
                                if (var12_18 != null) {
                                    try {
                                        r.close();
                                    }
                                    catch (Throwable throwable) {
                                        var12_18.addSuppressed(throwable);
                                    }
                                } else {
                                    r.close();
                                }
                            }
                        }
                    }
                    finally {
                        client.halt();
                    }
                }
                finally {
                    this.server.halt();
                }
            }
        }
    }

    @Test
    public void testTwoClientSync() throws Exception {
        int size = 10000;
        try (JournalWriter origin = this.getFactory().writer(Quote.class, "origin");){
            TestUtils.generateQuoteData((JournalWriter<Quote>)origin, size);
            try (JournalWriter remote = this.getFactory().writer(Quote.class, "remote");){
                remote.append(origin.query().all().asResultSet().subset(0, 1000));
                remote.commit();
                this.server.publish(remote);
                this.server.start();
                final AtomicInteger counter = new AtomicInteger();
                JournalClient client1 = new JournalClient(new ClientConfig("localhost"), (WriterFactory)this.getFactory());
                client1.subscribe(Quote.class, "remote", "local1", new JournalListener(){

                    public void onCommit() {
                        counter.incrementAndGet();
                    }

                    public void onEvent(int event) {
                    }
                });
                client1.start();
                JournalClient client2 = new JournalClient(new ClientConfig("localhost"), (WriterFactory)this.getFactory());
                client2.subscribe(Quote.class, "remote", "local2", new JournalListener(){

                    public void onCommit() {
                        counter.incrementAndGet();
                    }

                    public void onEvent(int event) {
                    }
                });
                client2.start();
                TestUtils.assertCounter(counter, 2, 2L, TimeUnit.SECONDS);
                client1.halt();
                remote.append(origin.query().all().asResultSet().subset(1000, 1500));
                remote.commit();
                TestUtils.assertCounter(counter, 3, 2L, TimeUnit.SECONDS);
                LOG.info().$((CharSequence)"~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~").$();
                final CountDownLatch waitForUpdate = new CountDownLatch(1);
                client1 = new JournalClient(new ClientConfig("localhost"), (WriterFactory)this.getFactory());
                client1.subscribe(Quote.class, "remote", "local1", new JournalListener(){

                    public void onCommit() {
                        counter.incrementAndGet();
                        waitForUpdate.countDown();
                    }

                    public void onEvent(int event) {
                    }
                });
                client1.start();
                waitForUpdate.await(2L, TimeUnit.SECONDS);
                remote.append(origin.query().all().asResultSet().subset(1500, size));
                remote.commit();
                TestUtils.assertCounter(counter, 6, 2L, TimeUnit.SECONDS);
                try (Journal local1r = this.getFactory().reader(Quote.class, "local1");){
                    Assert.assertEquals((long)size, (long)local1r.size());
                }
                var11_15 = null;
                try (Journal local2r = this.getFactory().reader(Quote.class, "local2");){
                    Assert.assertEquals((long)size, (long)local2r.size());
                }
                catch (Throwable throwable) {
                    var11_15 = throwable;
                    throw throwable;
                }
                client1.halt();
                client2.halt();
                this.server.halt();
            }
        }
    }

    @Test
    public void testTwoJournalsSync() throws Exception {
        int size = 10000;
        try (JournalWriter remote1 = this.getFactory().writer(Quote.class, "remote1", 2 * size);
             JournalWriter remote2 = this.getFactory().writer(TestEntity.class, "remote2", 2 * size);){
            this.server.publish(remote1);
            this.server.publish(remote2);
            this.server.start();
            final CountDownLatch latch = new CountDownLatch(2);
            this.client.subscribe(Quote.class, "remote1", "local1", 2 * size, new JournalListener(){

                public void onCommit() {
                    latch.countDown();
                }

                public void onEvent(int event) {
                }
            });
            this.client.subscribe(TestEntity.class, "remote2", "local2", 2 * size, new JournalListener(){

                public void onCommit() {
                    latch.countDown();
                }

                public void onEvent(int event) {
                }
            });
            this.client.start();
            TestUtils.generateQuoteData((JournalWriter<Quote>)remote1, size);
            TestUtils.generateTestEntityData((JournalWriter<TestEntity>)remote2, size);
            latch.await();
            this.client.halt();
            this.server.halt();
            try (Journal local1 = this.getFactory().reader(Quote.class, "local1");){
                Assert.assertEquals((String)"Local1 has wrong size", (long)size, (long)local1.size());
            }
            var8_12 = null;
            try (Journal local2 = this.getFactory().reader(TestEntity.class, "local2");){
                Assert.assertEquals((String)"Remote2 has wrong size", (long)size, (long)remote2.size());
                Assert.assertEquals((String)"Local2 has wrong size", (long)size, (long)local2.size());
            }
            catch (Throwable throwable) {
                var8_12 = throwable;
                throw throwable;
            }
        }
    }

    @Test
    @Ignore
    public void testUnsubscribe() {
        Assert.fail();
    }

    @Test
    @Ignore
    public void testUnsubscribeOnTheFly() {
        Assert.fail();
    }

    @Test
    @Ignore
    public void testUnsubscribeReconnectBehaviour() {
        Assert.fail();
    }

    @Test
    public void testWriterShutdown() throws Exception {
        int size = 10000;
        try (JournalWriter remote = this.getFactory().writer(Quote.class, "remote", 2 * size);){
            this.server.publish(remote);
            this.server.start();
            this.client.subscribe(Quote.class, "remote", "local", 2 * size);
            this.client.start();
            TestUtils.generateQuoteData((JournalWriter<Quote>)remote, size, 0L);
        }
        this.client.halt();
        this.server.halt();
    }
}

