/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.examples.tcp.event;

import io.reactivex.netty.RxNetty;
import io.reactivex.netty.channel.ConnectionHandler;
import io.reactivex.netty.channel.ObservableConnection;
import io.reactivex.netty.pipeline.PipelineConfigurator;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.server.RxServer;
import java.util.concurrent.TimeUnit;
import rx.Notification;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Func1;

public final class TcpEventStreamServer {
    static final int DEFAULT_PORT = 8100;
    private final int port;

    public TcpEventStreamServer(int port) {
        this.port = port;
    }

    public RxServer<String, String> createServer() {
        RxServer server = RxNetty.createTcpServer((int)this.port, (PipelineConfigurator)PipelineConfigurators.textOnlyConfigurator(), (ConnectionHandler)new ConnectionHandler<String, String>(){

            public Observable<Void> handle(ObservableConnection<String, String> newConnection) {
                return TcpEventStreamServer.startEventStream((ObservableConnection<String, String>)newConnection);
            }
        });
        System.out.println("TCP event stream server started...");
        return server;
    }

    private static Observable<Void> startEventStream(final ObservableConnection<String, String> connection) {
        return Observable.interval((long)10L, (TimeUnit)TimeUnit.MILLISECONDS).flatMap((Func1)new Func1<Long, Observable<Notification<Void>>>(){

            public Observable<Notification<Void>> call(Long interval) {
                System.out.println("Writing event: " + interval);
                return connection.writeAndFlush((Object)("data: {\"type\":\"Command\",\"name\":\"GetAccount\",\"currentTime\":1376957348166,\"errorPercentage\":0,\"errorCount\":0,\"requestCount\":" + interval + "}\n")).materialize();
            }
        }).takeWhile((Func1)new Func1<Notification<Void>, Boolean>(){

            public Boolean call(Notification<Void> notification) {
                return !notification.isOnError();
            }
        }).finallyDo(new Action0(){

            public void call() {
                System.out.println(" --> Closing connection and stream");
            }
        }).map((Func1)new Func1<Notification<Void>, Void>(){

            public Void call(Notification<Void> notification) {
                return null;
            }
        });
    }

    public static void main(String[] args) {
        new TcpEventStreamServer(8100).createServer().startAndWait();
    }
}

