/*
 * Decompiled with CFR 0.152.
 */
package io.namastack.outbox;

import io.namastack.outbox.OutboxInstanceRegistry;
import io.namastack.outbox.OutboxProperties;
import io.namastack.outbox.OutboxRecord;
import io.namastack.outbox.OutboxRecordProcessor;
import io.namastack.outbox.OutboxRecordRepository;
import io.namastack.outbox.OutboxRecordStatus;
import io.namastack.outbox.partition.PartitionCoordinator;
import io.namastack.outbox.retry.OutboxRetryPolicy;
import java.time.Clock;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import kotlin.Metadata;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.SourceDebugExtension;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.Scheduled;

@Metadata(mv={2, 2, 0}, k=1, xi=48, d1={"\u0000j\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0010\u000e\n\u0000\n\u0002\u0010\u000b\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0000\u0018\u00002\u00020\u0001BG\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\t\u0012\u0006\u0010\n\u001a\u00020\u000b\u0012\u0006\u0010\f\u001a\u00020\r\u0012\u0006\u0010\u000e\u001a\u00020\u000f\u0012\u0006\u0010\u0010\u001a\u00020\u0011\u00a2\u0006\u0004\b\u0012\u0010\u0013J\b\u0010\u0017\u001a\u00020\u0018H\u0007J\u0010\u0010\u0019\u001a\u00020\u00182\u0006\u0010\u001a\u001a\u00020\u001bH\u0002J\u0010\u0010\u001c\u001a\u00020\u001d2\u0006\u0010\u001e\u001a\u00020\u001fH\u0002J\u001c\u0010 \u001a\u00020\u00182\u0006\u0010\u001e\u001a\u00020\u001f2\n\u0010!\u001a\u00060\"j\u0002`#H\u0002R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\b\u001a\u00020\tX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\n\u001a\u00020\u000bX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\f\u001a\u00020\rX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u000e\u001a\u00020\u000fX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0010\u001a\u00020\u0011X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0016\u0010\u0014\u001a\n \u0016*\u0004\u0018\u00010\u00150\u0015X\u0082\u0004\u00a2\u0006\u0002\n\u0000\u00a8\u0006$"}, d2={"Lio/namastack/outbox/OutboxProcessingScheduler;", "", "recordRepository", "Lio/namastack/outbox/OutboxRecordRepository;", "recordProcessor", "Lio/namastack/outbox/OutboxRecordProcessor;", "partitionCoordinator", "Lio/namastack/outbox/partition/PartitionCoordinator;", "instanceRegistry", "Lio/namastack/outbox/OutboxInstanceRegistry;", "taskExecutor", "Lorg/springframework/core/task/TaskExecutor;", "retryPolicy", "Lio/namastack/outbox/retry/OutboxRetryPolicy;", "properties", "Lio/namastack/outbox/OutboxProperties;", "clock", "Ljava/time/Clock;", "<init>", "(Lio/namastack/outbox/OutboxRecordRepository;Lio/namastack/outbox/OutboxRecordProcessor;Lio/namastack/outbox/partition/PartitionCoordinator;Lio/namastack/outbox/OutboxInstanceRegistry;Lorg/springframework/core/task/TaskExecutor;Lio/namastack/outbox/retry/OutboxRetryPolicy;Lio/namastack/outbox/OutboxProperties;Ljava/time/Clock;)V", "log", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "process", "", "processAggregate", "aggregateId", "", "processRecord", "", "record", "Lio/namastack/outbox/OutboxRecord;", "handleFailure", "ex", "Ljava/lang/Exception;", "Lkotlin/Exception;", "namastack-outbox-core"})
@SourceDebugExtension(value={"SMAP\nOutboxProcessingScheduler.kt\nKotlin\n*S Kotlin\n*F\n+ 1 OutboxProcessingScheduler.kt\nio/namastack/outbox/OutboxProcessingScheduler\n+ 2 _Collections.kt\nkotlin/collections/CollectionsKt___CollectionsKt\n*L\n1#1,194:1\n1869#2,2:195\n*S KotlinDebug\n*F\n+ 1 OutboxProcessingScheduler.kt\nio/namastack/outbox/OutboxProcessingScheduler\n*L\n78#1:195,2\n*E\n"})
public final class OutboxProcessingScheduler {
    @NotNull
    private final OutboxRecordRepository recordRepository;
    @NotNull
    private final OutboxRecordProcessor recordProcessor;
    @NotNull
    private final PartitionCoordinator partitionCoordinator;
    @NotNull
    private final OutboxInstanceRegistry instanceRegistry;
    @NotNull
    private final TaskExecutor taskExecutor;
    @NotNull
    private final OutboxRetryPolicy retryPolicy;
    @NotNull
    private final OutboxProperties properties;
    @NotNull
    private final Clock clock;
    private final Logger log;

    public OutboxProcessingScheduler(@NotNull OutboxRecordRepository recordRepository, @NotNull OutboxRecordProcessor recordProcessor, @NotNull PartitionCoordinator partitionCoordinator, @NotNull OutboxInstanceRegistry instanceRegistry, @NotNull TaskExecutor taskExecutor, @NotNull OutboxRetryPolicy retryPolicy, @NotNull OutboxProperties properties, @NotNull Clock clock) {
        Intrinsics.checkNotNullParameter((Object)recordRepository, (String)"recordRepository");
        Intrinsics.checkNotNullParameter((Object)recordProcessor, (String)"recordProcessor");
        Intrinsics.checkNotNullParameter((Object)partitionCoordinator, (String)"partitionCoordinator");
        Intrinsics.checkNotNullParameter((Object)instanceRegistry, (String)"instanceRegistry");
        Intrinsics.checkNotNullParameter((Object)taskExecutor, (String)"taskExecutor");
        Intrinsics.checkNotNullParameter((Object)retryPolicy, (String)"retryPolicy");
        Intrinsics.checkNotNullParameter((Object)properties, (String)"properties");
        Intrinsics.checkNotNullParameter((Object)clock, (String)"clock");
        this.recordRepository = recordRepository;
        this.recordProcessor = recordProcessor;
        this.partitionCoordinator = partitionCoordinator;
        this.instanceRegistry = instanceRegistry;
        this.taskExecutor = taskExecutor;
        this.retryPolicy = retryPolicy;
        this.properties = properties;
        this.clock = clock;
        this.log = LoggerFactory.getLogger(OutboxProcessingScheduler.class);
    }

