package reactor.bus;

import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import reactor.bus.filter.PassThroughFilter;
import reactor.bus.registry.Registration;
import reactor.bus.registry.Registry;
import reactor.bus.routing.ConsumerFilteringRouter;
import reactor.bus.routing.Router;
import reactor.bus.selector.Selector;
import reactor.core.Exceptions;
import reactor.core.MultiProducer;
import reactor.core.Producer;
import reactor.core.publisher.Flux;
import reactor.io.util.UUIDUtils;
import reactor.util.Loggers;

/* loaded from: input_file:reactor/bus/AbstractBus.class */
/**
 * Skeleton {@link Bus} implementation that stores consumers in a {@link Registry} and hands
 * events to a {@link Router} for delivery.
 *
 * <p>Subclasses decide how an event is dispatched by implementing {@link #accept(Object, Object)};
 * they typically end up calling {@link #route(Object, Object)}.
 *
 * @param <K> type of the keys events are published under
 * @param <V> base type of the events carried by the bus
 */
public abstract class AbstractBus<K, V> implements Bus<K, V>, MultiProducer {
    /** Router used when the constructor is given a {@code null} router. */
    protected static final Router DEFAULT_EVENT_ROUTER = new ConsumerFilteringRouter(new PassThroughFilter());

    private final Registry<K, BiConsumer<K, ? extends V>> consumerRegistry;
    private final Router router;
    /** Never {@code null}: the constructor installs a fallback when none is supplied. */
    private final Consumer<Throwable> processorErrorHandler;
    /** May be {@code null}; stored exactly as passed to the constructor. */
    private final Consumer<Throwable> uncaughtErrorHandler;
    private final int concurrency;
    /** Lazily created by {@link #getId()}. */
    private volatile UUID id;

    /**
     * Adapts a single-argument {@link Consumer} to the {@link BiConsumer} shape stored in the
     * registry by discarding the key.
     *
     * @param <K> key type (ignored on delivery)
     * @param <T> event type accepted by the wrapped consumer
     */
    private static class BusConsumer<K, T> implements BiConsumer<K, T>, Producer {
        private final Consumer<T> consumer;

        public BusConsumer(Consumer<T> consumer) {
            this.consumer = consumer;
        }

        /**
         * @return the wrapped consumer
         */
        public Object downstream() {
            return this.consumer;
        }

        /** Forwards only the event to the wrapped consumer; the key is dropped. */
        @Override // java.util.function.BiConsumer
        public void accept(K k, T t) {
            this.consumer.accept(t);
        }
    }

    /**
     * Creates a bus.
     *
     * @param registry  registry holding the consumers; must not be {@code null}
     * @param i         concurrency value, returned unchanged by {@link #getConcurrency()}
     * @param router    router used by {@link #route(Object, Object)}; {@code null} selects
     *                  {@link #DEFAULT_EVENT_ROUTER}
     * @param consumer  processor error handler; when {@code null} a fallback is installed that
     *                  delegates to {@code consumer2} if present and otherwise logs the error
     * @param consumer2 uncaught error handler; may be {@code null}
     * @throws NullPointerException if {@code registry} is {@code null}
     */
    public AbstractBus(@Nonnull Registry<K, BiConsumer<K, ? extends V>> registry, int i, @Nullable Router router, @Nullable Consumer<Throwable> consumer, @Nullable final Consumer<Throwable> consumer2) {
        this.consumerRegistry = Objects.requireNonNull(registry, "Consumer Registry cannot be null.");
        this.concurrency = i;
        this.router = router != null ? router : DEFAULT_EVENT_ROUTER;
        if (consumer != null) {
            this.processorErrorHandler = consumer;
        } else {
            // Fallback: prefer the uncaught handler, and only log when neither handler exists.
            this.processorErrorHandler = th -> {
                if (consumer2 == null) {
                    Loggers.getLogger(AbstractBus.class).error(th.getMessage(), th);
                } else {
                    consumer2.accept(th);
                }
            };
        }
        this.uncaughtErrorHandler = consumer2;
    }

    /**
     * Returns this bus's identifier, creating it via {@link UUIDUtils#create()} on first use.
     *
     * @return the identifier; the same instance on every subsequent call
     */
    public synchronized UUID getId() {
        if (null == this.id) {
            this.id = UUIDUtils.create();
        }
        return this.id;
    }

    /**
     * @return the registry passed to the constructor
     */
    public Registry<K, BiConsumer<K, ? extends V>> getConsumerRegistry() {
        return this.consumerRegistry;
    }

    /**
     * @return the router in use (the default one if none was supplied)
     */
    public Router getRouter() {
        return this.router;
    }

    /**
     * @return the processor error handler; never {@code null}
     */
    public Consumer<Throwable> getProcessorErrorHandler() {
        return this.processorErrorHandler;
    }

    /**
     * @return the uncaught error handler as passed to the constructor; may be {@code null}
     */
    public Consumer<Throwable> getUncaughtErrorHandler() {
        return this.uncaughtErrorHandler;
    }

