package io.aleph0.yap.messaging.core;

import io.aleph0.yap.core.Sink;
import io.aleph0.yap.core.Source;
import io.aleph0.yap.core.worker.MeasuredProcessorWorker;
import io.aleph0.yap.messaging.core.Acknowledgeable;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/aleph0/yap/messaging/core/SplitGateProcessorWorker.class */
public class SplitGateProcessorWorker<X, Y> implements MeasuredProcessorWorker<Message<X>, Message<Y>, Metrics> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SplitGateProcessorWorker.class);
    private final AtomicInteger gatedMetric = new AtomicInteger(0);
    private final Function<X, Y> mapperFunction;
    private final Function<X, Collection<Y>> splitterFunction;

    /* loaded from: input_file:io/aleph0/yap/messaging/core/SplitGateProcessorWorker$Metrics.class */
    public static final class Metrics extends Record {
        private final int gated;

        public Metrics(int i) {
            if (i < 0) {
                throw new IllegalArgumentException("gatedMetric must not be negative");
            }
            this.gated = i;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, Metrics.class), Metrics.class, "gated", "FIELD:Lio/aleph0/yap/messaging/core/SplitGateProcessorWorker$Metrics;->gated:I").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, Metrics.class), Metrics.class, "gated", "FIELD:Lio/aleph0/yap/messaging/core/SplitGateProcessorWorker$Metrics;->gated:I").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, Metrics.class, Object.class), Metrics.class, "gated", "FIELD:Lio/aleph0/yap/messaging/core/SplitGateProcessorWorker$Metrics;->gated:I").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public int gated() {
            return this.gated;
        }
    }

    public SplitGateProcessorWorker(Function<X, Y> function, Function<X, Collection<Y>> function2) {
        this.mapperFunction = (Function) Objects.requireNonNull(function, "mapperFunction");
        this.splitterFunction = (Function) Objects.requireNonNull(function2, "splitterFunction");
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void process(Source<Message<X>> source, Sink<Message<Y>> sink) throws InterruptedException {
        Object take = source.take();
        while (true) {
            Message message = (Message) take;
            if (message == null) {
                return;
            }
            Collection collection = (Collection) this.splitterFunction.apply(message.body());
            Message mappedMessage = mappedMessage(message);
            if (collection.isEmpty()) {
                sink.put(mappedMessage);
            } else {
                this.gatedMetric.incrementAndGet();
                int i = 0;
                Iterator it = collection.iterator();
                AtomicInteger atomicInteger = new AtomicInteger(collection.size());
                while (it.hasNext()) {
                    sink.put(splitMessage(mappedMessage, atomicInteger, i, it.next(), sink));
                    i++;
                }
            }
            take = source.take();
        }
    }

    Message<Y> mappedMessage(final Message<X> message) {
        final Y apply = this.mapperFunction.apply(message.body());
        return new Message<Y>(this) { // from class: io.aleph0.yap.messaging.core.SplitGateProcessorWorker.1
            @Override // io.aleph0.yap.messaging.core.Acknowledgeable
            public void ack(Acknowledgeable.AcknowledgementListener acknowledgementListener) {
                message.ack(acknowledgementListener);
            }

            @Override // io.aleph0.yap.messaging.core.Acknowledgeable
            public void nack(Acknowledgeable.AcknowledgementListener acknowledgementListener) {
                message.nack(acknowledgementListener);
            }

            @Override // io.aleph0.yap.messaging.core.Message
            public String id() {
                return message.id();
            }

            @Override // io.aleph0.yap.messaging.core.Message
            public Map<String, String> attributes() {
                return message.attributes();
            }

            @Override // io.aleph0.yap.messaging.core.Message
            public Y body() {
                return (Y) apply;
            }
        };
    }

    Message<Y> splitMessage(final Message<Y> message, final AtomicInteger atomicInteger, int i, final Y y, final Sink<Message<Y>> sink) {
        final String str = message.id() + "-" + i;
        return new Message<Y>(this) { // from class: io.aleph0.yap.messaging.core.SplitGateProcessorWorker.2
            private final AtomicBoolean acked = new AtomicBoolean(false);
            final /* synthetic */ SplitGateProcessorWorker this$0;

            {
                this.this$0 = this;
            }

            @Override // io.aleph0.yap.messaging.core.Acknowledgeable
            public void ack(Acknowledgeable.AcknowledgementListener acknowledgementListener) {
                if (this.acked.compareAndSet(false, true)) {
                    int decrementAndGet = atomicInteger.decrementAndGet();
                    try {
                        acknowledgementListener.onSuccess();
                        if (decrementAndGet == 0) {
                            this.this$0.gatedMetric.decrementAndGet();
                            try {
                                sink.put(message);
                            } catch (InterruptedException e) {
                                SplitGateProcessorWorker.LOGGER.atError().setCause(e).addKeyValue("messageId", str).log("Failed to put gatedMetric message due to interruption");
                                Thread.currentThread().interrupt();
                            }
                        }
                    } catch (Throwable th) {
                        if (decrementAndGet == 0) {
                            this.this$0.gatedMetric.decrementAndGet();
                            try {
                                sink.put(message);
                            } catch (InterruptedException e2) {
                                SplitGateProcessorWorker.LOGGER.atError().setCause(e2).addKeyValue("messageId", str).log("Failed to put gatedMetric message due to interruption");
                                Thread.currentThread().interrupt();
                            }
                        }
                        throw th;
                    }
                }
            }

            @Override // io.aleph0.yap.messaging.core.Acknowledgeable
            public void nack(Acknowledgeable.AcknowledgementListener acknowledgementListener) {
                if (this.acked.compareAndSet(false, true)) {
                    int andSet = atomicInteger.getAndSet(-1);
                    try {
                        acknowledgementListener.onSuccess();
                        if (andSet > 0) {
                            message.nack(new Acknowledgeable.AcknowledgementListener() { // from class: io.aleph0.yap.messaging.core.SplitGateProcessorWorker.2.1
                                @Override // io.aleph0.yap.messaging.core.Acknowledgeable.AcknowledgementListener
                                public void onSuccess() {
                                    AnonymousClass2.this.this$0.gatedMetric.decrementAndGet();
                                    SplitGateProcessorWorker.LOGGER.atDebug().addKeyValue("messageId", str).log("Successfully nacked gatedMetric message");
                                }

                                @Override // io.aleph0.yap.messaging.core.Acknowledgeable.AcknowledgementListener
                                public void onFailure(Throwable th) {
                                    AnonymousClass2.this.this$0.gatedMetric.decrementAndGet();
                                    SplitGateProcessorWorker.LOGGER.atWarn().setCause(th).addKeyValue("messageId", str).log("Failed to nack gatedMetric message");
                                }
                            });
                        }
                    } catch (Throwable th) {
                        if (andSet > 0) {
                            message.nack(new Acknowledgeable.AcknowledgementListener() { // from class: io.aleph0.yap.messaging.core.SplitGateProcessorWorker.2.1
                                @Override // io.aleph0.yap.messaging.core.Acknowledgeable.AcknowledgementListener
                                public void onSuccess() {
                                    AnonymousClass2.this.this$0.gatedMetric.decrementAndGet();
                                    SplitGateProcessorWorker.LOGGER.atDebug().addKeyValue("messageId", str).log("Successfully nacked gatedMetric message");
                                }

                                @Override // io.aleph0.yap.messaging.core.Acknowledgeable.AcknowledgementListener
                                public void onFailure(Throwable th2) {
                                    AnonymousClass2.this.this$0.gatedMetric.decrementAndGet();
                                    SplitGateProcessorWorker.LOGGER.atWarn().setCause(th2).addKeyValue("messageId", str).log("Failed to nack gatedMetric message");
                                }
                            });
                        }
                        throw th;
                    }
                }
            }

            @Override // io.aleph0.yap.messaging.core.Message
            public String id() {
                return str;
            }

            @Override // io.aleph0.yap.messaging.core.Message
            public Map<String, String> attributes() {
                return message.attributes();
            }

            @Override // io.aleph0.yap.messaging.core.Message
            public Y body() {
                return (Y) y;
            }
        };
    }

    /* renamed from: checkMetrics, reason: merged with bridge method [inline-methods] */
    public Metrics m5checkMetrics() {
        return new Metrics(this.gatedMetric.get());
    }

    /* renamed from: flushMetrics, reason: merged with bridge method [inline-methods] */
    public Metrics m4flushMetrics() {
        return m5checkMetrics();
    }
}
