/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.examples.http.sse;

import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurator;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import java.util.concurrent.TimeUnit;
import rx.Notification;
import rx.Observable;
import rx.functions.Func1;

public final class HttpSseServer {
    static final int DEFAULT_PORT = 8096;
    static final int DEFAULT_INTERVAL = 1000;
    private final int port;
    private final int interval;

    public HttpSseServer(int port, int interval) {
        this.port = port;
        this.interval = interval;
    }

    public HttpServer<ByteBuf, ServerSentEvent> createServer() {
        HttpServer server = RxNetty.createHttpServer((int)this.port, (RequestHandler)new RequestHandler<ByteBuf, ServerSentEvent>(){

            public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ServerSentEvent> response) {
                return HttpSseServer.this.getIntervalObservable((HttpServerResponse<ServerSentEvent>)response);
            }
        }, (PipelineConfigurator)PipelineConfigurators.serveSseConfigurator());
        System.out.println("HTTP Server Sent Events server started...");
        return server;
    }

    private Observable<Void> getIntervalObservable(final HttpServerResponse<ServerSentEvent> response) {
        return Observable.interval((long)this.interval, (TimeUnit)TimeUnit.MILLISECONDS).flatMap((Func1)new Func1<Long, Observable<Void>>(){

            public Observable<Void> call(Long interval) {
                System.out.println("Writing SSE event for interval: " + interval);
                ByteBuf data = response.getAllocator().buffer().writeBytes(("hello " + interval).getBytes());
                ServerSentEvent event = new ServerSentEvent(data);
                return response.writeAndFlush((Object)event);
            }
        }).materialize().takeWhile((Func1)new Func1<Notification<Void>, Boolean>(){

            public Boolean call(Notification<Void> notification) {
                if (notification.isOnError()) {
                    System.out.println("Write to client failed, stopping response sending.");
                    notification.getThrowable().printStackTrace(System.err);
                }
                return !notification.isOnError();
            }
        }).map((Func1)new Func1<Notification<Void>, Void>(){

            public Void call(Notification<Void> notification) {
                return null;
            }
        });
    }

    public static void main(String[] args) {
        new HttpSseServer(8096, 1000).createServer().startAndWait();
    }
}

