/*
 * Decompiled with CFR 0.152.
 */
package io.aleph0.yap.core.worker;

import io.aleph0.yap.core.ProcessorWorker;
import io.aleph0.yap.core.Sink;
import io.aleph0.yap.core.Source;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class MapMultiProcessorWorker<X, Y>
implements ProcessorWorker<X, Y> {
    private final BiConsumer<X, Consumer<Y>> mapper;

    public MapMultiProcessorWorker(BiConsumer<X, Consumer<Y>> mapper) {
        this.mapper = Objects.requireNonNull(mapper, "mapper");
    }

    @Override
    public void process(Source<X> source, Sink<Y> sink) throws InterruptedException {
        try {
            X originalItem = source.take();
            while (originalItem != null) {
                this.mapper.accept(originalItem, mappedItem -> {
                    try {
                        sink.put(mappedItem);
                    }
                    catch (InterruptedException e) {
                        throw new UncheckedInterruptedException();
                    }
                });
                originalItem = source.take();
            }
        }
        catch (UncheckedInterruptedException e) {
            Thread.currentThread().interrupt();
            throw new InterruptedException();
        }
    }

    private static class UncheckedInterruptedException
    extends RuntimeException {
        public UncheckedInterruptedException() {
            super("interrupted");
        }
    }
}

