/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.examples.tcp.interval;

import io.reactivex.netty.RxNetty;
import io.reactivex.netty.channel.ConnectionHandler;
import io.reactivex.netty.channel.ObservableConnection;
import io.reactivex.netty.pipeline.PipelineConfigurator;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.server.RxServer;
import java.util.concurrent.TimeUnit;
import rx.Notification;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Func1;

public final class TcpIntervalServer {
    static final int DEFAULT_PORT = 8101;
    private final int port;
    private final int interval;

    public TcpIntervalServer(int port, int interval) {
        this.port = port;
        this.interval = interval;
    }

    public RxServer<String, String> createServer() {
        RxServer server = RxNetty.createTcpServer((int)this.port, (PipelineConfigurator)PipelineConfigurators.textOnlyConfigurator(), (ConnectionHandler)new ConnectionHandler<String, String>(){

            public Observable<Void> handle(final ObservableConnection<String, String> connection) {
                System.out.println("--- Connection Started ---");
                final Observable input = connection.getInput().map((Func1)new Func1<String, String>(){

                    public String call(String s) {
                        return s.trim();
                    }
                });
                return input.flatMap((Func1)new Func1<String, Observable<Void>>(){

                    public Observable<Void> call(String msg) {
                        if (msg.startsWith("subscribe:")) {
                            System.out.println("-------------------------------------");
                            System.out.println("Received 'subscribe' from client so starting interval ...");
                            return TcpIntervalServer.this.getIntervalObservable((ObservableConnection<String, String>)connection).takeUntil(input.filter((Func1)new Func1<String, Boolean>(){

                                public Boolean call(String s) {
                                    return "unsubscribe:".equals(s);
                                }
                            }));
                        }
                        if (msg.startsWith("unsubscribe:")) {
                            System.out.println("Received 'unsubscribe' from client so stopping interval (or ignoring if nothing subscribed) ...");
                            return Observable.empty();
                        }
                        if (!msg.isEmpty() && !"unsubscribe:".equals(msg)) {
                            connection.writeAndFlush((Object)("\nERROR => Unknown command: " + msg + "\nCommands => subscribe:, unsubscribe:\n"));
                        }
                        return Observable.empty();
                    }
                }).finallyDo(new Action0(){

                    public void call() {
                        System.out.println("--- Connection Closed ---");
                    }
                });
            }
        });
        System.out.println("TCP interval server started...");
        return server;
    }

    private Observable<Void> getIntervalObservable(final ObservableConnection<String, String> connection) {
        return Observable.interval((long)this.interval, (TimeUnit)TimeUnit.MILLISECONDS).flatMap((Func1)new Func1<Long, Observable<Notification<Void>>>(){

            public Observable<Notification<Void>> call(Long interval) {
                System.out.println("Writing interval: " + interval);
                return connection.writeAndFlush((Object)("interval => " + interval + '\n')).materialize();
            }
        }).takeWhile((Func1)new Func1<Notification<Void>, Boolean>(){

            public Boolean call(Notification<Void> notification) {
                return !notification.isOnError();
            }
        }).map((Func1)new Func1<Notification<Void>, Void>(){

            public Void call(Notification<Void> notification) {
                return null;
            }
        });
    }

    public static void main(String[] args) {
        int interval = 1000;
        if (args.length > 0) {
            interval = Integer.valueOf(args[0]);
        }
        new TcpIntervalServer(8101, interval).createServer().startAndWait();
    }
}

