/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.services.transport.rsocket.client;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.codec.ServiceMessageCodec;
import io.scalecube.services.exceptions.ConnectionClosedException;
import io.scalecube.services.transport.client.api.ClientChannel;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RSocketServiceClientAdapter
implements ClientChannel {
    private Mono<RSocket> rSocket;
    private ServiceMessageCodec messageCodec;

    public RSocketServiceClientAdapter(Mono<RSocket> rSocket, ServiceMessageCodec codec) {
        this.rSocket = rSocket;
        this.messageCodec = codec;
    }

    @Override
    public Mono<ServiceMessage> requestResponse(ServiceMessage message) {
        return this.rSocket.flatMap(rSocket -> rSocket.requestResponse(this.toPayload(message)).takeUntilOther(this.listenConnectionClose((RSocket)rSocket))).map(this::toMessage);
    }

    @Override
    public Flux<ServiceMessage> requestStream(ServiceMessage message) {
        return this.rSocket.flatMapMany(rSocket -> rSocket.requestStream(this.toPayload(message)).takeUntilOther(this.listenConnectionClose((RSocket)rSocket))).map(this::toMessage);
    }

    @Override
    public Flux<ServiceMessage> requestChannel(Publisher<ServiceMessage> publisher) {
        return this.rSocket.flatMapMany(rSocket -> rSocket.requestChannel(Flux.from(publisher).map(this::toPayload)).takeUntilOther(this.listenConnectionClose((RSocket)rSocket))).map(this::toMessage);
    }

    private Payload toPayload(ServiceMessage request) {
        return this.messageCodec.encodeAndTransform(request, ByteBufPayload::create);
    }

    private ServiceMessage toMessage(Payload payload) {
        return this.messageCodec.decode(payload.sliceData(), payload.sliceMetadata());
    }

    private <T> Mono<T> listenConnectionClose(RSocket rSocket) {
        return rSocket.onClose().map(aVoid -> aVoid).switchIfEmpty(Mono.defer(this::toConnectionClosedException));
    }

    private <T> Mono<T> toConnectionClosedException() {
        return Mono.error(new ConnectionClosedException("Connection closed"));
    }
}

