package reactor.bus.stream;

import javax.annotation.Nonnull;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.bus.Bus;
import reactor.bus.EventBus;
import reactor.bus.registry.Registration;
import reactor.bus.selector.Selector;
import reactor.core.support.ReactiveState;
import reactor.fn.BiConsumer;
import reactor.fn.Consumer;
import reactor.rx.Stream;
import reactor.rx.subscriber.SerializedSubscriber;

/* loaded from: input_file:reactor/bus/stream/BusStream.class */
public final class BusStream<T> extends Stream<T> {
    private final Selector selector;
    private final Bus<?, T> observable;
    private final boolean ordering;

    /* loaded from: input_file:reactor/bus/stream/BusStream$BusToSubscriber.class */
    private class BusToSubscriber implements Consumer<T>, ReactiveState.Trace, ReactiveState.Downstream {
        private final Subscriber<? super T> subscriber;

        public BusToSubscriber(Subscriber<? super T> subscriber) {
            this.subscriber = subscriber;
        }

        public Object downstream() {
            return this.subscriber;
        }

        public void accept(T t) {
            this.subscriber.onNext(t);
        }
    }

    /* loaded from: input_file:reactor/bus/stream/BusStream$BusToSubscription.class */
    private class BusToSubscription implements Subscription, ReactiveState.Trace, ReactiveState.Upstream {
        final Registration<?, ? extends BiConsumer<?, ? extends T>> registration;

        public BusToSubscription(Subscriber<? super T> subscriber) {
            this.registration = BusStream.this.observable.on(BusStream.this.selector, new BusToSubscriber(subscriber));
        }

        public void request(long j) {
        }

        public Object upstream() {
            return BusStream.this.observable;
        }

        public void cancel() {
            this.registration.m4cancel();
        }
    }

    public BusStream(@Nonnull Bus<?, T> bus, @Nonnull Selector selector) {
        this.selector = selector;
        this.observable = bus;
        if (EventBus.class.isAssignableFrom(bus.getClass())) {
            this.ordering = 1 == ((EventBus) bus).getConcurrency();
        } else {
            this.ordering = true;
        }
    }

    public void subscribe(Subscriber<? super T> subscriber) {
        Subscriber<? super T> create = !this.ordering ? SerializedSubscriber.create(subscriber) : subscriber;
        create.onSubscribe(new BusToSubscription(create));
    }

    public String toString() {
        return "BusStream{selector=" + this.selector + ", bus=" + this.observable + '}';
    }
}
