package reactor.io.netty.nexus;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.util.AsciiString;
import java.io.UnsupportedEncodingException;
import java.lang.Thread;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.logging.Level;
import org.reactivestreams.Publisher;
import reactor.core.flow.Loopback;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.TopicProcessor;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.core.scheduler.TimedScheduler;
import reactor.core.state.Introspectable;
import reactor.core.subscriber.SubmissionEmitter;
import reactor.core.util.Exceptions;
import reactor.core.util.Logger;
import reactor.core.util.PlatformDependent;
import reactor.core.util.WaitStrategy;
import reactor.io.ipc.Channel;
import reactor.io.ipc.ChannelHandler;
import reactor.io.netty.common.Peer;
import reactor.io.netty.http.HttpChannel;
import reactor.io.netty.http.HttpClient;
import reactor.io.netty.http.HttpServer;
import reactor.io.util.FlowSerializerUtils;

/* loaded from: input_file:reactor/io/netty/nexus/Nexus.class */
public final class Nexus extends Peer<ByteBuf, ByteBuf, Channel<ByteBuf, ByteBuf>> implements ChannelHandler<ByteBuf, ByteBuf, HttpChannel>, Loopback {
    /** HTTP server the stream handler is registered on; started in doStart and stopped in doShutdown. */
    final HttpServer server;
    /** Accumulated graph state; LastGraphStateMap merges every incoming GraphEvent into it. */
    final GraphEvent lastState;
    /** Reused system snapshot, re-scanned periodically when system stats are enabled. */
    final SystemEvent lastSystemState;
    /** Processor that every connected channel reads its events from (see apply). */
    final FluxProcessor<Event, Event> eventStream;
    /** Scheduler that per-channel event delivery is published on. */
    final Scheduler group;
    /** Mapping applied to each event before it is encoded for a channel. */
    final Function<Event, Event> lastStateMerge;
    /** Timer used for the periodic monitor and system-stats tasks. */
    final TimedScheduler timer;
    /** Emitter through which new event publishers are submitted for merging into eventStream. */
    final SubmissionEmitter<Publisher<Event>> cannons;
    /** Federated remote endpoints; replaced as a whole array through FEDERATED compare-and-set. */
    volatile FederatedClient[] federatedClients;
    /** Period between system scans, in milliseconds (set by withSystemStats). */
    long systemStatsPeriod;
    /** Whether doStart schedules the periodic system scan. */
    boolean systemStats;
    /** Whether doStart installs the logger extension. */
    boolean logExtensionEnabled;
    /** Installed logger extension, or null when none is active. */
    NexusLoggerExtension logExtension;
    /** Capacity applied to the encoded outgoing flux; initialised to 1 by the constructor. */
    long websocketCapacity;
    /** Path the Nexus handler is registered on in create(HttpServer). */
    static final String API_STREAM_URL = "/nexus/stream";
    /** Value sent in the Access-Control-Allow-Origin response header. */
    static final AsciiString ALL = new AsciiString("*");
    /** Atomic updater for the federatedClients field. */
    static final AtomicReferenceFieldUpdater<Nexus, FederatedClient[]> FEDERATED = PlatformDependent.newAtomicReferenceFieldUpdater(Nexus.class, "federatedClients");
    static final Logger log = Logger.getLogger(Nexus.class);
    /** Shared encoder turning an event's toString() into a ByteBuf. */
    static final Function<Event, ByteBuf> BUFFER_STRING_FUNCTION = new StringToBuffer();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/io/netty/nexus/Nexus$Event.class */
    /**
     * Base type of everything published on the Nexus stream. Carries the
     * identifier of the Nexus host that produced the event.
     */
    public static class Event {
        final String nexusHost;

        /**
         * @param nexusHost identifier of the originating Nexus host, stored as given
         */
        public Event(String nexusHost) {
            this.nexusHost = nexusHost;
        }

        /** @return the host identifier passed to the constructor */
        public String getNexusHost() {
            return nexusHost;
        }

