package com.redis.spring.batch;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.redis.lettucemod.Beers;
import com.redis.lettucemod.RedisModulesClient;
import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.api.sync.RedisModulesCommands;
import com.redis.lettucemod.cluster.RedisModulesClusterClient;
import com.redis.lettucemod.timeseries.AddOptions;
import com.redis.lettucemod.timeseries.CreateOptions;
import com.redis.lettucemod.timeseries.DuplicatePolicy;
import com.redis.lettucemod.timeseries.RangeOptions;
import com.redis.lettucemod.timeseries.Sample;
import com.redis.lettucemod.timeseries.TimeRange;
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.spring.batch.common.DataStructure;
import com.redis.spring.batch.common.IntRange;
import com.redis.spring.batch.common.KeyDump;
import com.redis.spring.batch.common.KeyValue;
import com.redis.spring.batch.common.Utils;
import com.redis.spring.batch.convert.GeoValueConverter;
import com.redis.spring.batch.convert.IdentityConverter;
import com.redis.spring.batch.convert.ScoredValueConverter;
import com.redis.spring.batch.reader.GeneratorReaderOptions;
import com.redis.spring.batch.reader.KeyComparison;
import com.redis.spring.batch.reader.KeyEventType;
import com.redis.spring.batch.reader.KeyspaceNotification;
import com.redis.spring.batch.reader.KeyspaceNotificationItemReader;
import com.redis.spring.batch.reader.LiveRedisItemReader;
import com.redis.spring.batch.reader.QueueOptions;
import com.redis.spring.batch.reader.ScanKeyItemReader;
import com.redis.spring.batch.reader.ScanOptions;
import com.redis.spring.batch.reader.ScanSizeEstimator;
import com.redis.spring.batch.reader.ScanSizeEstimatorOptions;
import com.redis.spring.batch.reader.SlotRangeFilter;
import com.redis.spring.batch.reader.StreamItemReader;
import com.redis.spring.batch.reader.StreamReaderOptions;
import com.redis.spring.batch.writer.KeyComparisonCountItemWriter;
import com.redis.spring.batch.writer.WaitForReplication;
import com.redis.spring.batch.writer.WriterOptions;
import com.redis.spring.batch.writer.operation.Geoadd;
import com.redis.spring.batch.writer.operation.Hset;
import com.redis.spring.batch.writer.operation.JsonSet;
import com.redis.spring.batch.writer.operation.TsAdd;
import com.redis.spring.batch.writer.operation.Xadd;
import com.redis.spring.batch.writer.operation.Zadd;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.Consumer;
import io.lettuce.core.GeoArgs;
import io.lettuce.core.KeyScanArgs;
import io.lettuce.core.LettuceFutures;
import io.lettuce.core.Range;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.ScanIterator;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.StringCodec;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.micrometer.core.instrument.simple.SimpleConfig;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.support.IteratorItemReader;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;

/* loaded from: input_file:com/redis/spring/batch/AbstractBatchTests.class */
abstract class AbstractBatchTests extends AbstractTestBase {
    private static final QueueOptions NOTIFICATION_QUEUE_OPTIONS = QueueOptions.builder().capacity(100000).build();
    private static final String[] NOTIFICATION_PATTERNS = LiveRedisItemReader.patterns(0, LiveRedisItemReader.defaultKeyPatterns());
    private static final String JSON_BEER_1 = "{\"id\":\"1\",\"brewery_id\":\"812\",\"name\":\"Hocus Pocus\",\"abv\":\"4.5\",\"ibu\":\"0\",\"srm\":\"0\",\"upc\":\"0\",\"filepath\":\"\",\"descript\":\"Our take on a classic summer ale.  A toast to weeds, rays, and summer haze.  A light, crisp ale for mowing lawns, hitting lazy fly balls, and communing with nature, Hocus Pocus is offered up as a summer sacrifice to clodless days.\\n\\nIts malty sweetness finishes tart and crisp and is best apprediated with a wedge of orange.\",\"add_user\":\"0\",\"last_mod\":\"2010-07-22 20:00:20 UTC\",\"style_name\":\"Light American Wheat Ale or Lager\",\"cat_name\":\"Other Style\"}";
    private static final int BEER_COUNT = 1019;
    private static final int STREAM_MESSAGE_COUNT = 57;
    private static final String DEFAULT_CONSUMER_GROUP = "consumerGroup";

    /* loaded from: input_file:com/redis/spring/batch/AbstractBatchTests$Geo.class */
    private class Geo {
        private String member;
        private double longitude;
        private double latitude;

        public Geo(String str, double d, double d2) {
            this.member = str;
            this.longitude = d;
            this.latitude = d2;
        }

        public String getMember() {
            return this.member;
        }

        public double getLongitude() {
            return this.longitude;
        }

        public double getLatitude() {
            return this.latitude;
        }
    }

    /* loaded from: input_file:com/redis/spring/batch/AbstractBatchTests$ZValue.class */
    private class ZValue {
        private String member;
        private double score;

        public ZValue(String str, double d) {
            this.member = str;
            this.score = d;
        }

        public String getMember() {
            return this.member;
        }

        public double getScore() {
            return this.score;
        }
    }

    private void enableKeyspaceNotifications(AbstractRedisClient abstractRedisClient) {
        RedisModulesUtils.connection(abstractRedisClient).sync().configSet("notify-keyspace-events", "AK");
    }

