/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.examples.http.logtail;

import io.netty.buffer.ByteBuf;
import io.netty.handler.logging.LogLevel;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.examples.http.logtail.LogEvent;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.server.HttpServerBuilder;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.functions.Func1;

public class LogProducer {
    private final int port;
    private final long interval;
    private final String source;

    public LogProducer(int port, int interval) {
        this.port = port;
        this.interval = interval;
        this.source = "localhost:" + port;
    }

    public HttpServer<ByteBuf, ServerSentEvent> createServer() {
        HttpServer server = ((HttpServerBuilder)((HttpServerBuilder)RxNetty.newHttpServerBuilder((int)this.port, (RequestHandler)new RequestHandler<ByteBuf, ServerSentEvent>(){

            public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ServerSentEvent> response) {
                return LogProducer.this.createReplyHandlerObservable((HttpServerResponse<ServerSentEvent>)response);
            }
        }).pipelineConfigurator(PipelineConfigurators.serveSseConfigurator())).enableWireLogging(LogLevel.DEBUG)).build();
        System.out.println("Started log producer on port " + this.port);
        return server;
    }

    private Observable<Void> createReplyHandlerObservable(final HttpServerResponse<ServerSentEvent> response) {
        return Observable.interval((long)this.interval, (TimeUnit)TimeUnit.MILLISECONDS).flatMap((Func1)new Func1<Long, Observable<Void>>(){

            public Observable<Void> call(Long interval) {
                ByteBuf eventId = response.getAllocator().buffer().writeLong(interval.longValue());
                ByteBuf data = response.getAllocator().buffer().writeBytes(LogEvent.randomLogEvent(LogProducer.this.source).toCSV().getBytes());
                return response.writeAndFlush((Object)ServerSentEvent.withEventId((ByteBuf)eventId, (ByteBuf)data));
            }
        });
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("ERROR: specify log producer's port number and a message sending interval");
            return;
        }
        int port = Integer.valueOf(args[0]);
        int interval = Integer.valueOf(args[1]);
        new LogProducer(port, interval).createServer().startAndWait();
        System.out.println("LogProducer service terminated");
    }
}