    /**
     * Tells whether at least one non-cancelled registration is selected for the key.
     *
     * @param k key to look up in the registry
     * @return {@code true} if any selected registration is not cancelled, {@code false} otherwise
     */
    @Override // reactor.bus.Bus
    public boolean respondsToKey(K k) {
        for (Registration<K, ? extends BiConsumer<K, ? extends V>> registration : this.consumerRegistry.select(k)) {
            if (!registration.isCancelled()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Registers a key-aware consumer under a selector.
     *
     * @param selector   selector to register under; must not be {@code null}
     * @param biConsumer consumer to register; must not be {@code null}
     * @return the registration produced by the registry
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override // reactor.bus.Bus
    public <T extends V> Registration<K, BiConsumer<K, ? extends V>> on(Selector selector, BiConsumer<K, T> biConsumer) {
        Objects.requireNonNull(selector, "Selector cannot be null.");
        Objects.requireNonNull(biConsumer, "Consumer cannot be null.");
        return this.consumerRegistry.register(selector, biConsumer);
    }

    /**
     * Registers an event-only consumer under a selector by wrapping it so the key is ignored.
     *
     * @param selector selector to register under; must not be {@code null}
     * @param consumer consumer to wrap and register
     * @return the registration produced by the registry
     */
    @Override // reactor.bus.Bus
    public <V1 extends V> Registration<K, BiConsumer<K, ? extends V>> on(Selector selector, Consumer<V1> consumer) {
        return on(selector, new BusConsumer<K, V1>(consumer));
    }

    /**
     * Registers a key-aware consumer directly under a key.
     *
     * @param k          key to register under; must not be {@code null}
     * @param biConsumer consumer to register; must not be {@code null}
     * @return the registration produced by the registry
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override // reactor.bus.Bus
    public <V1 extends V> Registration<K, BiConsumer<K, ? extends V>> onKey(K k, BiConsumer<K, V1> biConsumer) {
        Objects.requireNonNull(k, "Key cannot be null.");
        Objects.requireNonNull(biConsumer, "Consumer cannot be null.");
        return this.consumerRegistry.register(k, biConsumer);
    }

    /**
     * Registers an event-only consumer directly under a key by wrapping it so the key is ignored.
     *
     * @param k        key to register under; must not be {@code null}
     * @param consumer consumer to wrap and register
     * @return the registration produced by the registry
     */
    @Override // reactor.bus.Bus
    public <T extends V> Registration<K, BiConsumer<K, ? extends V>> onKey(K k, Consumer<T> consumer) {
        return onKey(k, new BusConsumer<K, T>(consumer));
    }

    /**
     * @return the concurrency value passed to the constructor
     */
    public int getConcurrency() {
        return this.concurrency;
    }

    /**
     * Creates a {@link Flux} view of this bus for the given selector.
     *
     * @param selector selector handed to the created {@code BusFlux}
     * @return a new {@code BusFlux} built from this bus and {@code selector}
     */
    public Flux<? extends V> on(Selector selector) {
        return new BusFlux(this, selector);
    }

    /**
     * Publishes an event under a key by delegating to {@link #accept(Object, Object)}.
     *
     * @param k key to publish under; must not be {@code null}
     * @param v event to publish; must not be {@code null}
     * @return this bus
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override // reactor.bus.Bus
    public AbstractBus notify(K k, V v) {
        Objects.requireNonNull(k, "Key cannot be null.");
        Objects.requireNonNull(v, "Event cannot be null.");
        accept(k, v);
        return this;
    }

    /**
     * Publishes the event obtained from {@code supplier} under a key.
     *
     * @param k        key to publish under; must not be {@code null}
     * @param supplier source of the event; must not be {@code null} and must not supply {@code null}
     * @return this bus
     * @throws NullPointerException if the key, the supplier or the supplied event is {@code null}
     */
    @Override // reactor.bus.Bus
    public AbstractBus notify(K k, Supplier<? extends V> supplier) {
        Objects.requireNonNull(supplier, "Supplier cannot be null.");
        return notify(k, supplier.get());
    }

    /**
     * @return the registry's iterator
     */
    public Iterator<?> downstreams() {
        return this.consumerRegistry.iterator();
    }

    /**
     * @return the registry's size
     */
    public long downstreamCount() {
        return this.consumerRegistry.size();
    }

    /**
     * Reports an error to the processor error handler, after letting
     * {@link Exceptions#throwIfFatal(Throwable)} inspect it. If no handler is set the error is
     * passed to {@link Exceptions#onErrorDropped(Throwable)} instead; the constructor always
     * installs a handler, so that branch is purely defensive.
     *
     * @param th the error to report
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public void errorHandlerOrThrow(Throwable th) {
        if (this.processorErrorHandler == null) {
            Exceptions.onErrorDropped(th);
        } else {
            Exceptions.throwIfFatal(th);
            this.processorErrorHandler.accept(th);
        }
    }

    /**
     * Dispatches an event; invoked by {@link #notify(Object, Object)} after its null checks.
     *
     * @param k key the event is published under
     * @param v the event
     */
    protected abstract void accept(K k, V v);

    /**
     * Routes an event to the registrations the registry selects for the key, using the
     * configured router and the processor error handler. The router's fourth argument is
     * passed as {@code null}.
     *
     * @param k key used to select registrations
     * @param v event to route
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public void route(K k, V v) {
        this.router.route(k, v, this.consumerRegistry.select(k), null, this.processorErrorHandler);
    }

    // The decompiler-emitted synthetic bridge methods for notify(...) are intentionally not
    // declared here: they share an erasure with the overloads above, and the compiler
    // generates them automatically for the covariant AbstractBus return type.
}
