/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.services.transport.rsocket;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.codec.ServiceMessageCodec;
import io.scalecube.services.exceptions.ConnectionClosedException;
import io.scalecube.services.transport.api.ClientChannel;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RSocketServiceClientAdapter
implements ClientChannel {
    private Mono<RSocket> rsocket;
    private ServiceMessageCodec messageCodec;

    public RSocketServiceClientAdapter(Mono<RSocket> rsocket, ServiceMessageCodec codec) {
        this.rsocket = rsocket;
        this.messageCodec = codec;
    }

    @Override
    public Mono<ServiceMessage> requestResponse(ServiceMessage message) {
        return this.rsocket.flatMap(rsocket -> rsocket.requestResponse(this.toPayload(message)).takeUntilOther(this.listenConnectionClose((RSocket)rsocket))).map(this::toMessage);
    }

    @Override
    public Flux<ServiceMessage> requestStream(ServiceMessage message) {
        return this.rsocket.flatMapMany(rsocket -> rsocket.requestStream(this.toPayload(message)).takeUntilOther(this.listenConnectionClose((RSocket)rsocket))).map(this::toMessage);
    }

    @Override
    public Flux<ServiceMessage> requestChannel(Publisher<ServiceMessage> publisher) {
        return this.rsocket.flatMapMany(rsocket -> rsocket.requestChannel(Flux.from(publisher).map(this::toPayload)).takeUntilOther(this.listenConnectionClose((RSocket)rsocket))).map(this::toMessage);
    }

    private Payload toPayload(ServiceMessage request) {
        return this.messageCodec.encodeAndTransform(request, ByteBufPayload::create);
    }

    private ServiceMessage toMessage(Payload payload) {
        return this.messageCodec.decode(payload.sliceData(), payload.sliceMetadata());
    }

    private <T> Mono<T> listenConnectionClose(RSocket rsocket) {
        return rsocket.onClose().map(empty -> empty).switchIfEmpty(Mono.defer(this::toConnectionClosedException));
    }

    private <T> Mono<T> toConnectionClosedException() {
        return Mono.error(new ConnectionClosedException("Connection closed"));
    }
}

