package com.redis.spring.batch;

import com.redis.spring.batch.common.DataStructure;
import com.redis.spring.batch.common.FaultToleranceOptions;
import com.redis.spring.batch.common.FlushingOptions;
import com.redis.spring.batch.common.JobRunner;
import com.redis.spring.batch.reader.GeneratorItemReader;
import com.redis.spring.batch.reader.GeneratorReaderOptions;
import com.redis.spring.batch.reader.QueueItemReader;
import com.redis.spring.batch.step.FlushingChunkProvider;
import com.redis.spring.batch.step.FlushingSimpleStepBuilder;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.PlatformTransactionManager;

@SpringBootTest(classes = {BatchTestApplication.class})
@RunWith(SpringRunner.class)
/* loaded from: input_file:com/redis/spring/batch/StepTests.class */
class StepTests {

    @Autowired
    protected JobRepository jobRepository;

    @Autowired
    protected PlatformTransactionManager transactionManager;

    @Autowired
    protected JobBuilderFactory jobBuilderFactory;

    @Autowired
    protected StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobLauncher jobLauncher;
    private JobRunner jobRunner;

    StepTests() {
    }

    @BeforeEach
    private void createJobRunner() {
        this.jobRunner = new JobRunner(this.jobRepository, this.transactionManager);
    }

    @Test
    void readKeyValueFaultTolerance() throws Exception {
        ErrorItemReader errorItemReader = new ErrorItemReader(new GeneratorItemReader(GeneratorReaderOptions.builder().count(100).types(new DataStructure.Type[]{DataStructure.Type.STRING}).build()));
        SynchronizedListItemWriter synchronizedListItemWriter = new SynchronizedListItemWriter();
        SimpleStepBuilder chunk = this.jobRunner.step("readKeyValueFaultTolerance").chunk(1);
        chunk.reader(errorItemReader).writer(synchronizedListItemWriter);
        this.jobRunner.getJobLauncher().run(this.jobRunner.job("readKeyValueFaultTolerance").start(JobRunner.faultTolerant(chunk, FaultToleranceOptions.builder().skip(new Class[]{TimeoutException.class}).skipPolicy(new AlwaysSkipItemSkipPolicy()).build()).build()).build(), new JobParameters());
        Assertions.assertEquals(100 * 0.5f, synchronizedListItemWriter.getWrittenItems().size());
    }

    @Test
    void readerSkipPolicy() throws Exception {
        List list = (List) IntStream.range(0, 100).boxed().collect(Collectors.toList());
        ErrorItemReader errorItemReader = new ErrorItemReader(new ListItemReader(list));
        SynchronizedListItemWriter synchronizedListItemWriter = new SynchronizedListItemWriter();
        FlushingSimpleStepBuilder flushingSimpleStepBuilder = new FlushingSimpleStepBuilder(this.stepBuilderFactory.get("skip-policy").chunk(1).reader(errorItemReader).writer(synchronizedListItemWriter));
        flushingSimpleStepBuilder.idleTimeout(Duration.ofMillis(100L)).skip(TimeoutException.class).skipPolicy(new AlwaysSkipItemSkipPolicy());
        this.jobRunner.getJobLauncher().run(this.jobBuilderFactory.get("skip-policy").start(flushingSimpleStepBuilder.build()).build(), new JobParameters());
        Assertions.assertEquals(list.size(), synchronizedListItemWriter.getWrittenItems().size() * 2);
    }

    @Test
    void flushingStep() throws Exception {
        LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque(100);
        QueueItemReader queueItemReader = new QueueItemReader(linkedBlockingDeque, Duration.ofMillis(10L));
        SynchronizedListItemWriter synchronizedListItemWriter = new SynchronizedListItemWriter();
        SimpleStepBuilder chunk = this.jobRunner.step("flushingStep").chunk(50);
        chunk.reader(queueItemReader).writer(synchronizedListItemWriter);
        JobExecution run = this.jobRunner.getAsyncJobLauncher().run(this.jobRunner.job("flushingStep").start(JobRunner.flushing(chunk, FlushingOptions.builder().flushingInterval(FlushingChunkProvider.DEFAULT_FLUSHING_INTERVAL).idleTimeout(Duration.ofMillis(500L)).build()).build()).build(), new JobParameters());
        this.jobRunner.awaitRunning(run);
        Awaitility.await().until(() -> {
            return Boolean.valueOf(queueItemReader.isOpen());
        });
        for (int i = 1; i <= 100; i++) {
            linkedBlockingDeque.offer("key" + i);
        }
        Awaitility.await().until(() -> {
            return Boolean.valueOf(JobRunner.isTerminated(run));
        });
        Assertions.assertEquals(100, synchronizedListItemWriter.getWrittenItems().size());
    }
}
