package reactor.bus;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.bus.Event;
import reactor.bus.registry.Registration;
import reactor.bus.registry.Registries;
import reactor.bus.registry.Registry;
import reactor.bus.routing.Router;
import reactor.bus.selector.ClassSelector;
import reactor.bus.selector.Selector;
import reactor.bus.selector.Selectors;
import reactor.bus.spec.EventBusSpec;
import reactor.core.flow.Loopback;
import reactor.core.flow.Producer;
import reactor.core.publisher.Flux;
import reactor.core.state.Introspectable;
import reactor.core.subscriber.Subscribers;
import reactor.core.util.BackpressureUtils;
import reactor.core.util.EmptySubscription;
import reactor.core.util.Logger;
import reactor.core.util.ReactiveStateUtils;

/* loaded from: input_file:reactor/bus/EventBus.class */
/**
 * Bus that dispatches {@link Event}s, keyed by arbitrary objects, to the consumers held in
 * the inherited registry.
 *
 * <p>When a {@link Processor} is supplied, notified events are pushed into it with
 * {@code onNext} and dispatched by the subscribers attached in the constructor; when it is
 * {@code null}, events are routed inline by the notifying call.
 */
public class EventBus extends AbstractBus<Object, Event<?>> implements Consumer<Event<?>>, Loopback {
    /** Optional processor that events are handed to; {@code null} means inline dispatch. */
    private final Processor<Event<?>, Event<?>> processor;

    /* loaded from: input_file:reactor/bus/EventBus$BusErrorConsumer.class */
    /**
     * Consumer of {@link Throwable} events: hands the error to the configured handler, or
     * logs it when no handler was supplied.
     */
    private static class BusErrorConsumer implements Consumer<Event<Throwable>> {
        final Logger log = Logger.getLogger(EventBus.class);
        private final Consumer<Throwable> uncaughtErrorHandler;

        /**
         * @param consumer handler receiving the error carried by each event; may be
         *                 {@code null}, in which case errors are logged instead
         */
        public BusErrorConsumer(Consumer<Throwable> consumer) {
            this.uncaughtErrorHandler = consumer;
        }