        /** @return the simple name of the concrete event class */
        public String getType() {
            final Class<?> runtimeClass = getClass();
            return runtimeClass.getSimpleName();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/io/netty/nexus/Nexus$FederatedClient.class */
    /**
     * Pairs a target API address with the HTTP client used to reach it.
     */
    public static final class FederatedClient {
        final HttpClient client;
        final String targetAPI;

        /**
         * @param targetAPI address later handed to the client's {@code ws} call
         */
        public FederatedClient(String targetAPI) {
            // One fresh client is created per federated target.
            this.client = HttpClient.create();
            this.targetAPI = targetAPI;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/io/netty/nexus/Nexus$FederatedMerger.class */
    /**
     * Maps a federated client to the stream of buffers received from its
     * websocket connection.
     */
    public static final class FederatedMerger implements Function<FederatedClient, Publisher<ByteBuf>> {
        /** Channel this merger was created for; not read by {@link #apply}. */
        final HttpChannel c;

        public FederatedMerger(HttpChannel channel) {
            this.c = channel;
        }

        /**
         * Opens a websocket to the client's target API and flattens the inbound
         * data of the resulting channel.
         */
        @Override // java.util.function.Function
        public Publisher<ByteBuf> apply(FederatedClient federatedClient) {
            return federatedClient.client
                    .ws(federatedClient.targetAPI)
                    .flatMap(remote -> remote.mo14receive());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/io/netty/nexus/Nexus$GraphEvent.class */
    /**
     * Event carrying a flow graph.
     */
    public static final class GraphEvent extends Event {
        final FlowSerializerUtils.Graph graph;

        /**
         * @param host  identifier of the originating Nexus host
         * @param graph graph carried by this event
         */
        public GraphEvent(String host, FlowSerializerUtils.Graph graph) {
            super(host);
            this.graph = graph;
        }

        /** @return the graph carried by this event */
        public FlowSerializerUtils.Graph getStreams() {
            return graph;
        }

        /**
         * Renders the event as a brace-delimited property list; the timestamp is
         * taken at rendering time, not at construction time.
         */
        public String toString() {
            final StringBuilder out = new StringBuilder("{ ");
            out.append(FlowSerializerUtils.property("streams", getStreams())).append(", ");
            out.append(FlowSerializerUtils.property("type", getType())).append(", ");
            out.append(FlowSerializerUtils.property("timestamp", Long.valueOf(System.currentTimeMillis()))).append(", ");
            out.append(FlowSerializerUtils.property("nexusHost", getNexusHost()));
            return out.append(" }").toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/io/netty/nexus/Nexus$GraphMapper.class */
    /**
     * Wraps an arbitrary object into a {@link GraphEvent} stamped with this
     * Nexus' listen address.
     */
    public final class GraphMapper implements Function<Object, Event> {
        GraphMapper() {
        }

        /**
         * Uses the argument directly when it is exactly a
         * {@code FlowSerializerUtils.Graph}; otherwise scans it into a graph.
         */
        @Override // java.util.function.Function
        public Event apply(Object obj) {
            final String host = Nexus.this.server.getListenAddress().toString();
            final FlowSerializerUtils.Graph graph;
            if (FlowSerializerUtils.Graph.class.equals(obj.getClass())) {
                graph = (FlowSerializerUtils.Graph) obj;
            } else {
                graph = FlowSerializerUtils.scan(obj);
            }
            return new GraphEvent(host, graph);
        }
    }

    /* loaded from: input_file:reactor/io/netty/nexus/Nexus$LastGraphStateMap.class */
    /**
     * Folds every {@link GraphEvent} into the shared {@code lastState} and
     * passes all other events through unchanged.
     */
    class LastGraphStateMap implements Function<Event, Event>, Introspectable {
        LastGraphStateMap() {
        }

        /**
         * @return {@code lastState} (after merging) for exact GraphEvent
         *         instances, the argument itself otherwise
         */
        @Override // java.util.function.Function
        public Event apply(Event event) {
            if (GraphEvent.class.equals(event.getClass())) {
                final GraphEvent incoming = (GraphEvent) event;
                Nexus.this.lastState.graph.mergeWith(incoming.graph);
                return Nexus.this.lastState;
            }
            return event;
        }

        public int getMode() {
            return 0;
        }

        public String getName() {
            return "ScanIfGraphEvent";
        }
    }

    /* loaded from: input_file:reactor/io/netty/nexus/Nexus$LogEvent.class */
    /**
     * Event describing a single log statement. The timestamp and the id of the
     * calling thread are captured when the event is constructed.
     */
    static final class LogEvent extends Event {
        final String message;
        final String category;
        final Level level;
        final long threadId;
        final String origin;
        final String data;
        final String kind;
        final long timestamp;

        /**
         * @param str    identifier of the originating Nexus host
         * @param str2   log category
         * @param level  log level
         * @param str3   log message
         * @param objArr optional context; only used when it holds exactly three
         *               elements, read as (kind, data, origin source). Any other
         *               length, or {@code null}, leaves kind, data and origin
         *               {@code null}.
         */
        public LogEvent(String str, String str2, Level level, String str3, Object... objArr) {
            super(str);
            this.timestamp = System.currentTimeMillis();
            this.threadId = Thread.currentThread().getId();
            this.message = str3;
            this.level = level;
            this.category = str2;
            if (objArr == null || objArr.length != 3) {
                this.origin = null;
                this.kind = null;
                this.data = null;
            } else {
                // Guard the first element the same way as the second: a null
                // context value must not throw from inside the logging path.
                // toString() already omits a null kind.
                this.kind = objArr[0] != null ? objArr[0].toString() : null;
                this.data = objArr[1] != null ? objArr[1].toString() : null;
                this.origin = FlowSerializerUtils.getIdOrDefault(objArr[2]);
            }
        }

        public String getCategory() {
            return this.category;
        }

        public String getData() {
            return this.data;
        }

        public String getKind() {
            return this.kind;
        }

        public Level getLevel() {
            return this.level;
        }

        public String getMessage() {
            return this.message;
        }

        public String getOrigin() {
            return this.origin;
        }

        public long getThreadId() {
            return this.threadId;
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        /**
         * Renders the event as a brace-delimited property list. The kind, origin
         * and data properties are left out when they are {@code null}.
         */
        public String toString() {
            return "{ " + FlowSerializerUtils.property("timestamp", Long.valueOf(getTimestamp())) + ", " + FlowSerializerUtils.property("level", getLevel().getName()) + ", " + FlowSerializerUtils.property("category", getCategory()) + (this.kind != null ? ", " + FlowSerializerUtils.property("kind", getKind()) : "") + (this.origin != null ? ", " + FlowSerializerUtils.property("origin", getOrigin()) : "") + (this.data != null ? ", " + FlowSerializerUtils.property("data", getData()) : "") + ", " + FlowSerializerUtils.property("message", getMessage()) + ", " + FlowSerializerUtils.property("threadId", Long.valueOf(getThreadId())) + ", " + FlowSerializerUtils.property("type", getType()) + ", " + FlowSerializerUtils.property("nexusHost", getNexusHost()) + " }";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/io/netty/nexus/Nexus$MetricEvent.class */
    /**
     * Event marking a metric emission; it carries no payload beyond the host.
     */
    public static final class MetricEvent extends Event {
        /**
         * @param host identifier of the originating Nexus host
         */
        public MetricEvent(String host) {
            super(host);
        }

        /**
         * Renders the event as a brace-delimited property list; the timestamp is
         * taken at rendering time.
         */
        public String toString() {
            final StringBuilder out = new StringBuilder("{ ");
            out.append(FlowSerializerUtils.property("nexusHost", getNexusHost())).append(", ");
            out.append(FlowSerializerUtils.property("type", getType())).append(", ");
            out.append(FlowSerializerUtils.property("timestamp", Long.valueOf(System.currentTimeMillis())));
            return out.append(" }").toString();
        }
    }

    /* loaded from: input_file:reactor/io/netty/nexus/Nexus$MetricMapper.class */
    /**
     * Maps any submitted object to a {@link MetricEvent} stamped with this
     * Nexus' listen address. The submitted object itself is not used.
     */
    final class MetricMapper implements Function<Object, Event> {
        MetricMapper() {
        }

        @Override // java.util.function.Function
        public Event apply(Object ignoredPayload) {
            final String host = Nexus.this.server.getListenAddress().toString();
            return new MetricEvent(host);
        }
    }

    /* loaded from: input_file:reactor/io/netty/nexus/Nexus$NexusLoggerExtension.class */
    /**
     * Logger extension that turns each log statement into a {@link LogEvent}
     * and emits it on the given sink.
     */
    static final class NexusLoggerExtension implements Logger.Extension {
        final SubmissionEmitter<Event> logSink;
        final String hostname;

        /**
         * @param hostname identifier stamped on every emitted event
         * @param sink     emitter receiving the log events
         */
        public NexusLoggerExtension(String hostname, SubmissionEmitter<Event> sink) {
            this.logSink = sink;
            this.hostname = hostname;
        }

        /**
         * Formats the message and emits it. The original argument array is
         * forwarded as event context only when it has exactly three elements and
         * the third is reported as logging by {@code FlowSerializerUtils};
         * otherwise the event carries no context.
         */
        public void log(String str, Level level, String str2, Object... objArr) {
            final String rendered = Logger.format(str2, objArr);
            final boolean carriesContext =
                    objArr != null && objArr.length == 3 && FlowSerializerUtils.isLogging(objArr[2]);
            final Object[] context = carriesContext ? objArr : new Object[0];
            final LogEvent entry = new LogEvent(this.hostname, str, level, rendered, context);
            if (!this.logSink.emit(entry).isOk()) {
                // Nothing is done when the sink does not accept the event.
            }
        }
    }

    /* loaded from: input_file:reactor/io/netty/nexus/Nexus$RemovedGraphEvent.class */
    /**
     * Event carrying a collection of stream ids.
     */
    static final class RemovedGraphEvent extends Event {
        final Collection<String> ids;

        /**
         * @param host identifier of the originating Nexus host
         * @param ids  ids carried by this event, stored without copying
         */
        public RemovedGraphEvent(String host, Collection<String> ids) {
            super(host);
            this.ids = ids;
        }

        /** @return the id collection passed to the constructor */
        public Collection<String> getStreams() {
            return ids;
        }

        /**
         * Renders the event as a brace-delimited property list; the timestamp is
         * taken at rendering time.
         */
        public String toString() {
            final StringBuilder out = new StringBuilder("{ ");
            out.append(FlowSerializerUtils.property("streams", getStreams())).append(", ");
            out.append(FlowSerializerUtils.property("type", getType())).append(", ");
            out.append(FlowSerializerUtils.property("timestamp", Long.valueOf(System.currentTimeMillis()))).append(", ");
            out.append(FlowSerializerUtils.property("nexusHost", getNexusHost()));
            return out.append(" }").toString();
        }
    }

    /* loaded from: input_file:reactor/io/netty/nexus/Nexus$StringToBuffer.class */
    /**
     * Encodes an event as a buffer holding the UTF-8 bytes of its
     * {@code toString()} rendering.
     */
    static class StringToBuffer implements Function<Event, ByteBuf> {
        StringToBuffer() {
        }

        /**
         * @param event event to encode
         * @return a buffer wrapping the UTF-8 encoded text of {@code event}
         */
        @Override // java.util.function.Function
        public ByteBuf apply(Event event) {
            // The Charset overload cannot fail with UnsupportedEncodingException,
            // so no checked-exception handling is needed.
            return Unpooled.wrappedBuffer(event.toString().getBytes(StandardCharsets.UTF_8));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/io/netty/nexus/Nexus$SystemEvent.class */
    /**
     * Event exposing JVM-level statistics and the state of the threads seen by
     * {@link #scan()}.
     */
    public static final class SystemEvent extends Event {
        /**
         * Threads observed so far. Every value holds a strong reference to its
         * own key, so the weak keys alone never let an entry be reclaimed;
         * {@link #scan()} removes terminated threads explicitly instead.
         */
        final Map<Thread, ThreadState> threads;
        static final Runtime runtime = Runtime.getRuntime();
        static final JvmStats jvmStats = new JvmStats();

        /**
         * Live view over {@link Runtime} and {@link Thread} counters; every
         * getter queries the JVM at call time.
         */
        public static final class JvmStats {
            JvmStats() {
            }

            public int getActiveThreads() {
                return Thread.activeCount();
            }

            public int getAvailableProcessors() {
                return SystemEvent.runtime.availableProcessors();
            }

            public long getFreeMemory() {
                return SystemEvent.runtime.freeMemory();
            }

            public long getMaxMemory() {
                return SystemEvent.runtime.maxMemory();
            }

            /**
             * Returns {@code Runtime.totalMemory()}.
             * NOTE(review): despite the name this is the memory currently
             * reserved by the JVM, not total minus free; confirm what
             * consumers of "usedMemory" expect before changing it.
             */
            public long getUsedMemory() {
                return SystemEvent.runtime.totalMemory();
            }

            public String toString() {
                return "{ " + FlowSerializerUtils.property("freeMemory", Long.valueOf(getFreeMemory())) + ", " + FlowSerializerUtils.property("maxMemory", Long.valueOf(getMaxMemory())) + ", " + FlowSerializerUtils.property("usedMemory", Long.valueOf(getUsedMemory())) + ", " + FlowSerializerUtils.property("activeThreads", Integer.valueOf(getActiveThreads())) + ", " + FlowSerializerUtils.property("availableProcessors", Integer.valueOf(getAvailableProcessors())) + " }";
            }
        }

        /**
         * Live view over a single thread; every getter delegates to the wrapped
         * thread at call time.
         */
        public static final class ThreadState {
            final transient Thread thread;

            public ThreadState(Thread thread) {
                this.thread = thread;
            }

            /** @return hash code of the context class loader, or -1 when there is none */
            public long getContextHash() {
                if (this.thread.getContextClassLoader() != null) {
                    return this.thread.getContextClassLoader().hashCode();
                }
                return -1L;
            }

            public long getId() {
                return this.thread.getId();
            }

            public String getName() {
                return this.thread.getName();
            }

            public int getPriority() {
                return this.thread.getPriority();
            }

            public Thread.State getState() {
                return this.thread.getState();
            }

            /** @return the thread group name, or {@code null} when the thread has no group */
            public String getThreadGroup() {
                if (this.thread.getThreadGroup() != null) {
                    return this.thread.getThreadGroup().getName();
                }
                return null;
            }

            public boolean isAlive() {
                return this.thread.isAlive();
            }

            public boolean isDaemon() {
                return this.thread.isDaemon();
            }

            public boolean isInterrupted() {
                return this.thread.isInterrupted();
            }

            /**
             * Renders the thread as a brace-delimited property list; the
             * threadGroup and contextHash properties are left out when absent.
             */
            public String toString() {
                return "{ " + FlowSerializerUtils.property("id", Long.valueOf(getId())) + ", " + FlowSerializerUtils.property("name", getName()) + ", " + FlowSerializerUtils.property("alive", Boolean.valueOf(isAlive())) + ", " + FlowSerializerUtils.property("state", getState().name()) + (getThreadGroup() != null ? ", " + FlowSerializerUtils.property("threadGroup", getThreadGroup()) : "") + (getContextHash() != -1 ? ", " + FlowSerializerUtils.property("contextHash", Long.valueOf(getContextHash())) : "") + ", " + FlowSerializerUtils.property("interrupted", Boolean.valueOf(isInterrupted())) + ", " + FlowSerializerUtils.property("priority", Integer.valueOf(getPriority())) + ", " + FlowSerializerUtils.property("daemon", Boolean.valueOf(isDaemon())) + " }";
            }
        }

        /**
         * @param str identifier of the originating Nexus host
         */
        public SystemEvent(String str) {
            super(str);
            this.threads = new WeakHashMap<>();
        }

        public JvmStats getJvmStats() {
            return jvmStats;
        }

        /** @return a live view of the states of all tracked threads */
        public Collection<ThreadState> getThreads() {
            return this.threads.values();
        }

        /**
         * Renders the event as a brace-delimited property list; the timestamp is
         * taken at rendering time.
         */
        public String toString() {
            return "{ " + FlowSerializerUtils.property("jvmStats", getJvmStats()) + ", " + FlowSerializerUtils.property("threads", getThreads()) + ", " + FlowSerializerUtils.property("type", getType()) + ", " + FlowSerializerUtils.property("timestamp", Long.valueOf(System.currentTimeMillis())) + ", " + FlowSerializerUtils.property("nexusHost", getNexusHost()) + " }";
        }

        /**
         * Refreshes the tracked thread set: drops threads that have terminated
         * and registers every thread returned by {@link Thread#enumerate}.
         *
         * @return this event, for chaining
         */
        SystemEvent scan() {
            // The weak keys never expire on their own (each ThreadState pins its
            // thread), so terminated threads are removed here to keep the map and
            // the serialized event from growing without bound.
            this.threads.keySet().removeIf(thread -> thread.getState() == Thread.State.TERMINATED);
            Thread[] threadArr = new Thread[Thread.activeCount()];
            int enumerate = Thread.enumerate(threadArr);
            for (int i = 0; i < enumerate; i++) {
                this.threads.computeIfAbsent(threadArr[i], ThreadState::new);
            }
            return this;
        }
    }

    /**
     * Creates a Nexus bound to {@code Peer.DEFAULT_BIND_ADDRESS}.
     */
    public static Nexus create() {
        return create(Peer.DEFAULT_BIND_ADDRESS);
    }

    /**
     * Creates a Nexus bound to {@code Peer.DEFAULT_BIND_ADDRESS} on the given port.
     *
     * @param port port to bind
     */
    public static Nexus create(int port) {
        return create(Peer.DEFAULT_BIND_ADDRESS, port);
    }

    /**
     * Creates a Nexus bound to the given address on {@code DEFAULT_PORT}.
     *
     * @param bindAddress address to bind
     */
    public static Nexus create(String bindAddress) {
        return create(bindAddress, DEFAULT_PORT);
    }

    /**
     * Creates a Nexus on top of a new HTTP server for the given address and port.
     *
     * @param bindAddress address to bind
     * @param port        port to bind
     */
    public static Nexus create(String bindAddress, int port) {
        final HttpServer httpServer = HttpServer.create(bindAddress, port);
        return create(httpServer);
    }

    /**
     * Creates a Nexus on top of an existing HTTP server and registers it as the
     * GET handler for {@code /nexus/stream}.
     *
     * @param httpServer server to attach to; its default timer is handed to the Nexus
     * @return the new, not yet started Nexus
     */
    public static Nexus create(HttpServer httpServer) {
        final Nexus instance = new Nexus(httpServer.getDefaultTimer(), httpServer);
        log.info("Warping Nexus...");
        httpServer.get(API_STREAM_URL, instance);
        return instance;
    }

    /**
     * Starts a default Nexus and then blocks the calling thread on a latch that
     * is never counted down, so the process runs until it is interrupted.
     */
    public static void main(String... strArr) throws Exception {
        final CountDownLatch keepAlive = new CountDownLatch(1);
        log.info("Deploying Nexus... ");
        final Nexus nexus = create();
        nexus.startAndAwait();
        log.info("CTRL-C to return...");
        keepAlive.await();
    }

    /**
     * Wires the event pipeline around the given server.
     *
     * @param timedScheduler timer handed to the Peer superclass; the periodic
     *                       Nexus tasks use the separate "nexus-poller" timer
     *                       created below
     * @param httpServer     server whose listen address stamps the state events
     */
    Nexus(TimedScheduler timedScheduler, HttpServer httpServer) {
        super(timedScheduler);
        this.websocketCapacity = 1L;
        this.server = httpServer;
        this.eventStream = EmitterProcessor.create(false);
        this.lastStateMerge = new LastGraphStateMap();
        this.timer = Schedulers.newTimer("nexus-poller");
        this.group = Schedulers.newParallel("nexus", 4);
        // Processor of publishers: everything submitted through `cannons` is
        // merged and forwarded into eventStream.
        EmitterProcessor create = EmitterProcessor.create();
        Flux.merge(create).subscribe(this.eventStream);
        this.cannons = create.connectEmitter();
        this.lastState = new GraphEvent(httpServer.getListenAddress().toString(), FlowSerializerUtils.createGraph());
        this.lastSystemState = new SystemEvent(httpServer.getListenAddress().toString());
    }

    /**
     * Handles one stream request: allows any origin, sends the encoded event
     * stream (after a text-websocket upgrade when the request is a websocket)
     * and logs inbound strings that contain a newline after their first
     * character.
     *
     * @param httpChannel channel of the incoming request
     * @return the publisher produced by the send (or upgrade-then-send) call
     */
    public Publisher<Void> apply(HttpChannel httpChannel) {
        httpChannel.responseHeader(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, ALL);
        Flux<Event> events = this.eventStream.publishOn(this.group).map(this.lastStateMerge);
        Mono outbound;
        if (httpChannel.isWebsocket()) {
            outbound = httpChannel.upgradeToTextWebsocket().then(httpChannel.send(federateAndEncode(httpChannel, events)));
        } else {
            outbound = httpChannel.send(federateAndEncode(httpChannel, events));
        }
        httpChannel.receiveString().subscribe(message -> {
            int newline = message.indexOf("\n");
            if (newline <= 0) {
                return;
            }
            String head = message.substring(0, newline);
            String tail = message.length() > newline ? message.substring(newline + 1) : null;
            log.info("Received [" + head + "] [" + tail + ']');
        });
        return outbound;
    }

    /** @return the event stream processor, exposed as this loopback's input side */
    public Object connectedInput() {
        return eventStream;
    }

    /** @return the HTTP server, exposed as this loopback's output side */
    public Object connectedOutput() {
        return server;
    }

    /**
     * Turns the log-tail flag off; the flag is read by {@code doStart}.
     *
     * @return this Nexus, for chaining
     */
    public final Nexus disableLogTail() {
        logExtensionEnabled = false;
        return this;
    }

    /**
     * Appends one federated client per given address to the current client
     * array, retrying the compare-and-set until it succeeds. A {@code null} or
     * empty argument changes nothing.
     *
     * @param strArr target API addresses to federate with
     * @return this Nexus, for chaining
     */
    public final Nexus federate(String... strArr) {
        if (strArr == null || strArr.length == 0) {
            return this;
        }
        for (;;) {
            final FederatedClient[] current = this.federatedClients;
            final int existing = current == null ? 0 : current.length;
            final FederatedClient[] grown = new FederatedClient[existing + strArr.length];
            if (existing > 0) {
                System.arraycopy(current, 0, grown, 0, existing);
            }
            // New clients are appended after the ones already registered.
            for (int slot = existing; slot < grown.length; slot++) {
                grown[slot] = new FederatedClient(strArr[slot - existing]);
            }
            if (FEDERATED.compareAndSet(this, current, grown)) {
                return this;
            }
        }
    }

    /** @return the HTTP server this Nexus was created with */
    public HttpServer getServer() {
        return server;
    }

    /**
     * Registers a new source whose every submitted item is turned into a
     * {@link MetricEvent}, and returns the emitter feeding that source.
     */
    public final SubmissionEmitter<Object> metricCannon() {
        UnicastProcessor source = UnicastProcessor.create();
        this.cannons.submit(source.map(new MetricMapper()));
        return source.connectEmitter();
    }

    /**
     * Monitors the target with a non-positive period, which the full overload
     * replaces with its 400 ms default.
     *
     * @return the target itself
     */
    public final <E> E monitor(E e) {
        return monitor(e, -1L);
    }

    /**
     * Monitors the target with the given period and no explicit unit; the full
     * overload then interprets the period as milliseconds.
     *
     * @return the target itself
     */
    public final <E> E monitor(E e, long j) {
        return monitor(e, j, null);
    }

    /**
     * Periodically scans the target and publishes the result as graph events.
     * The periodic task throws a cancellation failure once the internal
     * processor reports it has been cancelled.
     *
     * @param e        object to scan
     * @param j        scan period; values that are not positive fall back to 400
     * @param timeUnit unit of the period; {@code null} means milliseconds
     * @return the target itself
     */
    public final <E> E monitor(E e, long j, TimeUnit timeUnit) {
        final long period;
        if (j > 0) {
            period = j;
        } else {
            period = 400L;
        }
        UnicastProcessor snapshots = UnicastProcessor.create();
        log.info("State Monitoring Starting on " + FlowSerializerUtils.getName(e));
        final TimeUnit unit = timeUnit != null ? timeUnit : TimeUnit.MILLISECONDS;
        this.timer.schedulePeriodically(() -> {
            if (snapshots.isCancelled()) {
                log.info("State Monitoring stopping on " + FlowSerializerUtils.getName(e));
                throw Exceptions.failWithCancel();
            }
            snapshots.onNext(FlowSerializerUtils.scan(e));
        }, 0L, period, unit);
        this.cannons.submit(snapshots.map(new GraphMapper()));
        return e;
    }

    /**
     * Starts this Nexus by delegating to {@code start(null)}, i.e. without an
     * additional handler.
     */
    public final Mono<Void> start() throws InterruptedException {
        return start(null);
    }

    /**
     * Starts this Nexus, blocks until the start completes, then logs the URL of
     * the stream endpoint.
     */
    public final void startAndAwait() throws InterruptedException {
        start().block();
        final InetSocketAddress bound = this.server.getListenAddress();
        final String endpoint = "http://" + bound.getHostName() + ":" + bound.getPort() + API_STREAM_URL;
        log.info("Nexus Warped. Transmitting signal to troops under " + endpoint);
    }

    /**
     * Registers a new source whose every submitted item is turned into a
     * {@link GraphEvent}, and returns the emitter feeding that source.
     */
    public final SubmissionEmitter<Object> streamCannon() {
        UnicastProcessor source = UnicastProcessor.create();
        this.cannons.submit(source.map(new GraphMapper()));
        return source.connectEmitter();
    }

    /**
     * Sets the capacity applied to the encoded outgoing flux.
     *
     * @param capacity new capacity value, stored as given
     * @return this Nexus, for chaining
     */
    public final Nexus useCapacity(long capacity) {
        websocketCapacity = capacity;
        return this;
    }

    /**
     * Turns the log-tail flag on; the flag is read by {@code doStart}.
     *
     * @return this Nexus, for chaining
     */
    public final Nexus withLogTail() {
        logExtensionEnabled = true;
        return this;
    }

    /**
     * Enables system stats with a period of one second.
     *
     * @return this Nexus, for chaining
     */
    public final Nexus withSystemStats() {
        return withSystemStats(true, 1L);
    }

    /**
     * Sets the system stats flag with a period expressed in seconds.
     *
     * @param enabled whether system stats are collected
     * @param period  period in seconds
     * @return this Nexus, for chaining
     */
    public final Nexus withSystemStats(boolean enabled, long period) {
        return withSystemStats(enabled, period, TimeUnit.SECONDS);
    }

    /**
     * Sets the system stats flag and the scan period.
     *
     * @param z        whether system stats are collected
     * @param j        period in {@code timeUnit}; values below 1 select 1000 ms
     * @param timeUnit unit of the period; {@code null} selects 1000 ms
     * @return this Nexus, for chaining
     */
    public final Nexus withSystemStats(boolean z, long j, TimeUnit timeUnit) {
        if (timeUnit == null || j < 1) {
            this.systemStatsPeriod = 1000L;
        } else {
            this.systemStatsPeriod = TimeUnit.MILLISECONDS.convert(j, timeUnit);
        }
        this.systemStats = z;
        return this;
    }

    /**
     * Installs the optional log tail and system monitoring, then starts the
     * HTTP server. The handler argument is not used.
     */
    @Override // reactor.io.netty.common.Peer
    protected Mono<Void> doStart(ChannelHandler<ByteBuf, ByteBuf, Channel<ByteBuf, ByteBuf>> channelHandler) {
        if (this.logExtensionEnabled) {
            installLogTail();
        }
        if (this.systemStats) {
            scheduleSystemScan();
        }
        return this.server.start();
    }

    /** Creates the log sink, submits it as an event source and registers the logger extension. */
    private void installLogTail() {
        TopicProcessor logTopic = TopicProcessor.share("create-log-sink", 256, WaitStrategy.blocking());
        this.cannons.submit(logTopic);
        this.logExtension = new NexusLoggerExtension(this.server.getListenAddress().toString(), logTopic.connectEmitter());
        if (!Logger.enableExtension(this.logExtension)) {
            log.warn("Couldn't setup logger extension as one is already in place");
            this.logExtension = null;
        }
    }

    /** Submits a system-event source and schedules the periodic scan feeding it. */
    private void scheduleSystemScan() {
        UnicastProcessor systemSource = UnicastProcessor.create();
        this.cannons.submit(systemSource);
        log.info("System Monitoring Starting");
        this.timer.schedulePeriodically(() -> {
            if (systemSource.isCancelled()) {
                log.info("System Monitoring Stopped");
                throw Exceptions.failWithCancel();
            }
            systemSource.onNext(this.lastSystemState.scan());
        }, 0L, this.systemStatsPeriod, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the periodic tasks, completes the event pipeline, removes the
     * logger extension when one was installed, and shuts the HTTP server down.
     *
     * @return the publisher returned by the server shutdown
     */
    @Override // reactor.io.netty.common.Peer
    protected Mono<Void> doShutdown() {
        // NOTE(review): only the timer is shut down here; the `group` scheduler
        // is not, confirm whether that is handled elsewhere.
        this.timer.shutdown();
        this.cannons.finish();
        this.eventStream.onComplete();
        if (this.logExtension != null) {
            // Unregister first so no further log statement reaches the sink
            // that is completed on the next line.
            Logger.disableExtension(this.logExtension);
            this.logExtension.logSink.complete();
            this.logExtension = null;
        }
        return this.server.shutdown();
    }

    /**
     * Encodes the local events as buffers and, when federated clients are
     * registered, merges them with the buffers received from those clients.
     * The websocket capacity is applied to the resulting flux in both cases.
     *
     * @param httpChannel channel the stream is produced for
     * @param flux        local event stream
     */
    Flux<? extends ByteBuf> federateAndEncode(HttpChannel httpChannel, Flux<Event> flux) {
        final FederatedClient[] remotes = this.federatedClients;
        if (remotes == null || remotes.length == 0) {
            return flux.map(BUFFER_STRING_FUNCTION).useCapacity(this.websocketCapacity);
        }
        return Flux.merge(new Publisher[]{flux.map(BUFFER_STRING_FUNCTION), Flux.merge(Flux.fromArray(remotes).map(new FederatedMerger(httpChannel)))}).useCapacity(this.websocketCapacity);
    }
}