    @Scheduled(fixedDelayString="${outbox.poll-interval}")
    public final void process() {
        try {
            String myInstanceId = this.instanceRegistry.getCurrentInstanceId();
            List<Integer> assignedPartitions = this.partitionCoordinator.getAssignedPartitions(myInstanceId);
            if (assignedPartitions.isEmpty()) {
                this.log.debug("No partitions assigned to instance {} - waiting for rebalancing", (Object)myInstanceId);
                return;
            }
            Object[] objectArray = new Object[]{assignedPartitions.size(), myInstanceId, assignedPartitions};
            this.log.debug("Processing {} partitions for instance {}: {}", objectArray);
            List<String> aggregateIds = this.recordRepository.findAggregateIdsInPartitions(assignedPartitions, OutboxRecordStatus.NEW, this.properties.getBatchSize());
            if (!((Collection)aggregateIds).isEmpty()) {
                this.log.debug("Found {} aggregates to process", (Object)aggregateIds.size());
                Iterable $this$forEach$iv = aggregateIds;
                boolean $i$f$forEach = false;
                for (Object element$iv : $this$forEach$iv) {
                    String aggregateId = (String)element$iv;
                    boolean bl = false;
                    this.taskExecutor.execute(() -> OutboxProcessingScheduler.process$lambda$0$0(this, aggregateId));
                }
            }
        }
        catch (Exception ex) {
            this.log.error("Error during partition-aware outbox processing", (Throwable)ex);
        }
    }

    private final void processAggregate(String aggregateId) {
        try {
            List<OutboxRecord> records = this.recordRepository.findAllIncompleteRecordsByAggregateId(aggregateId);
            if (records.isEmpty()) {
                return;
            }
            this.log.debug("Processing {} records for aggregate {}", (Object)records.size(), (Object)aggregateId);
            for (OutboxRecord record : records) {
                if (!record.canBeRetried$namastack_outbox_core(this.clock)) {
                    this.log.debug("Skipping record {} - not ready for retry", (Object)record.getId());
                } else {
                    boolean success = this.processRecord(record);
                    if (success || !this.properties.getProcessing().getStopOnFirstFailure()) continue;
                    this.log.debug("\ud83d\uded1 Stopping aggregate {} processing due to failure (stopOnFirstFailure=true)", (Object)aggregateId);
                }
                break;
            }
        }
        catch (Exception ex) {
            this.log.error("Error processing aggregate {}", (Object)aggregateId, (Object)ex);
        }
    }

    private final boolean processRecord(OutboxRecord record) {
        boolean bl;
        try {
            Object[] objectArray = new Object[]{record.getEventType(), record.getAggregateId(), record.getPartition()};
            this.log.debug("\u23f3 Processing {} for {} (partition {})", objectArray);
            this.recordProcessor.process(record);
            if (this.properties.getProcessing().getDeleteCompletedRecords()) {
                this.log.debug("Deleting outbox record {} after successful processing (deleteCompletedRecords=true)", (Object)record.getId());
                this.recordRepository.deleteById(record.getId());
            } else {
                this.log.debug("Marking outbox record {} as completed", (Object)record.getId());
                record.markCompleted$namastack_outbox_core(this.clock);
                this.recordRepository.save(record);
            }
            this.log.debug("\u2705 Successfully processed {} for {}", (Object)record.getEventType(), (Object)record.getAggregateId());
            bl = true;
        }
        catch (Exception ex) {
            this.handleFailure(record, ex);
            bl = false;
        }
        return bl;
    }

    private final void handleFailure(OutboxRecord record, Exception ex) {
        Object[] objectArray = new Object[]{record.getEventType(), record.getAggregateId(), ex.getMessage()};
        this.log.debug("\u274c Failed {} for {}: {}", objectArray);
        record.incrementRetryCount$namastack_outbox_core();
        if (record.retriesExhausted$namastack_outbox_core(this.properties.getRetry().getMaxRetries()) || !this.retryPolicy.shouldRetry(ex)) {
            record.markFailed$namastack_outbox_core();
            objectArray = new Object[]{record.getId(), record.getAggregateId(), record.getRetryCount()};
            this.log.warn("\ud83d\udeab Record {} for aggregate {} marked as FAILED after {} retries", objectArray);
        } else {
            Duration delay = this.retryPolicy.nextDelay(record.getRetryCount());
            record.scheduleNextRetry$namastack_outbox_core(delay, this.clock);
            Object[] objectArray2 = new Object[]{record.getRetryCount(), record.getId(), delay};
            this.log.debug("\ud83d\udd04 Scheduled retry #{} for record {} in {}", objectArray2);
        }
        this.recordRepository.save(record);
    }

    private static final void process$lambda$0$0(OutboxProcessingScheduler this$0, String $aggregateId) {
        this$0.processAggregate($aggregateId);
    }
}