        /**
         * Forwards the event's throwable to the handler, or logs it if there is none.
         *
         * @param event event whose data is the error to report
         */
        @Override // java.util.function.Consumer
        public void accept(Event<Throwable> event) {
            if (this.uncaughtErrorHandler != null) {
                this.uncaughtErrorHandler.accept(event.getData());
                return;
            }
            Throwable failure = event.getData();
            this.log.error(failure.getMessage(), failure);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/bus/EventBus$ConsumerEvent.class */
    /**
     * Event whose data is a {@link Consumer} and which carries, separately, the value that
     * consumer should be invoked with.
     *
     * @param <T> type of the value passed to the wrapped consumer
     */
    public static class ConsumerEvent<T> extends Event<Consumer<T>> {
        private final T data;

        /**
         * @param consumer consumer to invoke when this event is run
         * @param t        value to pass to {@code consumer}
         */
        public ConsumerEvent(Consumer<T> consumer, T t) {
            super(consumer);
            this.data = t;
        }

        /** Invokes the wrapped consumer with the stored value. */
        void run() {
            Consumer<T> task = getData();
            task.accept(this.data);
        }
    }

    /* loaded from: input_file:reactor/bus/EventBus$DispatchEventSubscriber.class */
    /**
     * Callback attached to the processor in the constructor: every event it receives is
     * passed to {@link EventBus#accept(Event)}; the subscription argument is not used.
     */
    private final class DispatchEventSubscriber implements BiConsumer<Event<?>, Subscription> {

        /**
         * @param event        event delivered by the processor
         * @param subscription ignored
         */
        @Override // java.util.function.BiConsumer
        public void accept(Event<?> event, Subscription subscription) {
            EventBus outer = EventBus.this;
            outer.accept(event);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/bus/EventBus$EventBusConsumer.class */
    /**
     * Wrapper placed around every consumer registered through
     * {@code on(Selector, Consumer)}. Before delegating it applies the selector's header
     * resolver (if any) and skips events whose payload is not an instance of the expected
     * payload class.
     *
     * @param <T> event type accepted by the wrapped consumer
     */
    public static class EventBusConsumer<T extends Event<?>> implements Consumer<T>, Introspectable, Producer {
        private final Selector selector;
        private final Class<?> tClass;
        private final Consumer<T> consumer;

        /**
         * @param selector selector the consumer was registered with
         * @param cls      expected payload class, or {@code null} to accept any payload
         * @param consumer consumer to delegate to
         */
        public EventBusConsumer(Selector selector, Class<?> cls, Consumer<T> consumer) {
            this.selector = selector;
            this.tClass = cls;
            this.consumer = consumer;
        }

        /**
         * Populates headers from the selector's resolver when one exists, then delegates
         * unless the payload is non-null and of an incompatible type.
         *
         * @param t event to deliver
         */
        @Override // java.util.function.Consumer
        public void accept(T t) {
            if (this.selector.getHeaderResolver() != null) {
                t.getHeaders().setAll(this.selector.getHeaderResolver().apply(t.getKey()));
            }
            if (this.tClass != null) {
                Object payload = t.getData();
                // Null payloads are always delivered; only a typed mismatch is filtered out.
                if (payload != null && !this.tClass.isInstance(payload)) {
                    return;
                }
            }
            this.consumer.accept(t);
        }

        /** @return the wrapped consumer */
        public Object downstream() {
            return this.consumer;
        }

        /** @return the constant {@code 1} */
        public int getMode() {
            return 1;
        }

        /** @return the simple class name of {@code EventBusConsumer} */
        public String getName() {
            Class<?> self = EventBusConsumer.class;
            return self.getSimpleName();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/bus/EventBus$EventSubscriber.class */
    public final class EventSubscriber<T> implements Subscriber<T> {
        private final Function<? super T, Object> keyMapper;
        Subscription s;

        public EventSubscriber(Function<? super T, Object> function) {
            this.keyMapper = function;
        }

        public void onSubscribe(Subscription subscription) {
            if (BackpressureUtils.validate(this.s, subscription)) {
                this.s = subscription;
                subscription.request(Long.MAX_VALUE);
            }
        }

        public void onNext(T t) {
            EventBus.this.notify((EventBus) this.keyMapper.apply(t), (Object) Event.wrap(t));
        }

        public void onError(Throwable th) {
            this.s = null;
        }

        public void onComplete() {
            this.s = null;
        }
    }

    /* loaded from: input_file:reactor/bus/EventBus$PreparedConsumer.class */
    /**
     * Consumer bound to a fixed key: the registrations matching the key are selected once,
     * at construction time, and every accepted event is delivered to each of them.
     *
     * @param <T> payload type of the accepted events
     */
    private final class PreparedConsumer<T> implements Consumer<Event<T>> {
        final List<Registration<Object, ? extends BiConsumer<Object, ? extends Event<?>>>> regs;
        final int size;
        private final Object key;

        /**
         * @param obj key whose matching registrations are captured for later dispatch
         */
        public PreparedConsumer(Object obj) {
            this.key = obj;
            this.regs = EventBus.this.getConsumerRegistry().select(obj);
            this.size = this.regs.size();
        }

        /**
         * Stamps the event with the prepared key and delivers it to every captured
         * registration, inline when the bus has no processor and through
         * {@link EventBus#schedule(Consumer, Object)} otherwise.
         *
         * <p>The inline branch used to call the unqualified {@code accept(event)}, which
         * inside this inner class resolves to this very method and recursed without bound.
         * It now invokes the registration's consumer directly.
         *
         * @param event event to dispatch
         */
        @Override // java.util.function.Consumer
        @SuppressWarnings({"unchecked", "rawtypes"})
        public void accept(Event<T> event) {
            for (int i = 0; i < this.size; i++) {
                Registration<Object, ? extends BiConsumer<Object, ? extends Event<?>>> registration = this.regs.get(i);
                event.setKey(this.key);
                if (EventBus.this.processor == null) {
                    try {
                        // Invoked through the registry's declared BiConsumer type.
                        // NOTE(review): the scheduled branch below treats the same object
                        // as a Consumer - confirm registered objects implement both.
                        ((BiConsumer) registration.getObject()).accept(this.key, event);
                    } catch (Throwable th) {
                        EventBus.this.errorHandlerOrThrow(th);
                    }
                } else {
                    EventBus.this.schedule((Consumer) registration.getObject(), event);
                }
            }
        }
    }

    /* loaded from: input_file:reactor/bus/EventBus$ReplyToConsumer.class */
    public class ReplyToConsumer<E extends Event<?>, V> implements Consumer<E> {
        private final Function<E, V> fn;

        private ReplyToConsumer(Function<E, V> function) {
            this.fn = function;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v22, types: [reactor.bus.Bus] */
        @Override // java.util.function.Consumer
        public void accept(E e) {
            Event wrap;
            ?? replyToObservable;
            EventBus eventBus = EventBus.this;
            if (ReplyToEvent.class.isAssignableFrom(e.getClass()) && 0 != (replyToObservable = ((ReplyToEvent) e).getReplyToObservable())) {
                eventBus = replyToObservable;
            }
            try {
                V apply = this.fn.apply(e);
                if (null == apply) {
                    wrap = new Event(Void.class);
                } else {
                    wrap = Event.class.isAssignableFrom(apply.getClass()) ? (Event) apply : Event.wrap(apply);
                }
                eventBus.notify((EventBus) e.getReplyTo(), (Object) wrap);
            } catch (Throwable th) {
                eventBus.notify((EventBus) th.getClass(), (Class<?>) Event.wrap(th));
            }
        }

        public Function<E, V> getDelegate() {
            return this.fn;
        }
    }

    /* loaded from: input_file:reactor/bus/EventBus$ReplyToEvent.class */
    /**
     * Event that additionally remembers the {@link Bus} on which replies should be
     * published.
     *
     * @param <T> payload type
     */
    public static class ReplyToEvent<T> extends Event<T> {
        private static final long serialVersionUID = 1937884784799135647L;
        private final Bus replyToObservable;

        /**
         * @param headers  headers for the new event
         * @param t        payload
         * @param obj      reply-to key
         * @param bus      bus that replies are sent to
         * @param consumer error consumer passed to the superclass
         */
        private ReplyToEvent(Event.Headers headers, T t, Object obj, Bus bus, Consumer<Throwable> consumer) {
            super(headers, t, consumer);
            setReplyTo(obj);
            this.replyToObservable = bus;
        }

        /**
         * Builds a reply-aware event from the headers, data, reply-to key and error
         * consumer of {@code event}.
         *
         * @param event source event
         * @param bus   bus that replies are sent to
         */
        private ReplyToEvent(Event<T> event, Bus bus) {
            this(event.getHeaders(), event.getData(), event.getReplyTo(), bus, event.getErrorConsumer());
        }

        /**
         * Creates an event with the same headers, reply-to key, reply bus and error
         * consumer as this one, carrying {@code x} as its data.
         *
         * @param x   data for the copy
         * @param <X> type of the new data
         * @return the new event
         */
        @Override // reactor.bus.Event
        public <X> Event<X> copy(X x) {
            ReplyToEvent<X> duplicate =
                    new ReplyToEvent<X>(getHeaders(), x, getReplyTo(), this.replyToObservable, getErrorConsumer());
            return duplicate;
        }

        /** @return the bus that replies are sent to */
        public Bus getReplyToObservable() {
            return this.replyToObservable;
        }
    }

    /* loaded from: input_file:reactor/bus/EventBus$UncaughtExceptionConsumer.class */
    /**
     * Default error handler: routes a throwable, keyed by its runtime class, to the
     * registrations in the given registry that match that class.
     */
    private static class UncaughtExceptionConsumer implements Consumer<Throwable> {
        private final Registry<Object, BiConsumer<Object, ? extends Event<?>>> consumerRegistry;

        /**
         * @param registry registry searched for consumers of the error's class
         */
        public UncaughtExceptionConsumer(Registry<Object, BiConsumer<Object, ? extends Event<?>>> registry) {
            this.consumerRegistry = registry;
        }

        /**
         * Wraps the throwable in an event and routes it with the default router.
         *
         * @param th error to publish
         */
        @Override // java.util.function.Consumer
        public void accept(Throwable th) {
            Class<?> errorType = th.getClass();
            Event<Throwable> errorEvent = Event.wrap(th);
            List<Registration<Object, ? extends BiConsumer<Object, ? extends Event<?>>>> matches =
                    this.consumerRegistry.select(errorType);
            AbstractBus.DEFAULT_EVENT_ROUTER.route(errorType, errorEvent, matches, null, null);
        }
    }

    /**
     * Creates a new specification object for configuring an event bus.
     *
     * @return a fresh {@link EventBusSpec}
     */
    public static EventBusSpec config() {
        return new EventBusSpec();
    }

    /**
     * Creates a bus without a processor, so events are routed inline by the notifying call.
     *
     * @return a new bus
     */
    public static EventBus create() {
        return create(null);
    }

    /**
     * Creates a bus backed by the given processor, using {@code 1} as the subscriber count.
     *
     * @param processor processor events are handed to, or {@code null} for inline dispatch
     * @return a new bus
     */
    public static EventBus create(Processor<Event<?>, Event<?>> processor) {
        return create(processor, 1);
    }

    /**
     * Creates a bus backed by the given processor.
     *
     * @param processor processor events are handed to, or {@code null} for inline dispatch
     * @param i         number of dispatching subscribers attached to the processor
     * @return a new bus
     */
    public static EventBus create(Processor<Event<?>, Event<?>> processor, int i) {
        return new EventBus(processor, i);
    }

    /**
     * Creates a bus with no explicit router and no error consumers.
     *
     * @param processor processor events are handed to, or {@code null} for inline dispatch
     * @param i         number of dispatching subscribers attached to the processor
     */
    public EventBus(@Nullable Processor<Event<?>, Event<?>> processor, int i) {
        this(processor, i, null);
    }

    /**
     * Creates a bus with the given router and no error consumers.
     *
     * @param processor processor events are handed to, or {@code null} for inline dispatch
     * @param i         number of dispatching subscribers attached to the processor
     * @param router    router passed to the superclass; may be {@code null}
     */
    public EventBus(@Nullable Processor<Event<?>, Event<?>> processor, int i, @Nullable Router router) {
        this(processor, i, router, null, null);
    }

    /**
     * Creates a bus over a newly created registry ({@code Registries.create()}).
     *
     * @param processor processor events are handed to, or {@code null} for inline dispatch
     * @param i         number of dispatching subscribers attached to the processor
     * @param router    router passed to the superclass; may be {@code null}
     * @param consumer  error consumer passed to the superclass; may be {@code null}
     * @param consumer2 handler for errors reaching the bus-level {@link Throwable} consumer;
     *                  may be {@code null}
     */
    public EventBus(@Nullable Processor<Event<?>, Event<?>> processor, int i, @Nullable Router router, @Nullable Consumer<Throwable> consumer, @Nullable Consumer<Throwable> consumer2) {
        this(Registries.create(), processor, i, router, consumer, consumer2);
    }

    public EventBus(@Nonnull Registry<Object, BiConsumer<Object, ? extends Event<?>>> registry, @Nullable Processor<Event<?>, Event<?>> processor, int i, @Nullable Router router, @Nullable Consumer<Throwable> consumer, @Nullable Consumer<Throwable> consumer2) {
        super(registry, i, router, consumer != null ? consumer : new UncaughtExceptionConsumer(registry), consumer2);
        this.processor = processor;
        if (processor != null) {
            for (int i2 = 0; i2 < i; i2++) {
                processor.subscribe(Subscribers.unbounded(new DispatchEventSubscriber(), getProcessorErrorHandler()));
            }
            processor.onSubscribe(EmptySubscription.INSTANCE);
        }
        on(new ClassSelector(Throwable.class), new BusErrorConsumer(consumer2));
    }

    /**
     * @return the processor this bus hands events to, or {@code null} if none was supplied
     */
    public Processor<Event<?>, Event<?>> getProcessor() {
        return this.processor;
    }

    /**
     * @return the processor of this bus (possibly {@code null}); same object as
     *         {@link #connectedOutput()}
     */
    public Object connectedInput() {
        return this.processor;
    }

    /**
     * @return the processor of this bus (possibly {@code null}); same object as
     *         {@link #connectedInput()}
     */
    public Object connectedOutput() {
        return this.processor;
    }

    /**
     * Registers a consumer for events matching the selector. The consumer is wrapped in an
     * {@link EventBusConsumer} together with the payload class derived from the consumer's
     * generic signature (which may be {@code null}).
     *
     * @param selector selector the consumer is registered under
     * @param consumer consumer to register; must not be {@code null}
     * @param <T>      event type accepted by the consumer
     * @return the registration returned by the superclass
     * @throws NullPointerException if {@code consumer} is {@code null}
     */
    @Override // reactor.bus.AbstractBus, reactor.bus.Bus
    public <T extends Event<?>> Registration<Object, BiConsumer<Object, ? extends Event<?>>> on(Selector selector, Consumer<T> consumer) {
        Objects.requireNonNull(consumer, "Consumer cannot be null.");
        Class<?> payloadType = extractGeneric(consumer);
        EventBusConsumer<T> filtering = new EventBusConsumer<T>(selector, payloadType, consumer);
        return super.on(selector, filtering);
    }

    /**
     * Derives the payload class from the consumer's generic signature by looking at the
     * first generic interface of the consumer's class, then that interface's first type
     * argument, then that argument's own first type argument.
     *
     * @param consumer consumer whose class is inspected
     * @return the raw payload class, or {@code null} when any step of that chain is
     *         missing or is not parameterized
     */
    private Class<?> extractGeneric(Consumer<? extends Event<?>> consumer) {
        Type[] implemented = consumer.getClass().getGenericInterfaces();
        if (implemented.length == 0) {
            return null;
        }
        Type eventType = firstTypeArgument(implemented[0]);
        Type payloadType = firstTypeArgument(eventType);
        if (payloadType instanceof ParameterizedType) {
            return (Class<?>) ((ParameterizedType) payloadType).getRawType();
        }
        return payloadType instanceof Class ? (Class<?>) payloadType : null;
    }

    /** Returns the first actual type argument of a parameterized type, or {@code null}. */
    private static Type firstTypeArgument(Type type) {
        if (!(type instanceof ParameterizedType)) {
            return null;
        }
        Type[] arguments = ((ParameterizedType) type).getActualTypeArguments();
        return arguments.length == 0 ? null : arguments[0];
    }

    /**
     * Returns a {@link Flux} view over this bus for the given selector.
     *
     * @param selector selector passed to the created {@code BusFlux}
     * @return a new {@code BusFlux} built from this bus and {@code selector}
     */
    @Override // reactor.bus.AbstractBus
    public Flux<? extends Event<?>> on(Selector selector) {
        return new BusFlux(this, selector);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stamps the event with the key and dispatches it: through the processor when one is
     * configured, otherwise inline, with failures passed to {@code errorHandlerOrThrow}.
     *
     * @param obj   key the event is published under
     * @param event event to dispatch
     */
    @Override // reactor.bus.AbstractBus
    public void accept(Object obj, Event<?> event) {
        event.setKey(obj);
        if (this.processor == null) {
            try {
                accept(event);
            } catch (Throwable th) {
                errorHandlerOrThrow(th);
            }
            return;
        }
        this.processor.onNext(event);
    }

    /**
     * Subscribes to the publisher and notifies every element it emits under one fixed key.
     *
     * @param publisher source of the values to publish
     * @param obj       key used for every emitted value
     * @return this bus
     */
    public final EventBus notify(@Nonnull Publisher<?> publisher, @Nonnull final Object obj) {
        // Key mapper that ignores the element and always yields the fixed key.
        Function<Object, Object> fixedKey = ignored -> obj;
        return notify(publisher, fixedKey);
    }

    /**
     * Subscribes to the publisher and notifies every element it emits under the key
     * computed by {@code function}.
     *
     * @param publisher source of the values to publish
     * @param function  maps each emitted value to its key
     * @param <T>       element type handled by the key mapper
     * @return this bus
     */
    public final <T> EventBus notify(@Nonnull Publisher<? extends T> publisher, @Nonnull Function<? super T, Object> function) {
        EventSubscriber<T> relay = new EventSubscriber<T>(function);
        publisher.subscribe(relay);
        return this;
    }

    /**
     * Registers a function for events matching the selector; the function's result is
     * published by a {@link ReplyToConsumer} under the incoming event's reply-to key.
     *
     * @param selector selector the function is registered under
     * @param function function applied to each matching event
     * @param <T>      event type accepted by the function
     * @param <V>      result type of the function
     * @return the registration of the wrapping consumer
     */
    public <T extends Event<?>, V> Registration<?, BiConsumer<Object, ? extends Event<?>>> receive(Selector selector, Function<T, V> function) {
        ReplyToConsumer<T, V> replying = new ReplyToConsumer<T, V>(function);
        return on(selector, replying);
    }

    /**
     * Notifies the key with the event obtained from the supplier.
     *
     * <p>The key is passed through unchanged; the previous {@code (EventBus)} cast on it
     * failed with {@link ClassCastException} for any ordinary key.
     *
     * @param obj      key to notify
     * @param supplier supplier of the event to publish
     * @return this bus
     */
    @Override // reactor.bus.AbstractBus, reactor.bus.Bus
    public EventBus notify(Object obj, Supplier<? extends Event<?>> supplier) {
        Event<?> event = supplier.get();
        notify(obj, event);
        return this;
    }

    /**
     * Notifies the key with an event constructed from {@code Void.class}.
     *
     * <p>The key is passed through unchanged; the previous {@code (EventBus)} cast on it
     * failed with {@link ClassCastException} for any ordinary key.
     *
     * @param obj key to notify
     * @return this bus
     */
    public EventBus notify(Object obj) {
        Event<?> empty = new Event<>(Void.class);
        notify(obj, empty);
        return this;
    }

    /**
     * Notifies the key with a {@link ReplyToEvent} copy of the event that names this bus
     * as the bus replies are sent to.
     *
     * <p>The key is passed through unchanged; the previous {@code (EventBus)} cast on it
     * failed with {@link ClassCastException} for any ordinary key.
     *
     * @param obj   key to notify
     * @param event event to send
     * @return this bus
     */
    public EventBus send(Object obj, Event<?> event) {
        Event<?> outgoing = new ReplyToEvent<>(event, this);
        notify(obj, outgoing);
        return this;
    }

    /**
     * Notifies the key with a {@link ReplyToEvent} copy of the supplied event that names
     * this bus as the bus replies are sent to.
     *
     * <p>The key is passed through unchanged; the previous {@code (EventBus)} cast on it
     * failed with {@link ClassCastException} for any ordinary key.
     *
     * @param obj      key to notify
     * @param supplier supplier of the event to send
     * @return this bus
     */
    public EventBus send(Object obj, Supplier<? extends Event<?>> supplier) {
        Event<?> source = supplier.get();
        Event<?> outgoing = new ReplyToEvent<>(source, this);
        notify(obj, outgoing);
        return this;
    }

    /**
     * Notifies the key with a {@link ReplyToEvent} copy of the event that names
     * {@code bus} as the bus replies are sent to.
     *
     * <p>The key is passed through unchanged; the previous {@code (EventBus)} cast on it
     * failed with {@link ClassCastException} for any ordinary key.
     *
     * @param obj   key to notify
     * @param event event to send
     * @param bus   bus that replies are sent to
     * @return this bus
     */
    public EventBus send(Object obj, Event<?> event, Bus bus) {
        Event<?> outgoing = new ReplyToEvent<>(event, bus);
        notify(obj, outgoing);
        return this;
    }

    /**
     * Notifies the key with a {@link ReplyToEvent} copy of the supplied event that names
     * {@code bus} as the bus replies are sent to.
     *
     * <p>The key is passed through unchanged; the previous {@code (EventBus)} cast on it
     * failed with {@link ClassCastException} for any ordinary key.
     *
     * @param obj      key to notify
     * @param supplier supplier of the event to send
     * @param bus      bus that replies are sent to
     * @return this bus
     */
    public EventBus send(Object obj, Supplier<? extends Event<?>> supplier, Bus bus) {
        Event<?> source = supplier.get();
        Event<?> outgoing = new ReplyToEvent<>(source, bus);
        notify(obj, outgoing);
        return this;
    }

    /**
     * Sends an event and registers a reply consumer for it: the consumer is registered
     * under a fresh anonymous selector with {@code cancelAfterUse()}, the selector's
     * object is set as the event's reply-to key, and the event is then notified under
     * {@code obj}.
     *
     * <p>The key is passed through unchanged; the previous {@code (EventBus)} cast on it
     * failed with {@link ClassCastException} for any ordinary key.
     *
     * @param obj      key to notify
     * @param event    event to send
     * @param consumer consumer registered for the reply
     * @param <T>      event type accepted by the reply consumer
     * @return this bus
     */
    public <T extends Event<?>> EventBus sendAndReceive(Object obj, Event<?> event, Consumer<T> consumer) {
        Selector anonymous = Selectors.anonymous();
        // Register the reply consumer before publishing so a reply cannot be missed.
        on(anonymous, consumer).cancelAfterUse();
        notify(obj, event.setReplyTo(anonymous.getObject()));
        return this;
    }

    /**
     * Supplier variant of {@code sendAndReceive}: obtains the event from the supplier and
     * delegates to the overload taking an event.
     *
     * @param obj      key to notify
     * @param supplier supplier of the event to send
     * @param consumer consumer registered for the reply
     * @param <T>      event type accepted by the reply consumer
     * @return this bus
     */
    public <T extends Event<?>> EventBus sendAndReceive(Object obj, Supplier<? extends Event<?>> supplier, Consumer<T> consumer) {
        return sendAndReceive(obj, supplier.get(), consumer);
    }

    /**
     * Creates a consumer bound to the given key. The registrations matching the key are
     * selected when this method is called, not when events are later accepted.
     *
     * @param obj key to bind the consumer to
     * @param <T> payload type of the events the consumer accepts
     * @return a new {@code PreparedConsumer} for {@code obj}
     */
    public <T> Consumer<Event<T>> prepare(Object obj) {
        return new PreparedConsumer(obj);
    }

    /**
     * Runs the consumer with the given value: wrapped in a {@link ConsumerEvent} and
     * pushed to the processor when one is configured, otherwise invoked inline with
     * failures passed to {@code errorHandlerOrThrow}.
     *
     * @param consumer consumer to invoke
     * @param t        value to pass to the consumer
     * @param <T>      type of the value
     */
    public <T> void schedule(Consumer<T> consumer, T t) {
        if (this.processor == null) {
            try {
                consumer.accept(t);
            } catch (Throwable th) {
                errorHandlerOrThrow(th);
            }
            return;
        }
        ConsumerEvent<T> task = new ConsumerEvent<T>(consumer, t);
        this.processor.onNext(task);
    }

    /**
     * Dispatches an event: an event whose class is exactly {@link ConsumerEvent} is run
     * directly, any other event is routed by its key.
     *
     * @param event event to dispatch
     */
    @Override // java.util.function.Consumer
    public void accept(Event<?> event) {
        if (event.getClass() != ConsumerEvent.class) {
            route(event.getKey(), event);
            return;
        }
        ((ConsumerEvent<?>) event).run();
    }

    /**
     * @return the graph produced by {@code ReactiveStateUtils.scan} for this bus
     */
    public ReactiveStateUtils.Graph debug() {
        return ReactiveStateUtils.scan(this);
    }

    // NOTE(review): marked bridge/synthetic by the decompiler - this looks like the
    // compiler-generated bridge for the AbstractBus-returning signature, delegating to
    // notify(Object, Supplier). It shares its parameter list with that method, so it
    // presumably should not exist in hand-maintained source - confirm before keeping.
    @Override // reactor.bus.AbstractBus, reactor.bus.Bus
    public /* bridge */ /* synthetic */ AbstractBus notify(Object obj, Supplier supplier) {
        return notify(obj, (Supplier<? extends Event<?>>) supplier);
    }

    // NOTE(review): marked bridge/synthetic by the decompiler - this looks like the
    // compiler-generated bridge for the Bus-returning signature, delegating to
    // notify(Object, Supplier). It shares its parameter list with that method, so it
    // presumably should not exist in hand-maintained source - confirm before keeping.
    @Override // reactor.bus.AbstractBus, reactor.bus.Bus
    public /* bridge */ /* synthetic */ Bus notify(Object obj, Supplier supplier) {
        return notify(obj, (Supplier<? extends Event<?>>) supplier);
    }
}
