package com.redis.spring.batch;

import com.redis.lettucemod.RedisModulesClient;
import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.cluster.RedisModulesClusterClient;
import com.redis.lettucemod.util.ClientBuilder;
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.RedisItemWriter;
import com.redis.spring.batch.common.DataStructure;
import com.redis.spring.batch.common.FlushingOptions;
import com.redis.spring.batch.common.JobRunner;
import com.redis.spring.batch.common.KeyDump;
import com.redis.spring.batch.common.Openable;
import com.redis.spring.batch.common.PoolOptions;
import com.redis.spring.batch.reader.DataStructureValueReader;
import com.redis.spring.batch.reader.GeneratorItemReader;
import com.redis.spring.batch.reader.GeneratorReaderOptions;
import com.redis.spring.batch.reader.KeyComparatorOptions;
import com.redis.spring.batch.reader.KeyComparison;
import com.redis.spring.batch.reader.LiveRedisItemReader;
import com.redis.spring.batch.reader.PollableItemReader;
import com.redis.spring.batch.reader.QueueOptions;
import com.redis.spring.batch.step.FlushingSimpleStepBuilder;
import com.redis.testcontainers.RedisServer;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.XAddArgs;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import java.lang.reflect.Method;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestInstance;
import org.junit.runner.RunWith;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamSupport;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = {BatchTestApplication.class})
@RunWith(SpringRunner.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
/* loaded from: input_file:com/redis/spring/batch/AbstractTestBase.class */
abstract class AbstractTestBase {
    private final Logger log = Logger.getLogger(getClass().getName());
    public static final Duration DEFAULT_IDLE_TIMEOUT = Duration.ofMillis(500);
    protected static final FlushingOptions DEFAULT_FLUSHING_OPTIONS = FlushingOptions.builder().idleTimeout(DEFAULT_IDLE_TIMEOUT).build();
    private static final Duration DEFAULT_AWAIT_TIMEOUT = Duration.ofSeconds(1);

    @Value("${running-timeout:PT5S}")
    private Duration runningTimeout;

    @Value("${termination-timeout:PT5S}")
    private Duration terminationTimeout;
    private JobRunner jobRunner;
    protected AbstractRedisClient sourceClient;
    protected StatefulRedisModulesConnection<String, String> sourceConnection;
    private AbstractRedisClient targetClient;
    protected StatefulRedisModulesConnection<String, String> targetConnection;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/redis/spring/batch/AbstractTestBase$SimpleTestInfo.class */
    public static class SimpleTestInfo implements TestInfo {
        private final TestInfo delegate;
        private final String[] suffixes;

        public SimpleTestInfo(TestInfo testInfo, String... strArr) {
            this.delegate = testInfo;
            this.suffixes = strArr;
        }

        public String getDisplayName() {
            ArrayList arrayList = new ArrayList();
            arrayList.add(this.delegate.getDisplayName());
            arrayList.addAll(Arrays.asList(this.suffixes));
            return String.join("-", arrayList);
        }

        public Set<String> getTags() {
            return this.delegate.getTags();
        }

        public Optional<Class<?>> getTestClass() {
            return this.delegate.getTestClass();
        }

        public Optional<Method> getTestMethod() {
            return this.delegate.getTestMethod();
        }
    }

    protected abstract RedisServer getSourceServer();

    protected abstract RedisServer getTargetServer();

    @BeforeAll
    void setup() {
        getSourceServer().start();
        getTargetServer().start();
        this.sourceClient = client(getSourceServer());
        this.sourceConnection = RedisModulesUtils.connection(this.sourceClient);
        this.targetClient = client(getTargetServer());
        this.targetConnection = RedisModulesUtils.connection(this.targetClient);
        this.jobRunner = JobRunner.inMemory().runningTimeout(this.runningTimeout).terminationTimeout(this.terminationTimeout);
    }

    @AfterAll
    void teardown() {
        this.sourceConnection.close();
        this.sourceClient.shutdown();
        this.sourceClient.getResources().shutdown();
        this.targetConnection.close();
        this.targetClient.shutdown();
        this.targetClient.getResources().shutdown();
        getTargetServer().close();
        getSourceServer().close();
    }

    @BeforeEach
    void flushAll() {
        this.sourceConnection.sync().flushall();
        this.targetConnection.sync().flushall();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static String name(TestInfo testInfo) {
        return testInfo.getDisplayName().replace("(TestInfo)", "");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static TestInfo testInfo(TestInfo testInfo, String... strArr) {
        return new SimpleTestInfo(testInfo, strArr);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RedisItemReader.Builder<String, String> sourceReader() {
        return reader(this.sourceClient, StringCodec.UTF8);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <K, V> RedisItemReader.Builder<K, V> sourceReader(RedisCodec<K, V> redisCodec) {
        return reader(this.sourceClient, redisCodec);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <K, V> RedisItemReader.Builder<K, V> reader(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
        return abstractRedisClient instanceof RedisModulesClusterClient ? RedisItemReader.client((RedisModulesClusterClient) abstractRedisClient, redisCodec).jobRunner(this.jobRunner) : RedisItemReader.client((RedisModulesClient) abstractRedisClient, redisCodec).jobRunner(this.jobRunner);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RedisItemWriter.Builder<String, String> targetWriter() {
        return writer(this.targetClient, StringCodec.UTF8);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <K, V> RedisItemWriter.Builder<K, V> targetWriter(RedisCodec<K, V> redisCodec) {
        return writer(this.targetClient, redisCodec);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static <K, V> RedisItemWriter.Builder<K, V> writer(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
        return abstractRedisClient instanceof RedisModulesClusterClient ? RedisItemWriter.client((RedisModulesClusterClient) abstractRedisClient, redisCodec) : RedisItemWriter.client((RedisModulesClient) abstractRedisClient, redisCodec);
    }

    protected AbstractRedisClient client(RedisServer redisServer) {
        return ClientBuilder.create(RedisURI.create(redisServer.getRedisURI())).cluster(redisServer.isCluster()).build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void awaitOpen(Object obj) {
        if (obj instanceof Openable) {
            Openable openable = (Openable) obj;
            Objects.requireNonNull(openable);
            awaitUntil(openable::isOpen);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void awaitClosed(Object obj) {
        if (obj instanceof Openable) {
            Openable openable = (Openable) obj;
            Objects.requireNonNull(openable);
            awaitUntilFalse(openable::isOpen);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void awaitTermination(JobExecution jobExecution) {
        this.jobRunner.awaitTermination(jobExecution);
        this.log.log(Level.INFO, "Job {0} terminated", jobExecution.getJobInstance().getJobName());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void awaitUntilFalse(Callable<Boolean> callable) {
        awaitUntil(() -> {
            return Boolean.valueOf(!((Boolean) callable.call()).booleanValue());
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void awaitUntil(Callable<Boolean> callable) {
        Awaitility.await().timeout(DEFAULT_AWAIT_TIMEOUT).until(callable);
    }

    protected <I, O> JobExecution runAsync(TestInfo testInfo, PollableItemReader<I> pollableItemReader, ItemProcessor<I, O> itemProcessor, ItemWriter<O> itemWriter) throws JobExecutionException {
        JobExecution runAsync = runAsync(testInfo, flushingStep(testInfo, pollableItemReader, itemProcessor, itemWriter));
        awaitOpen(pollableItemReader);
        awaitOpen(itemWriter);
        return runAsync;
    }

    private <I, O> JobExecution runAsync(TestInfo testInfo, SimpleStepBuilder<I, O> simpleStepBuilder) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException {
        return runAsync(job(testInfo, simpleStepBuilder));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Job job(TestInfo testInfo, SimpleStepBuilder<?, ?> simpleStepBuilder) {
        return job(testInfo).start(simpleStepBuilder.build()).build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public JobBuilder job(TestInfo testInfo) {
        return this.jobRunner.job(name(testInfo));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public JobExecution run(Job job) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException {
        JobExecution run = this.jobRunner.getJobLauncher().run(job, new JobParameters());
        this.jobRunner.awaitTermination(run);
        return run;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public JobExecution runAsync(Job job) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException {
        JobExecution run = this.jobRunner.getAsyncJobLauncher().run(job, new JobParameters());
        this.jobRunner.awaitRunning(run);
        return run;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <I, O> FlushingSimpleStepBuilder<I, O> flushingStep(TestInfo testInfo, PollableItemReader<I> pollableItemReader, ItemWriter<O> itemWriter) {
        return flushingStep(testInfo, pollableItemReader, null, itemWriter);
    }

    protected <I, O> FlushingSimpleStepBuilder<I, O> flushingStep(TestInfo testInfo, PollableItemReader<I> pollableItemReader, ItemProcessor<I, O> itemProcessor, ItemWriter<O> itemWriter) {
        return JobRunner.flushing(step(testInfo, pollableItemReader, itemProcessor, itemWriter), DEFAULT_FLUSHING_OPTIONS);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <I, O> SimpleStepBuilder<I, O> step(TestInfo testInfo, ItemReader<I> itemReader, ItemWriter<O> itemWriter) {
        return step(testInfo, itemReader, null, itemWriter);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <I, O> SimpleStepBuilder<I, O> step(TestInfo testInfo, ItemReader<I> itemReader, ItemProcessor<I, O> itemProcessor, ItemWriter<O> itemWriter) {
        setName((ItemReader<?>) itemReader, testInfo);
        setName((ItemWriter<?>) itemWriter, testInfo);
        SimpleStepBuilder<I, O> chunk = this.jobRunner.step(name(testInfo)).chunk(50);
        chunk.reader(itemReader).processor(itemProcessor).writer(itemWriter);
        return chunk;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <I, O> JobExecution runAsync(TestInfo testInfo, PollableItemReader<I> pollableItemReader, ItemWriter<O> itemWriter) throws JobExecutionException {
        return runAsync(testInfo, pollableItemReader, null, itemWriter);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <I, O> JobExecution run(TestInfo testInfo, ItemReader<I> itemReader, ItemWriter<O> itemWriter) throws JobExecutionException {
        return run(testInfo, itemReader, null, itemWriter);
    }

    protected <I, O> JobExecution run(TestInfo testInfo, ItemReader<I> itemReader, ItemProcessor<I, O> itemProcessor, ItemWriter<O> itemWriter) throws JobExecutionException {
        JobExecution run = run(job(testInfo, step(testInfo, itemReader, itemProcessor, itemWriter)));
        awaitClosed(itemReader);
        awaitClosed(itemWriter);
        return run;
    }

    protected static void setName(ItemWriter<?> itemWriter, TestInfo testInfo) {
        setName(itemWriter, testInfo, "writer");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void setName(ItemReader<?> itemReader, TestInfo testInfo) {
        setName(itemReader, testInfo, "reader");
    }

    private static void setName(Object obj, TestInfo testInfo, String str) {
        if (obj instanceof ItemStreamSupport) {
            ((ItemStreamSupport) obj).setName(name(testInfo(testInfo, str)));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean compare(TestInfo testInfo) throws JobExecutionException {
        Assertions.assertEquals(this.sourceConnection.sync().dbsize(), this.targetConnection.sync().dbsize(), "Source and target have different db sizes");
        RedisItemReader<String, KeyComparison> comparisonReader = comparisonReader();
        SynchronizedListItemWriter synchronizedListItemWriter = new SynchronizedListItemWriter();
        run(testInfo(testInfo, "compare"), comparisonReader, synchronizedListItemWriter);
        Assertions.assertFalse(synchronizedListItemWriter.getWrittenItems().isEmpty());
        for (KeyComparison keyComparison : synchronizedListItemWriter.getWrittenItems()) {
            Assertions.assertEquals(KeyComparison.Status.OK, keyComparison.getStatus(), MessageFormat.format("Key={0}, Type={1}", keyComparison.getKey(), keyComparison.getSource().getType()));
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RedisItemReader<String, KeyComparison> comparisonReader() {
        return sourceReader().comparator(this.targetClient).comparatorOptions(KeyComparatorOptions.builder().ttlTolerance(Duration.ofMillis(100L)).build()).build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void generate(TestInfo testInfo) throws JobExecutionException {
        generate(testInfo, GeneratorReaderOptions.builder().build());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void generate(TestInfo testInfo, GeneratorReaderOptions generatorReaderOptions) throws JobExecutionException {
        TestInfo testInfo2 = testInfo(testInfo, "generate");
        run(job(testInfo2, step(testInfo2, new GeneratorItemReader(generatorReaderOptions), null, sourceWriter().dataStructure())));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RedisItemWriter.Builder<String, String> sourceWriter() {
        return writer(this.sourceClient, StringCodec.UTF8);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RedisItemWriter<String, String, KeyDump<String>> targetKeyDumpWriter() {
        return targetWriter().keyDump();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RedisItemWriter<String, String, DataStructure<String>> targetDataStructureWriter() {
        return targetWriter().dataStructure(streamMessage -> {
            return new XAddArgs().id(streamMessage.getId());
        });
    }

    protected DataStructureValueReader<String, String> sourceDataStructureValueReader() {
        return new DataStructureValueReader<>(this.sourceClient, StringCodec.UTF8, PoolOptions.builder().build());
    }

    protected LiveRedisItemReader.Builder<String, String> sourceLiveReader() {
        return sourceReader().live().flushingOptions(DEFAULT_FLUSHING_OPTIONS);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public LiveRedisItemReader.Builder<String, String> sourceLiveReader(int i) {
        return sourceLiveReader().queueOptions(QueueOptions.builder().capacity(i).build());
    }

    protected void flushAll(AbstractRedisClient abstractRedisClient) {
        StatefulRedisModulesConnection connection = RedisModulesUtils.connection(abstractRedisClient);
        try {
            connection.sync().flushall();
            Awaitility.await().until(() -> {
                return Boolean.valueOf(connection.sync().dbsize().longValue() == 0);
            });
            if (connection != null) {
                connection.close();
            }
        } catch (Throwable th) {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
