/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.examples.http.logtail;

import io.netty.buffer.ByteBuf;
import io.netty.handler.logging.LogLevel;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurator;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.server.HttpServerBuilder;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import java.util.ArrayList;
import java.util.List;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

public class LogAggregator {
    static final int DEFAULT_AG_PORT = 8091;
    private final int port;
    private final List<Integer> producerPorts;
    HttpServer<ByteBuf, ServerSentEvent> server;

    public LogAggregator(int port, List<Integer> producerPorts) {
        this.port = port;
        this.producerPorts = producerPorts;
    }

    public LogAggregator(int port, int producerPortFrom, int producerPortTo) {
        this.port = port;
        int producerCount = producerPortTo - producerPortFrom + 1;
        this.producerPorts = new ArrayList<Integer>(producerCount);
        for (int i = 0; i < producerCount; ++i) {
            this.producerPorts.add(producerPortFrom + i);
        }
    }

    public HttpServer<ByteBuf, ServerSentEvent> createAggregationServer() {
        this.server = ((HttpServerBuilder)((HttpServerBuilder)RxNetty.newHttpServerBuilder((int)this.port, (RequestHandler)new RequestHandler<ByteBuf, ServerSentEvent>(){

            public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ServerSentEvent> response) {
                return LogAggregator.this.connectToLogProducers().flatMap((Func1)new Func1<ServerSentEvent, Observable<Void>>(){

                    public Observable<Void> call(ServerSentEvent sse) {
                        return response.writeAndFlush((Object)sse);
                    }
                });
            }
        }).enableWireLogging(LogLevel.ERROR)).pipelineConfigurator(PipelineConfigurators.serveSseConfigurator())).build();
        System.out.println("Logs aggregator server started...");
        return this.server;
    }

    private Observable<ServerSentEvent> connectToLogProducers() {
        ArrayList<Observable<ServerSentEvent>> oList = new ArrayList<Observable<ServerSentEvent>>(this.producerPorts.size());
        for (int producerPort : this.producerPorts) {
            oList.add(LogAggregator.connectToLogProducer(producerPort));
        }
        return Observable.merge(oList);
    }

    private static Observable<ServerSentEvent> connectToLogProducer(int port) {
        HttpClient client = RxNetty.createHttpClient((String)"localhost", (int)port, (PipelineConfigurator)PipelineConfigurators.clientSseConfigurator());
        return client.submit(HttpClientRequest.createGet((String)"/logstream")).flatMap((Func1)new Func1<HttpClientResponse<ServerSentEvent>, Observable<ServerSentEvent>>(){

            public Observable<ServerSentEvent> call(HttpClientResponse<ServerSentEvent> response) {
                return response.getContent().doOnNext((Action1)new Action1<ServerSentEvent>(){

                    public void call(ServerSentEvent serverSentEvent) {
                        serverSentEvent.retain();
                    }
                });
            }
        });
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("ERROR: provide log producers port range");
            return;
        }
        int producerPortFrom = Integer.valueOf(args[0]);
        int producerPortTo = Integer.valueOf(args[1]);
        LogAggregator aggregator = new LogAggregator(8091, producerPortFrom, producerPortTo);
        aggregator.createAggregationServer().startAndWait();
        System.out.println("Aggregator service terminated");
    }
}