    @Test
    void writeWait(TestInfo testInfo) throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            HashMap hashMap = new HashMap();
            hashMap.put("id", String.valueOf(i));
            hashMap.put("field1", "value1");
            hashMap.put("field2", "value2");
            arrayList.add(hashMap);
        }
        Assertions.assertEquals("Insufficient replication level - expected: 1, actual: 0", ((Throwable) run(job(testInfo, step(testInfo, new ListItemReader(arrayList), null, sourceWriter().options(WriterOptions.builder().waitForReplication(WaitForReplication.of(1, Duration.ofMillis(300L))).build()).operation(Hset.key(map -> {
            return "hash:" + ((String) map.remove("id"));
        }).map(IdentityConverter.instance()).build())))).getAllFailureExceptions().get(0)).getMessage());
    }

    @Test
    void writeHash(TestInfo testInfo) throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            HashMap hashMap = new HashMap();
            hashMap.put("id", String.valueOf(i));
            hashMap.put("field1", "value1");
            hashMap.put("field2", "value2");
            arrayList.add(hashMap);
        }
        run(testInfo, new ListItemReader(arrayList), sourceWriter().operation(Hset.key(map -> {
            return "hash:" + ((String) map.remove("id"));
        }).map(IdentityConverter.instance()).build()));
        Assertions.assertEquals(arrayList.size(), this.sourceConnection.sync().keys("hash:*").size());
        for (int i2 = 0; i2 < arrayList.size(); i2++) {
            Assertions.assertEquals(arrayList.get(i2), this.sourceConnection.sync().hgetall("hash:" + i2));
        }
    }

    @Test
    void writeGeo(TestInfo testInfo) throws Exception {
        run(testInfo, new ListItemReader(Arrays.asList(new Geo("Venice Breakwater", -118.476056d, 33.985728d), new Geo("Long Beach National", -73.667022d, 40.582739d))), sourceWriter().operation(Geoadd.key("geoset").value(new GeoValueConverter((v0) -> {
            return v0.getMember();
        }, (v0) -> {
            return v0.getLongitude();
        }, (v0) -> {
            return v0.getLatitude();
        })).build()));
        Set georadius = this.sourceConnection.sync().georadius("geoset", -118.0d, 34.0d, 100.0d, GeoArgs.Unit.mi);
        Assertions.assertEquals(1, georadius.size());
        Assertions.assertTrue(georadius.contains("Venice Breakwater"));
    }

    @Test
    void writeHashDel(TestInfo testInfo) throws Exception {
        ArrayList arrayList = new ArrayList();
        RedisModulesCommands sync = this.sourceConnection.sync();
        int i = 0;
        while (i < 100) {
            String valueOf = String.valueOf(i);
            HashMap hashMap = new HashMap();
            hashMap.put("field1", "value1");
            sync.hset("hash:" + valueOf, hashMap);
            HashMap hashMap2 = new HashMap();
            hashMap2.put("field2", "value2");
            arrayList.add(new AbstractMap.SimpleEntry(valueOf, i < 50 ? null : hashMap2));
            i++;
        }
        run(testInfo, new ListItemReader(arrayList), sourceWriter().operation(Hset.key(entry -> {
            return "hash:" + ((String) entry.getKey());
        }).map((v0) -> {
            return v0.getValue();
        }).build()));
        Assertions.assertEquals(50, sync.keys("hash:*").size());
        Assertions.assertEquals(2, sync.hgetall("hash:50").size());
    }

    @Test
    void writeSortedSet(TestInfo testInfo) throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            arrayList.add(new ZValue(String.valueOf(i), i % 10));
        }
        run(testInfo, new ListItemReader(arrayList), sourceWriter().operation(Zadd.key("zset").value(new ScoredValueConverter((v0) -> {
            return v0.getMember();
        }, (v0) -> {
            return v0.getScore();
        })).build()));
        RedisModulesCommands sync = this.sourceConnection.sync();
        Assertions.assertEquals(1L, sync.dbsize());
        Assertions.assertEquals(arrayList.size(), sync.zcard("zset"));
        Assertions.assertEquals(60, sync.zrangebyscore("zset", Range.from(Range.Boundary.including(0), Range.Boundary.including(5))).size());
    }

    @Test
    void writeDataStructures(TestInfo testInfo) throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            HashMap hashMap = new HashMap();
            hashMap.put("field1", "value1");
            hashMap.put("field2", "value2");
            DataStructure dataStructure = new DataStructure();
            dataStructure.setKey("hash:" + i);
            dataStructure.setType(DataStructure.Type.HASH);
            dataStructure.setValue(hashMap);
            arrayList.add(dataStructure);
        }
        run(testInfo, new ListItemReader(arrayList), sourceWriter().dataStructure());
        Assertions.assertEquals(100L, this.sourceConnection.sync().keys("hash:*").size());
    }

    @Test
    void metrics(TestInfo testInfo) throws Exception {
        List meters = Metrics.globalRegistry.getMeters();
        CompositeMeterRegistry compositeMeterRegistry = Metrics.globalRegistry;
        Objects.requireNonNull(compositeMeterRegistry);
        meters.forEach(compositeMeterRegistry::remove);
        SimpleMeterRegistry simpleMeterRegistry = new SimpleMeterRegistry(new SimpleConfig() { // from class: com.redis.spring.batch.AbstractBatchTests.1
            public String get(String str) {
                return null;
            }

            public Duration step() {
                return Duration.ofMillis(1L);
            }
        }, Clock.SYSTEM);
        Metrics.addRegistry(simpleMeterRegistry);
        generate(testInfo);
        RedisItemReader dataStructure = sourceReader().dataStructure();
        dataStructure.open(new ExecutionContext());
        Assertions.assertNotNull(simpleMeterRegistry.find("spring.batch.redis.reader.queue.size").gauge());
        dataStructure.close();
        simpleMeterRegistry.close();
        List meters2 = Metrics.globalRegistry.getMeters();
        CompositeMeterRegistry compositeMeterRegistry2 = Metrics.globalRegistry;
        Objects.requireNonNull(compositeMeterRegistry2);
        meters2.forEach(compositeMeterRegistry2::remove);
    }

    @Test
    void filterKeySlot(TestInfo testInfo) throws Exception {
        enableKeyspaceNotifications(this.sourceClient);
        LiveRedisItemReader dataStructure = sourceReader().live().flushingOptions(DEFAULT_FLUSHING_OPTIONS).keyFilter(SlotRangeFilter.of(0, 8000)).dataStructure();
        SynchronizedListItemWriter synchronizedListItemWriter = new SynchronizedListItemWriter();
        JobExecution runAsync = runAsync(testInfo, dataStructure, synchronizedListItemWriter);
        generate(testInfo, GeneratorReaderOptions.builder().count(100).build());
        awaitTermination(runAsync);
        Assertions.assertFalse(synchronizedListItemWriter.getWrittenItems().stream().map((v0) -> {
            return v0.getKey();
        }).map(SlotHash::getSlot).anyMatch(num -> {
            return num.intValue() < 0 || num.intValue() > 8000;
        }));
    }

    @Test
    void keyspaceNotificationsReader(TestInfo testInfo) throws Exception {
        enableKeyspaceNotifications(this.sourceClient);
        KeyspaceNotificationItemReader<String, String> keyspaceNotificationItemReader = new KeyspaceNotificationItemReader<>(this.sourceClient, StringCodec.UTF8, NOTIFICATION_QUEUE_OPTIONS, NOTIFICATION_PATTERNS);
        try {
            keyspaceNotificationItemReader.open(new ExecutionContext());
            generate(testInfo, GeneratorReaderOptions.builder().types(new DataStructure.Type[]{DataStructure.Type.HASH, DataStructure.Type.LIST, DataStructure.Type.SET, DataStructure.Type.STREAM, DataStructure.Type.STRING, DataStructure.Type.ZSET, DataStructure.Type.TIMESERIES, DataStructure.Type.JSON}).build());
            awaitUntil(() -> {
                return Boolean.valueOf(keyspaceNotificationItemReader.getQueue().size() == 100);
            });
            Assertions.assertEquals(KeyEventType.SET, ((KeyspaceNotification) keyspaceNotificationItemReader.getQueue().remove()).getEventType());
            assertEventTypes(keyspaceNotificationItemReader, KeyEventType.SET, KeyEventType.HSET, KeyEventType.JSON_SET, KeyEventType.RPUSH, KeyEventType.SADD, KeyEventType.ZADD, KeyEventType.XADD, KeyEventType.TS_ADD);
            keyspaceNotificationItemReader.close();
        } catch (Throwable th) {
            try {
                keyspaceNotificationItemReader.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void scanKeyItemReader(TestInfo testInfo) throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
        generate(testInfo, GeneratorReaderOptions.builder().count(100).build());
        ScanKeyItemReader scanKeyItemReader = new ScanKeyItemReader(this.sourceClient, StringCodec.UTF8, ScanOptions.builder().build());
        try {
            scanKeyItemReader.open(new ExecutionContext());
            Assertions.assertEquals(100, Utils.readAll(scanKeyItemReader).size());
            scanKeyItemReader.close();
        } catch (Throwable th) {
            try {
                scanKeyItemReader.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private void assertEventTypes(KeyspaceNotificationItemReader<String, String> keyspaceNotificationItemReader, KeyEventType... keyEventTypeArr) {
        LinkedHashSet linkedHashSet = new LinkedHashSet();
        while (!keyspaceNotificationItemReader.getQueue().isEmpty()) {
            linkedHashSet.add(((KeyspaceNotification) keyspaceNotificationItemReader.getQueue().remove()).getEventType());
        }
        Assertions.assertEquals(new LinkedHashSet(Arrays.asList(keyEventTypeArr)), linkedHashSet);
    }

    @Test
    void dedupeKeyspaceNotifications() throws Exception {
        enableKeyspaceNotifications(this.sourceClient);
        KeyspaceNotificationItemReader keyspaceNotificationItemReader = new KeyspaceNotificationItemReader(this.sourceClient, StringCodec.UTF8, NOTIFICATION_QUEUE_OPTIONS, NOTIFICATION_PATTERNS);
        try {
            keyspaceNotificationItemReader.open(new ExecutionContext());
            RedisModulesCommands sync = this.sourceConnection.sync();
            sync.zadd("key1", 1.0d, "member1");
            sync.zadd("key1", 2.0d, "member2");
            sync.zadd("key1", 3.0d, "member3");
            awaitUntil(() -> {
                return Boolean.valueOf(keyspaceNotificationItemReader.getQueue().size() == 1);
            });
            Assertions.assertEquals("key1", keyspaceNotificationItemReader.read());
            keyspaceNotificationItemReader.close();
        } catch (Throwable th) {
            try {
                keyspaceNotificationItemReader.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void readThreads(TestInfo testInfo) throws Exception {
        generate(testInfo);
        RedisItemReader dataStructure = sourceReader().dataStructure();
        setName((ItemReader<?>) dataStructure, testInfo);
        SynchronizedItemStreamReader synchronizedItemStreamReader = new SynchronizedItemStreamReader();
        synchronizedItemStreamReader.setDelegate(dataStructure);
        synchronizedItemStreamReader.afterPropertiesSet();
        SynchronizedListItemWriter synchronizedListItemWriter = new SynchronizedListItemWriter();
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setMaxPoolSize(4);
        threadPoolTaskExecutor.setCorePoolSize(4);
        threadPoolTaskExecutor.afterPropertiesSet();
        run(job(testInfo).start(step(testInfo, synchronizedItemStreamReader, synchronizedListItemWriter).taskExecutor(threadPoolTaskExecutor).throttleLimit(4).build()).build());
        awaitClosed(dataStructure);
        Objects.requireNonNull(synchronizedListItemWriter);
        awaitUntilFalse(synchronizedListItemWriter::isOpen);
        Assertions.assertEquals(this.sourceConnection.sync().dbsize(), synchronizedListItemWriter.getWrittenItems().size());
    }

    @Test
    void readLive(TestInfo testInfo) throws Exception {
        enableKeyspaceNotifications(this.sourceClient);
        LiveRedisItemReader keyDump = sourceLiveReader(10000).keyDump();
        JobExecution runAsync = runAsync(testInfo, keyDump, new SynchronizedListItemWriter());
        awaitOpen(keyDump);
        generate(testInfo, GeneratorReaderOptions.builder().count(123).types(new DataStructure.Type[]{DataStructure.Type.HASH, DataStructure.Type.STRING}).build());
        awaitTermination(runAsync);
        awaitClosed(keyDump);
        Assertions.assertEquals(this.sourceConnection.sync().dbsize(), r0.getWrittenItems().size());
    }

    @Test
    void scanSizeEstimator(TestInfo testInfo) throws Exception {
        generate(testInfo, GeneratorReaderOptions.builder().count(12345).build());
        long longValue = this.sourceConnection.sync().dbsize().longValue();
        ScanSizeEstimator.Builder client = ScanSizeEstimator.client(this.sourceClient);
        Assertions.assertEquals((float) longValue, (float) client.options(ScanSizeEstimatorOptions.builder().match("gen*").build()).build().execute().longValue(), (float) (longValue / 10));
        Assertions.assertEquals((float) (longValue / GeneratorReaderOptions.defaultTypes().size()), (float) client.options(ScanSizeEstimatorOptions.builder().type(DataStructure.Type.HASH.getString()).build()).build().execute().longValue(), (float) (longValue / 10));
    }

    private void generateStreams(TestInfo testInfo) throws JobExecutionException {
        generate(testInfo(testInfo, "streams"), GeneratorReaderOptions.builder().types(new DataStructure.Type[]{DataStructure.Type.STREAM}).streamOptions(GeneratorReaderOptions.StreamOptions.builder().messageCount(STREAM_MESSAGE_COUNT).build()).count(3).build());
    }

    private StreamItemReader.StreamBuilder<String, String> streamReader() {
        return this.sourceClient instanceof RedisModulesClusterClient ? StreamItemReader.client(this.sourceClient) : StreamItemReader.client(this.sourceClient);
    }

    private void assertMessageBody(List<? extends StreamMessage<String, String>> list) {
        for (StreamMessage<String, String> streamMessage : list) {
            Assertions.assertTrue(streamMessage.getBody().containsKey("field1"));
            Assertions.assertTrue(streamMessage.getBody().containsKey("field2"));
        }
    }

    private void assertStreamEquals(String str, Map<String, String> map, String str2, StreamMessage<String, String> streamMessage) {
        Assertions.assertEquals(str, streamMessage.getId());
        Assertions.assertEquals(map, streamMessage.getBody());
        Assertions.assertEquals(str2, streamMessage.getStream());
    }

    private Map<String, String> map(String... strArr) {
        Assert.notNull(strArr, "Args cannot be null");
        Assert.isTrue(strArr.length % 2 == 0, "Args length is not a multiple of 2");
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (int i = 0; i < strArr.length / 2; i++) {
            linkedHashMap.put(strArr[i * 2], strArr[(i * 2) + 1]);
        }
        return linkedHashMap;
    }

    @Test
    void writeStreamTx(TestInfo testInfo) throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            HashMap hashMap = new HashMap();
            hashMap.put("field1", "value1");
            hashMap.put("field2", "value2");
            arrayList.add(hashMap);
        }
        run(testInfo, new ListItemReader(arrayList), sourceWriter().options(WriterOptions.builder().multiExec(true).build()).operation(Xadd.key("stream:1").body(IdentityConverter.instance()).build()));
        RedisModulesCommands sync = this.sourceConnection.sync();
        Assertions.assertEquals(arrayList.size(), sync.xlen("stream:1"));
        List xrange = sync.xrange("stream:1", Range.create("-", "+"));
        for (int i2 = 0; i2 < xrange.size(); i2++) {
            Assertions.assertEquals(arrayList.get(i2), ((StreamMessage) xrange.get(i2)).getBody());
        }
    }

    @Test
    void readStreamAutoAck() throws InterruptedException {
        StreamItemReader build = streamReader().stream("stream1").consumer(Consumer.from("batchtests-readStreamAutoAck", "consumer1")).options(StreamReaderOptions.builder().ackPolicy(StreamReaderOptions.AckPolicy.AUTO).build()).build();
        build.open(new ExecutionContext());
        Map<String, String> map = map("field1", "value1", "field2", "value2");
        String xadd = this.sourceConnection.sync().xadd("stream1", map);
        String xadd2 = this.sourceConnection.sync().xadd("stream1", map);
        String xadd3 = this.sourceConnection.sync().xadd("stream1", map);
        ArrayList arrayList = new ArrayList();
        awaitUntil(() -> {
            return Boolean.valueOf(arrayList.addAll(build.readMessages()));
        });
        Assertions.assertEquals(3, arrayList.size());
        assertStreamEquals(xadd, map, "stream1", (StreamMessage) arrayList.get(0));
        assertStreamEquals(xadd2, map, "stream1", (StreamMessage) arrayList.get(1));
        assertStreamEquals(xadd3, map, "stream1", (StreamMessage) arrayList.get(2));
        build.close();
        Assertions.assertEquals(0L, this.sourceConnection.sync().xpending("stream1", "batchtests-readStreamAutoAck").getCount(), "pending messages");
    }

    @Test
    void readStreamManualAck() throws Exception {
        StreamItemReader build = streamReader().stream("stream1").consumer(Consumer.from("batchtests-readStreamManualAck", "consumer1")).options(StreamReaderOptions.builder().ackPolicy(StreamReaderOptions.AckPolicy.MANUAL).build()).build();
        build.open(new ExecutionContext());
        Map<String, String> map = map("field1", "value1", "field2", "value2");
        String xadd = this.sourceConnection.sync().xadd("stream1", map);
        String xadd2 = this.sourceConnection.sync().xadd("stream1", map);
        String xadd3 = this.sourceConnection.sync().xadd("stream1", map);
        ArrayList arrayList = new ArrayList();
        awaitUntil(() -> {
            return Boolean.valueOf(arrayList.addAll(build.readMessages()));
        });
        Assertions.assertEquals(3, arrayList.size());
        assertStreamEquals(xadd, map, "stream1", (StreamMessage) arrayList.get(0));
        assertStreamEquals(xadd2, map, "stream1", (StreamMessage) arrayList.get(1));
        assertStreamEquals(xadd3, map, "stream1", (StreamMessage) arrayList.get(2));
        Assertions.assertEquals(3L, this.sourceConnection.sync().xpending("stream1", "batchtests-readStreamManualAck").getCount(), "pending messages before commit");
        this.sourceConnection.sync().xack("stream1", "batchtests-readStreamManualAck", new String[]{((StreamMessage) arrayList.get(0)).getId(), ((StreamMessage) arrayList.get(1)).getId()});
        Assertions.assertEquals(1L, this.sourceConnection.sync().xpending("stream1", "batchtests-readStreamManualAck").getCount(), "pending messages after commit");
        build.close();
    }

    @Test
    void readStreamManualAckRecover() throws InterruptedException {
        Consumer from = Consumer.from("batchtests-readStreamManualAckRecover", "consumer1");
        StreamItemReader build = streamReader().stream("stream1").consumer(from).options(StreamReaderOptions.builder().ackPolicy(StreamReaderOptions.AckPolicy.MANUAL).build()).build();
        build.open(new ExecutionContext());
        Map<String, String> map = map("field1", "value1", "field2", "value2");
        this.sourceConnection.sync().xadd("stream1", map);
        this.sourceConnection.sync().xadd("stream1", map);
        this.sourceConnection.sync().xadd("stream1", map);
        ArrayList arrayList = new ArrayList();
        awaitUntil(() -> {
            return Boolean.valueOf(arrayList.addAll(build.readMessages()));
        });
        Assertions.assertEquals(3, arrayList.size());
        ArrayList arrayList2 = new ArrayList();
        this.sourceConnection.sync().xadd("stream1", map);
        this.sourceConnection.sync().xadd("stream1", map);
        this.sourceConnection.sync().xadd("stream1", map);
        build.close();
        StreamItemReader build2 = streamReader().stream("stream1").consumer(from).options(StreamReaderOptions.builder().ackPolicy(StreamReaderOptions.AckPolicy.MANUAL).build()).build();
        build2.open(new ExecutionContext());
        awaitUntil(() -> {
            return Boolean.valueOf(arrayList2.addAll(build2.readMessages()));
        });
        awaitUntil(() -> {
            return Boolean.valueOf(!arrayList2.addAll(build2.readMessages()));
        });
        Assertions.assertEquals(6, arrayList2.size());
    }

    @Test
    void readStreamManualAckRecoverUncommitted() throws InterruptedException {
        Consumer from = Consumer.from("batchtests-readStreamManualAckRecoverUncommitted", "consumer1");
        StreamItemReader build = streamReader().stream("stream1").consumer(from).options(StreamReaderOptions.builder().ackPolicy(StreamReaderOptions.AckPolicy.MANUAL).build()).build();
        build.open(new ExecutionContext());
        Map<String, String> map = map("field1", "value1", "field2", "value2");
        this.sourceConnection.sync().xadd("stream1", map);
        this.sourceConnection.sync().xadd("stream1", map);
        String xadd = this.sourceConnection.sync().xadd("stream1", map);
        ArrayList arrayList = new ArrayList();
        awaitUntil(() -> {
            return Boolean.valueOf(arrayList.addAll(build.readMessages()));
        });
        Assertions.assertEquals(3, arrayList.size());
        this.sourceConnection.sync().xack("stream1", "batchtests-readStreamManualAckRecoverUncommitted", new String[]{((StreamMessage) arrayList.get(0)).getId(), ((StreamMessage) arrayList.get(1)).getId()});
        ArrayList arrayList2 = new ArrayList();
        String xadd2 = this.sourceConnection.sync().xadd("stream1", map);
        String xadd3 = this.sourceConnection.sync().xadd("stream1", map);
        String xadd4 = this.sourceConnection.sync().xadd("stream1", map);
        build.close();
        StreamItemReader build2 = streamReader().stream("stream1").consumer(from).options(StreamReaderOptions.builder().ackPolicy(StreamReaderOptions.AckPolicy.MANUAL).offset(((StreamMessage) arrayList.get(1)).getId()).build()).build();
        build2.open(new ExecutionContext());
        awaitUntil(() -> {
            return Boolean.valueOf(arrayList2.addAll(build2.readMessages()));
        });
        awaitUntil(() -> {
            return Boolean.valueOf(!arrayList2.addAll(build2.readMessages()));
        });
        Assertions.assertEquals(Arrays.asList(xadd, xadd2, xadd3, xadd4), (List) arrayList2.stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()), "recoveredIds");
        build2.close();
    }

    @Test
    void readStreamManualAckRecoverFromOffset() throws Exception {
        Consumer from = Consumer.from("batchtests-readStreamManualAckRecoverFromOffset", "consumer1");
        StreamItemReader build = streamReader().stream("stream1").consumer(from).options(StreamReaderOptions.builder().ackPolicy(StreamReaderOptions.AckPolicy.MANUAL).build()).build();
        build.open(new ExecutionContext());
        Map<String, String> map = map("field1", "value1", "field2", "value2");
        this.sourceConnection.sync().xadd("stream1", map);
        this.sourceConnection.sync().xadd("stream1", map);
        String xadd = this.sourceConnection.sync().xadd("stream1", map);
        ArrayList arrayList = new ArrayList();
        awaitUntil(() -> {
            return Boolean.valueOf(arrayList.addAll(build.readMessages()));
        });
        Assertions.assertEquals(3, arrayList.size());
        ArrayList arrayList2 = new ArrayList();
        String xadd2 = this.sourceConnection.sync().xadd("stream1", map);
        String xadd3 = this.sourceConnection.sync().xadd("stream1", map);
        String xadd4 = this.sourceConnection.sync().xadd("stream1", map);
        build.close();
        StreamItemReader build2 = streamReader().stream("stream1").consumer(from).options(StreamReaderOptions.builder().ackPolicy(StreamReaderOptions.AckPolicy.MANUAL).offset(xadd).build()).build();
        build2.open(new ExecutionContext());
        awaitUntil(() -> {
            return Boolean.valueOf(arrayList2.addAll(build2.readMessages()));
        });
        awaitUntil(() -> {
            return Boolean.valueOf(!arrayList2.addAll(build2.readMessages()));
        });
        Assertions.assertEquals(Arrays.asList(xadd2, xadd3, xadd4), (List) arrayList2.stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()), "recoveredIds");
        build2.close();
    }

    @Test
    void readStreamRecoverManualAckToAutoAck() throws InterruptedException {
        Consumer from = Consumer.from("readStreamRecoverManualAckToAutoAck", "consumer1");
        StreamItemReader build = streamReader().stream("stream1").consumer(from).options(StreamReaderOptions.builder().ackPolicy(StreamReaderOptions.AckPolicy.MANUAL).build()).build();
        build.open(new ExecutionContext());
        Map<String, String> map = map("field1", "value1", "field2", "value2");
        this.sourceConnection.sync().xadd("stream1", map);
        this.sourceConnection.sync().xadd("stream1", map);
        this.sourceConnection.sync().xadd("stream1", map);
        ArrayList arrayList = new ArrayList();
        awaitUntil(() -> {
            return Boolean.valueOf(arrayList.addAll(build.readMessages()));
        });
        Assertions.assertEquals(3, arrayList.size());
        ArrayList arrayList2 = new ArrayList();
        String xadd = this.sourceConnection.sync().xadd("stream1", map);
        String xadd2 = this.sourceConnection.sync().xadd("stream1", map);
        String xadd3 = this.sourceConnection.sync().xadd("stream1", map);
        build.close();
        StreamItemReader build2 = streamReader().stream("stream1").consumer(from).options(StreamReaderOptions.builder().ackPolicy(StreamReaderOptions.AckPolicy.AUTO).build()).build();
        build2.open(new ExecutionContext());
        awaitUntil(() -> {
            return Boolean.valueOf(arrayList2.addAll(build2.readMessages()));
        });
        awaitUntil(() -> {
            return Boolean.valueOf(!arrayList2.addAll(build2.readMessages()));
        });
        Assertions.assertEquals(Arrays.asList(xadd, xadd2, xadd3), (List) arrayList2.stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()), "recoveredIds");
        Assertions.assertEquals(0L, this.sourceConnection.sync().xpending("stream1", "readStreamRecoverManualAckToAutoAck").getCount(), "pending message count");
        build2.close();
    }

    @Test
    void readMessages(TestInfo testInfo) throws Exception {
        generateStreams(testInfo);
        for (String str : (List) ScanIterator.scan(this.sourceConnection.sync(), KeyScanArgs.Builder.type(DataStructure.Type.STREAM.getString())).stream().collect(Collectors.toList())) {
            long longValue = this.sourceConnection.sync().xlen(str).longValue();
            StreamItemReader build = streamReader().stream(str).consumer(Consumer.from("batchtests-readmessages", "consumer1")).build();
            build.open(new ExecutionContext());
            ArrayList arrayList = new ArrayList();
            awaitUntil(() -> {
                arrayList.addAll(build.readMessages());
                return Boolean.valueOf(((long) arrayList.size()) == longValue);
            });
            assertMessageBody(arrayList);
            awaitUntil(() -> {
                return Boolean.valueOf(build.ack(build.readMessages()) == 0);
            });
            build.close();
        }
    }

    @Test
    void streamReaderJob(TestInfo testInfo) throws Exception {
        generateStreams(testInfo);
        for (String str : (List) ScanIterator.scan(this.sourceConnection.sync(), KeyScanArgs.Builder.type(DataStructure.Type.STREAM.getString())).stream().collect(Collectors.toList())) {
            StreamItemReader build = streamReader().stream(str).consumer(Consumer.from("batchtests-readstreamjob", "consumer1")).build();
            SynchronizedListItemWriter synchronizedListItemWriter = new SynchronizedListItemWriter();
            run(testInfo(testInfo, str), build, synchronizedListItemWriter);
            Assertions.assertEquals(STREAM_MESSAGE_COUNT, synchronizedListItemWriter.getWrittenItems().size());
            assertMessageBody(synchronizedListItemWriter.getWrittenItems());
        }
    }

    @Test
    void readMultipleStreams(TestInfo testInfo) throws Exception {
        generateStreams(testInfo(testInfo, "streams"));
        for (String str : (List) ScanIterator.scan(this.sourceConnection.sync(), KeyScanArgs.Builder.type(DataStructure.Type.STREAM.getString())).stream().collect(Collectors.toList())) {
            StreamItemReader build = streamReader().stream(str).consumer(Consumer.from(DEFAULT_CONSUMER_GROUP, "consumer1")).options(StreamReaderOptions.builder().ackPolicy(StreamReaderOptions.AckPolicy.MANUAL).build()).build();
            StreamItemReader build2 = streamReader().stream(str).consumer(Consumer.from(DEFAULT_CONSUMER_GROUP, "consumer2")).options(StreamReaderOptions.builder().ackPolicy(StreamReaderOptions.AckPolicy.MANUAL).build()).build();
            SynchronizedListItemWriter synchronizedListItemWriter = new SynchronizedListItemWriter();
            JobExecution runAsync = runAsync(testInfo(testInfo, str, "1"), build, synchronizedListItemWriter);
            SynchronizedListItemWriter synchronizedListItemWriter2 = new SynchronizedListItemWriter();
            JobExecution runAsync2 = runAsync(testInfo(testInfo, str, "2"), build2, synchronizedListItemWriter2);
            awaitTermination(runAsync);
            awaitTermination(runAsync2);
            awaitClosed(build);
            awaitClosed(build2);
            awaitUntil(() -> {
                return Boolean.valueOf(STREAM_MESSAGE_COUNT == synchronizedListItemWriter.getWrittenItems().size() + synchronizedListItemWriter2.getWrittenItems().size());
            });
            assertMessageBody(synchronizedListItemWriter.getWrittenItems());
            assertMessageBody(synchronizedListItemWriter2.getWrittenItems());
            RedisModulesCommands sync = this.sourceConnection.sync();
            Assertions.assertEquals(57L, sync.xpending(str, DEFAULT_CONSUMER_GROUP).getCount());
            build.open(new ExecutionContext());
            build.ack(synchronizedListItemWriter.getWrittenItems());
            build.close();
            build2.open(new ExecutionContext());
            build2.ack(synchronizedListItemWriter2.getWrittenItems());
            build2.close();
            Assertions.assertEquals(0L, sync.xpending(str, DEFAULT_CONSUMER_GROUP).getCount());
        }
    }

    @Test
    void writeStream(TestInfo testInfo) throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            HashMap hashMap = new HashMap();
            hashMap.put("field1", "value1");
            hashMap.put("field2", "value2");
            arrayList.add(hashMap);
        }
        run(testInfo, new ListItemReader(arrayList), sourceWriter().operation(Xadd.key("stream:0").body(IdentityConverter.instance()).build()));
        RedisModulesCommands sync = this.sourceConnection.sync();
        Assertions.assertEquals(arrayList.size(), sync.xlen("stream:0"));
        List xrange = sync.xrange("stream:0", Range.create("-", "+"));
        for (int i2 = 0; i2 < xrange.size(); i2++) {
            Assertions.assertEquals(arrayList.get(i2), ((StreamMessage) xrange.get(i2)).getBody());
        }
    }

    @Test
    void jsonSet(TestInfo testInfo) throws Exception {
        run(testInfo, new IteratorItemReader(Beers.jsonNodeIterator()), sourceWriter().operation(JsonSet.key(jsonNode -> {
            return "beer:" + jsonNode.get("id").asText();
        }).value((v0) -> {
            return v0.toString();
        }).path(".").build()));
        Assertions.assertEquals(BEER_COUNT, this.sourceConnection.sync().keys("beer:*").size());
        Assertions.assertEquals(new ObjectMapper().readTree(JSON_BEER_1), new ObjectMapper().readTree((String) this.sourceConnection.sync().jsonGet("beer:1", new String[0])));
    }

    @Test
    void tsAdd(TestInfo testInfo) throws Exception {
        Random random = new Random();
        ArrayList arrayList = new ArrayList(100);
        for (int i = 0; i < 100; i++) {
            arrayList.add(Sample.of((System.currentTimeMillis() - 100) + (i % (100 / 2)), random.nextDouble()));
        }
        run(testInfo, new ListItemReader(arrayList), sourceWriter().operation(TsAdd.key("ts:1").sample(IdentityConverter.instance()).options(sample -> {
            return AddOptions.builder().policy(DuplicatePolicy.LAST).build();
        }).build()));
        Assertions.assertEquals(100 / 2, this.sourceConnection.sync().tsRange("ts:1", TimeRange.unbounded(), RangeOptions.builder().build()).size(), 2.0f);
    }

    @Test
    void beerIndex() throws Exception {
        Beers.populateIndex(this.sourceConnection);
        Assertions.assertEquals(1019.0d, RedisModulesUtils.indexInfo(this.sourceConnection.sync().ftInfo("beers")).getNumDocs());
    }

    @Test
    void tsComparator(TestInfo testInfo) throws Exception {
        this.sourceConnection.sync().tsAdd("ts:1", Sample.of(123.0d));
        RedisItemReader<String, KeyComparison> comparisonReader = comparisonReader();
        KeyComparisonCountItemWriter keyComparisonCountItemWriter = new KeyComparisonCountItemWriter();
        run(testInfo, comparisonReader, keyComparisonCountItemWriter);
        Assertions.assertEquals(1L, keyComparisonCountItemWriter.getResults().getCount(KeyComparison.Status.MISSING));
    }

    @Test
    void replicateJSON(TestInfo testInfo) throws Exception {
        this.sourceConnection.sync().jsonSet("json:1", "$", JSON_BEER_1);
        this.sourceConnection.sync().jsonSet("json:2", "$", JSON_BEER_1);
        this.sourceConnection.sync().jsonSet("json:3", "$", JSON_BEER_1);
        run(testInfo, sourceReader().dataStructure(), targetWriter().dataStructure());
        Assertions.assertTrue(compare(testInfo));
    }

    @Test
    void replicateTimeSeries(TestInfo testInfo) throws Exception {
        this.sourceConnection.sync().tsCreate("ts:1", CreateOptions.builder().policy(DuplicatePolicy.LAST).build());
        this.sourceConnection.sync().tsAdd("ts:1", Sample.of(1000L, 1.0d));
        this.sourceConnection.sync().tsAdd("ts:1", Sample.of(1001L, 2.0d));
        this.sourceConnection.sync().tsAdd("ts:1", Sample.of(1003L, 3.0d));
        run(testInfo, sourceReader().dataStructure(), targetWriter().dataStructure());
        Assertions.assertTrue(compare(testInfo));
    }

    @Test
    void dataStructures(TestInfo testInfo) throws Exception {
        generate(testInfo, GeneratorReaderOptions.builder().count(100).build());
        RedisItemReader dataStructure = sourceReader().dataStructure();
        run(testInfo, dataStructure, targetDataStructureWriter());
        awaitClosed(dataStructure);
        Assertions.assertTrue(compare(testInfo));
    }

    @Test
    void dumpAndRestore(TestInfo testInfo) throws Exception {
        generate(testInfo, GeneratorReaderOptions.builder().count(100).build());
        RedisItemReader keyDump = sourceReader().keyDump();
        run(testInfo, keyDump, targetKeyDumpWriter());
        awaitClosed(keyDump);
        Assertions.assertTrue(compare(testInfo));
    }

    @Test
    void byteArrayCodec(TestInfo testInfo) throws Exception {
        StatefulRedisModulesConnection connection = RedisModulesUtils.connection(this.sourceClient, ByteArrayCodec.INSTANCE);
        try {
            connection.setAutoFlushCommands(false);
            RedisAsyncCommands async = connection.async();
            ArrayList arrayList = new ArrayList();
            Random random = new Random();
            for (int i = 0; i < 100; i++) {
                byte[] bArr = new byte[1000];
                random.nextBytes(bArr);
                arrayList.add(async.set(("binary:" + i).getBytes(), bArr));
            }
            connection.flushCommands();
            LettuceFutures.awaitAll(connection.getTimeout(), (Future[]) arrayList.toArray(new RedisFuture[0]));
            connection.setAutoFlushCommands(true);
            if (connection != null) {
                connection.close();
            }
            run(testInfo, sourceReader().keyDump(), targetKeyDumpWriter());
            Awaitility.await().until(() -> {
                return Boolean.valueOf(this.sourceConnection.sync().dbsize() == this.targetConnection.sync().dbsize());
            });
        } catch (Throwable th) {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void liveOnly(TestInfo testInfo) throws JobExecutionException {
        enableKeyspaceNotifications(this.sourceClient);
        LiveRedisItemReader keyDump = sourceLiveReader(100000).keyDump();
        RedisItemWriter<String, String, KeyDump<String>> targetKeyDumpWriter = targetKeyDumpWriter();
        JobExecution runAsync = runAsync(testInfo, keyDump, targetKeyDumpWriter);
        awaitOpen(keyDump);
        generate(testInfo, GeneratorReaderOptions.builder().types(new DataStructure.Type[]{DataStructure.Type.HASH, DataStructure.Type.LIST, DataStructure.Type.SET, DataStructure.Type.STRING, DataStructure.Type.ZSET}).build());
        awaitTermination(runAsync);
        awaitClosed(keyDump);
        awaitClosed(targetKeyDumpWriter);
        Assertions.assertTrue(compare(testInfo));
    }

    @Test
    void liveReplication(TestInfo testInfo) throws Exception {
        enableKeyspaceNotifications(this.sourceClient);
        Assertions.assertTrue(liveReplication(testInfo, sourceReader().keyDump(), targetKeyDumpWriter(), sourceLiveReader(100000).keyDump(), targetKeyDumpWriter()));
    }

    @Test
    void liveSet(TestInfo testInfo) throws Exception {
        enableKeyspaceNotifications(this.sourceClient);
        this.sourceConnection.sync().sadd("myset", new String[]{"1", "2", "3", "4", "5"});
        LiveRedisItemReader dataStructure = sourceLiveReader(100).dataStructure();
        RedisItemWriter<String, String, DataStructure<String>> targetDataStructureWriter = targetDataStructureWriter();
        JobExecution runAsync = runAsync(testInfo, dataStructure, targetDataStructureWriter);
        awaitOpen(dataStructure);
        this.sourceConnection.sync().srem("myset", new String[]{"5"});
        awaitTermination(runAsync);
        awaitClosed(dataStructure);
        awaitClosed(targetDataStructureWriter);
        Assertions.assertEquals(this.sourceConnection.sync().smembers("myset"), this.targetConnection.sync().smembers("myset"));
    }

    @Test
    void invalidConnection(TestInfo testInfo) throws Exception {
        RedisModulesClient create = RedisModulesClient.create("redis://badhost:6379");
        try {
            RedisModulesClient create2 = RedisModulesClient.create("redis://badhost:6379");
            try {
                Assertions.assertTrue(run(testInfo, reader(create, ByteArrayCodec.INSTANCE).dataStructure(), writer(create2, ByteArrayCodec.INSTANCE).dataStructure()).getStatus().isUnsuccessful());
                if (create2 != null) {
                    create2.close();
                }
                if (create != null) {
                    create.close();
                }
            } catch (Throwable th) {
                if (create2 != null) {
                    try {
                        create2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    void hyperLogLog(TestInfo testInfo) throws Exception {
        this.sourceConnection.sync().pfadd("hll:1", new String[]{"member:1", "member:2"});
        this.sourceConnection.sync().pfadd("hll:2", new String[]{"member:1", "member:2", "member:3"});
        run(testInfo, sourceReader(ByteArrayCodec.INSTANCE).dataStructure(), targetWriter(ByteArrayCodec.INSTANCE).dataStructure());
        Assertions.assertEquals(this.sourceConnection.sync().pfcount(new String[]{"hll:1"}), this.targetConnection.sync().pfcount(new String[]{"hll:1"}));
    }

    @Test
    void liveDataStructures(TestInfo testInfo) throws Exception {
        enableKeyspaceNotifications(this.sourceClient);
        Assertions.assertTrue(liveReplication(testInfo, sourceReader().dataStructure(), targetDataStructureWriter(), sourceLiveReader(100000).dataStructure(), targetDataStructureWriter()));
    }

    private <T extends KeyValue<String>> boolean liveReplication(TestInfo testInfo, RedisItemReader<String, T> redisItemReader, RedisItemWriter<String, String, T> redisItemWriter, LiveRedisItemReader<String, T> liveRedisItemReader, RedisItemWriter<String, String, T> redisItemWriter2) throws Exception {
        generate(testInfo(testInfo, "generate"), GeneratorReaderOptions.builder().types(new DataStructure.Type[]{DataStructure.Type.HASH, DataStructure.Type.LIST, DataStructure.Type.SET, DataStructure.Type.STREAM, DataStructure.Type.STRING, DataStructure.Type.ZSET}).count(300).build());
        Flow flow = (SimpleFlow) new FlowBuilder(name(testInfo(testInfo, "snapshotFlow"))).start(step(testInfo(testInfo, "step"), redisItemReader, null, redisItemWriter).build()).build();
        JobExecution runAsync = runAsync(job(testInfo).start((Flow) new FlowBuilder(name(testInfo(testInfo, "flow"))).split(new SimpleAsyncTaskExecutor()).add(new Flow[]{(SimpleFlow) new FlowBuilder(name(testInfo(testInfo, "liveFlow"))).start(flushingStep(testInfo(testInfo, "liveStep"), liveRedisItemReader, redisItemWriter2).build()).build(), flow}).build()).build().build());
        awaitOpen(liveRedisItemReader);
        generate(testInfo(testInfo, "generateLive"), GeneratorReaderOptions.builder().types(new DataStructure.Type[]{DataStructure.Type.HASH, DataStructure.Type.LIST, DataStructure.Type.SET, DataStructure.Type.STRING, DataStructure.Type.ZSET}).expiration(IntRange.is(100)).keyRange(IntRange.between(300, 1000)).build());
        awaitTermination(runAsync);
        awaitClosed(redisItemReader);
        awaitClosed(liveRedisItemReader);
        awaitClosed(redisItemWriter);
        awaitClosed(redisItemWriter2);
        return compare(testInfo);
    }

    @Test
    void compararator(TestInfo testInfo) throws Exception {
        generate(testInfo, GeneratorReaderOptions.builder().count(120).build());
        run(testInfo(testInfo, "replicate"), sourceReader().keyDump(), targetKeyDumpWriter());
        long j = 0;
        for (int i = 0; i < 13; i++) {
            j += this.targetConnection.sync().del(new String[]{(String) this.targetConnection.sync().randomkey()}).longValue();
        }
        HashSet hashSet = new HashSet();
        for (int i2 = 0; i2 < 23; i2++) {
            String str = (String) this.targetConnection.sync().randomkey();
            if (this.targetConnection.sync().expire(str, this.targetConnection.sync().ttl(str).longValue() + 12345).booleanValue()) {
                hashSet.add(str);
            }
        }
        HashSet hashSet2 = new HashSet();
        HashSet hashSet3 = new HashSet();
        for (int i3 = 0; i3 < 17; i3++) {
            String str2 = (String) this.targetConnection.sync().randomkey();
            if (DataStructure.Type.of(this.targetConnection.sync().type(str2)) == DataStructure.Type.STRING) {
                if (!hashSet2.contains(str2)) {
                    hashSet3.add(str2);
                }
                hashSet.remove(str2);
            } else {
                hashSet2.add(str2);
                hashSet3.remove(str2);
                hashSet.remove(str2);
            }
            this.targetConnection.sync().set(str2, "blah");
        }
        RedisItemReader<String, KeyComparison> comparisonReader = comparisonReader();
        KeyComparisonCountItemWriter keyComparisonCountItemWriter = new KeyComparisonCountItemWriter();
        run(testInfo(testInfo, "compare"), comparisonReader, keyComparisonCountItemWriter);
        KeyComparisonCountItemWriter.Results results = keyComparisonCountItemWriter.getResults();
        long longValue = this.sourceConnection.sync().dbsize().longValue();
        Assertions.assertEquals(longValue, results.getTotalCount());
        Assertions.assertEquals(longValue, this.targetConnection.sync().dbsize().longValue() + j);
        Assertions.assertEquals(hashSet2.size(), results.getCount(KeyComparison.Status.TYPE));
        Assertions.assertEquals(hashSet3.size(), results.getCount(KeyComparison.Status.VALUE));
        Assertions.assertEquals(hashSet.size(), results.getCount(KeyComparison.Status.TTL));
        Assertions.assertEquals(j, results.getCount(KeyComparison.Status.MISSING));
    }
}
