/*
 * Decompiled with CFR 0.152.
 */
package io.reactivesocket.transport.tcp;

import io.reactivesocket.DuplexConnection;
import io.reactivesocket.Frame;
import io.reactivex.netty.channel.Connection;
import org.reactivestreams.Publisher;
import rx.Observable;
import rx.RxReactiveStreams;

/**
 * {@link DuplexConnection} backed by an RxNetty {@link Connection} that reads and writes
 * {@link Frame}s.
 *
 * <p>Every method delegates to the wrapped connection and bridges RxJava {@code Observable}s
 * to Reactive Streams {@link Publisher}s (and back) through {@link RxReactiveStreams}.
 */
public class TcpDuplexConnection implements DuplexConnection {
    /** The wrapped RxNetty connection all operations delegate to. */
    private final Connection<Frame, Frame> connection;
    /** Publisher view of {@code connection.closeListener()}, created once at construction. */
    private final Publisher<Void> closeNotifier;
    /** Publisher view of {@code connection.close()}, created once at construction. */
    private final Publisher<Void> close;

    /**
     * Wraps the given RxNetty connection.
     *
     * @param connection the connection to delegate to; dereferenced immediately, so a
     *                   {@code null} argument fails here with a {@link NullPointerException}
     */
    public TcpDuplexConnection(Connection<Frame, Frame> connection) {
        this.connection = connection;
        // No raw (Observable) casts: let the compiler infer and verify the element types.
        this.closeNotifier = RxReactiveStreams.toPublisher(connection.closeListener());
        // NOTE(review): connection.close() is called here only to obtain the Observable;
        // presumably nothing is closed until the publisher is subscribed - confirm against RxNetty.
        this.close = RxReactiveStreams.toPublisher(connection.close());
    }

    /**
     * Writes the given frames to the wrapped connection via {@code writeAndFlushOnEach}.
     *
     * @param frames the frames to write
     * @return a publisher view of the {@code Observable} returned by the write call
     */
    public Publisher<Void> send(Publisher<Frame> frames) {
        Observable<Frame> outbound = RxReactiveStreams.toObservable(frames);
        return RxReactiveStreams.toPublisher(this.connection.writeAndFlushOnEach(outbound));
    }

    /**
     * Exposes the input of the wrapped connection.
     *
     * @return a publisher view of {@code connection.getInput()}
     */
    public Publisher<Frame> receive() {
        return RxReactiveStreams.toPublisher(this.connection.getInput());
    }

    /**
     * Reports availability based on the state of the underlying Netty channel.
     *
     * @return {@code 1.0} if the channel reports itself active, otherwise {@code 0.0}
     */
    public double availability() {
        return this.connection.unsafeNettyChannel().isActive() ? 1.0 : 0.0;
    }

    /**
     * Returns the close publisher created in the constructor.
     *
     * @return the same publisher instance on every call
     */
    public Publisher<Void> close() {
        return this.close;
    }

    /**
     * Returns the close-notification publisher created in the constructor.
     *
     * @return the same publisher instance on every call
     */
    public Publisher<Void> onClose() {
        return this.closeNotifier;
    }

    /**
     * Returns the string representation of the underlying Netty channel.
     */
    @Override
    public String toString() {
        return this.connection.unsafeNettyChannel().toString();
    }
}

