/*
 * Decompiled with CFR 0.152.
 */
package io.annot8.implementations.pipeline;

import io.annot8.api.components.Annot8Component;
import io.annot8.api.components.Processor;
import io.annot8.api.components.ProcessorDescriptor;
import io.annot8.api.components.Resource;
import io.annot8.api.components.Source;
import io.annot8.api.components.SourceDescriptor;
import io.annot8.api.components.responses.ProcessorResponse;
import io.annot8.api.components.responses.SourceResponse;
import io.annot8.api.context.Context;
import io.annot8.api.data.Item;
import io.annot8.api.data.ItemFactory;
import io.annot8.api.exceptions.IncompleteException;
import io.annot8.api.pipelines.Pipeline;
import io.annot8.api.pipelines.PipelineDescriptor;
import io.annot8.common.components.logging.Logging;
import io.annot8.common.components.metering.Metering;
import io.annot8.implementations.pipeline.InMemoryPipelineRunner;
import io.annot8.implementations.support.context.SimpleContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimplePipeline
implements Pipeline {
    private final String name;
    private final String description;
    private final Collection<Source> sources;
    private final Collection<Processor> processors;
    private final Context context;
    private final Logger logger;

    private SimplePipeline(Context context, String name, String description, Collection<Source> sources, Collection<Processor> processors) {
        this.name = name;
        this.description = description;
        this.sources = sources;
        this.processors = processors;
        this.context = context;
        this.logger = this.getContext().getResource(Logging.class).map(l -> l.getLogger(InMemoryPipelineRunner.class)).orElse(LoggerFactory.getLogger(InMemoryPipelineRunner.class));
    }

    public String getName() {
        return this.name;
    }

    public String getDescription() {
        return this.description;
    }

    public Context getContext() {
        return this.context;
    }

    public Collection<Source> getSources() {
        return this.sources;
    }

    public Collection<Processor> getProcessors() {
        return this.processors;
    }

    public SourceResponse read(ItemFactory itemFactory) {
        Optional<Source> optional = this.getSources().stream().findFirst();
        if (optional.isEmpty()) {
            return SourceResponse.done();
        }
        Source source = optional.get();
        this.logger.debug("[{}] Reading source {} for new items", (Object)this.getName(), (Object)source.toString());
        SourceResponse response = source.read(itemFactory);
        switch (response.getStatus()) {
            case DONE: {
                this.logger.info("[{}] Finished reading all items from source {}", (Object)this.getName(), (Object)source.toString());
                this.remove(source);
                return this.read(itemFactory);
            }
            case SOURCE_ERROR: {
                this.logger.error("[{}] Source {} returned a non-recoverable error and has been removed from the pipeline", (Object)this.getName(), (Object)source.toString());
                if (response.hasExceptions()) {
                    for (Exception e : response.getExceptions()) {
                        this.logger.error("The following exception was caught by the source", (Throwable)e);
                    }
                }
                this.remove(source);
                return response;
            }
        }
        return response;
    }

    public ProcessorResponse process(Item item) {
        this.logger.debug("[{}] Beginning processing of item {}", (Object)this.getName(), (Object)item.getId());
        LinkedList<Processor> erroring = new LinkedList<Processor>();
        ProcessorResponse.ProcessorResponseBuilder response = ProcessorResponse.ok();
        for (Processor processor : this.getProcessors()) {
            this.logger.debug("[{}] Processing item {} using processor {}", new Object[]{this.getName(), item.getId(), processor.toString()});
            response = processor.process(item);
            if (response.getStatus() == ProcessorResponse.Status.ITEM_ERROR) {
                this.logger.error("[{}] Processor {} returned an error whilst processing the current item {}, and the item will not be processed by the remainder of the pipeline", new Object[]{this.getName(), processor.toString(), item.getId()});
                if (!response.hasExceptions()) break;
                for (Exception e : response.getExceptions()) {
                    this.logger.error("The following exception was caught by the processor", (Throwable)e);
                }
                break;
            }
            if (response.getStatus() != ProcessorResponse.Status.PROCESSOR_ERROR) continue;
            this.logger.error("[{}] Processor {} returned a non-recoverable error whilst processing the current item {}, and the processor has been removed from the pipeline", new Object[]{this.getName(), processor.toString(), item.getId()});
            if (response.hasExceptions()) {
                for (Exception e : response.getExceptions()) {
                    this.logger.error("The following exception was caught by the processor", (Throwable)e);
                }
            }
            erroring.add(processor);
        }
        erroring.forEach(this::remove);
        return response;
    }

    protected void remove(Processor processor) {
        this.processors.remove(processor);
    }

    protected void remove(Source source) {
        this.sources.remove(source);
    }

    public void close() {
        this.sources.stream().forEach(Annot8Component::close);
        this.processors.stream().forEach(Annot8Component::close);
        this.context.getResources().forEach(Annot8Component::close);
    }

    public static class Builder
    implements Pipeline.Builder {
        private String name;
        private String description;
        private List<Source> sources = new ArrayList<Source>();
        private List<Processor> processors = new ArrayList<Processor>();
        private List<Resource> resources = new ArrayList<Resource>();
        private PipelineDescriptor descriptor = null;
        private Context context;

        public Builder from(PipelineDescriptor pipelineDescriptor) {
            this.descriptor = pipelineDescriptor;
            return this;
        }

        public Builder withResource(Resource resource) {
            this.resources.add(resource);
            return this;
        }

        public Builder withName(String name) {
            this.name = name;
            return this;
        }

        public Builder withDescription(String description) {
            this.description = description;
            return this;
        }

        public Builder withSource(Source source) {
            this.sources.add(source);
            return this;
        }

        public Builder withProcessor(Processor processor) {
            this.processors.add(processor);
            return this;
        }

        public Builder withContext(Context context) {
            this.context = context;
            return this;
        }

        public Pipeline build() throws IncompleteException {
            if (this.descriptor != null && this.name == null) {
                this.name = this.descriptor.getName();
            }
            if (this.name == null || this.name.isEmpty()) {
                throw new IncompleteException("Pipeline must have a name");
            }
            if (this.context != null) {
                this.context.getResources().forEach(this.resources::add);
            }
            if (!this.resources.stream().anyMatch(Logging.class::isInstance)) {
                this.resources.add((Resource)Logging.useLoggerFactory());
            }
            if (!this.resources.stream().anyMatch(Metering.class::isInstance)) {
                this.resources.add((Resource)Metering.useGlobalRegistry((String)this.name));
            }
            SimpleContext pipelineContext = new SimpleContext(this.resources);
            if (this.descriptor != null) {
                this.descriptor.getSources().stream().map(arg_0 -> Builder.lambda$build$0((Context)pipelineContext, arg_0)).map(Source.class::cast).forEach(this::withSource);
                this.descriptor.getProcessors().stream().map(arg_0 -> Builder.lambda$build$1((Context)pipelineContext, arg_0)).map(Processor.class::cast).forEach(this::withProcessor);
                if (this.name == null) {
                    this.name = this.descriptor.getName();
                }
                if (this.description == null) {
                    this.description = this.descriptor.getDescription();
                }
            }
            if (this.sources.isEmpty()) {
                throw new IncompleteException("Pipeline requires at least one source");
            }
            if (this.processors.isEmpty()) {
                throw new IncompleteException("Pipeline requires at least one processor");
            }
            return new SimplePipeline((Context)pipelineContext, this.name, this.description, this.sources, this.processors);
        }

        private static /* synthetic */ Annot8Component lambda$build$1(Context pipelineContext, ProcessorDescriptor d) {
            return d.create(pipelineContext);
        }

        private static /* synthetic */ Annot8Component lambda$build$0(Context pipelineContext, SourceDescriptor d) {
            return d.create(pipelineContext);
        }
    }
}

