/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.services.gateway.transport.websocket;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.scalecube.services.api.ErrorData;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.ConnectionClosedException;
import io.scalecube.services.exceptions.DefaultErrorMapper;
import io.scalecube.services.gateway.transport.GatewayClientCodec;
import io.scalecube.services.gateway.transport.websocket.Signal;
import io.scalecube.services.transport.api.ReferenceCountUtil;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.jctools.maps.NonBlockingHashMapLong;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import reactor.netty.Connection;
import reactor.netty.NettyPipeline;
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;

/**
 * Client-side session over a single websocket {@link Connection}.
 *
 * <p>Requests are written as text frames; every decoded inbound {@link ServiceMessage} is routed
 * to the {@link UnicastProcessor} registered under the stream id carried in its {@code "sid"}
 * header. A {@code "sig"} header, when present, turns the message into a completion or error
 * signal for that stream instead of a data item.
 */
final class WebsocketSession {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketSession.class);
    /** Header holding the stream id used to look up the inbound processor. */
    private static final String STREAM_ID = "sid";
    /** Header holding the signal code parsed via {@link Signal#from}. */
    private static final String SIGNAL = "sig";
    /** Hex identity hash of this instance; only used in log output and {@link #toString()}. */
    private final String id;
    private final GatewayClientCodec<ByteBuf> codec;
    private final Connection connection;
    private final WebsocketOutbound outbound;
    /** Inbound processors keyed by stream id. */
    private final Map<Long, UnicastProcessor<ServiceMessage>> inboundProcessors = new NonBlockingHashMapLong<>(1024);

    /**
     * Creates the session and immediately subscribes to the connection's inbound frames.
     *
     * @param codec codec used to decode inbound buffers and error payloads
     * @param connection underlying connection; its inbound/outbound are cast to the websocket types
     */
    WebsocketSession(GatewayClientCodec<ByteBuf> codec, Connection connection) {
        this.id = Integer.toHexString(System.identityHashCode(this));
        this.codec = codec;
        this.connection = connection;
        this.outbound = (WebsocketOutbound) connection.outbound().options(NettyPipeline.SendOptions::flushOnEach);
        WebsocketInbound inbound = (WebsocketInbound) connection.inbound();
        inbound.aggregateFrames().receive().retain().subscribe(this::onInboundFrame);
        // When the connection goes away, fail every stream that is still registered.
        connection.onDispose(() -> this.inboundProcessors.forEach(
            (sid, processor) -> processor.onError(new ConnectionClosedException("Connection closed"))));
    }

    /**
     * Returns the identifier of this session.
     *
     * @return hex string derived from the identity hash code of this instance
     */
    public String id() {
        return this.id;
    }

    /**
     * Sends a single buffer as one text frame, registering an inbound processor for {@code sid}
     * (if none exists yet) when the returned {@code Mono} is subscribed.
     *
     * @param byteBuf payload to wrap into a {@link TextWebSocketFrame}
     * @param sid stream id under which the response processor is registered
     * @return completion of the outbound write; on error the processor for {@code sid} is removed
     *     and failed with the same error
     */
    public Mono<Void> send(ByteBuf byteBuf, long sid) {
        return this.sendFrames(Mono.just(byteBuf).map(TextWebSocketFrame::new), sid);
    }

    /**
     * Sends a stream of buffers, each as its own text frame, registering an inbound processor for
     * {@code sid} (if none exists yet) when the returned {@code Mono} is subscribed.
     *
     * @param byteBuf payloads to wrap into {@link TextWebSocketFrame}s
     * @param sid stream id under which the response processor is registered
     * @return completion of the outbound write; on error the processor for {@code sid} is removed
     *     and failed with the same error
     */
    public Mono<Void> send(Flux<ByteBuf> byteBuf, long sid) {
        return this.sendFrames(byteBuf.map(TextWebSocketFrame::new), sid);
    }

    /**
     * Returns the inbound messages of the stream {@code sid}. The processor is looked up on
     * subscription and removed from the session when the stream terminates.
     *
     * @param sid stream id previously registered by one of the {@code send} methods
     * @return flux of responses; fails with {@link IllegalStateException} on subscription if no
     *     processor is registered for {@code sid}
     */
    public Flux<ServiceMessage> receive(long sid) {
        return Flux.defer(() -> {
            UnicastProcessor<ServiceMessage> processor = this.inboundProcessors.get(sid);
            if (processor == null) {
                LOGGER.error("Can't find processor by sid={}, session={}", sid, this.id);
                throw new IllegalStateException("Can't find processor by sid");
            }
            return processor.doOnTerminate(() -> {
                this.inboundProcessors.remove(sid);
                LOGGER.debug("Removed sid={}, session={}", sid, this.id);
            });
        });
    }

    /**
     * Sends a websocket close frame.
     *
     * @return completion of the close-frame write
     */
    public Mono<Void> close() {
        return this.outbound.sendClose().then();
    }

    /**
     * Returns the dispose signal of the underlying connection.
     *
     * @return the connection's {@code onDispose()} mono
     */
    public Mono<Void> onClose() {
        return this.connection.onDispose();
    }

    /** Registers the processor for {@code sid}, writes the frames and fails the processor if the write fails. */
    private Mono<Void> sendFrames(Publisher<TextWebSocketFrame> frames, long sid) {
        return Mono.defer(() -> {
            this.inboundProcessors.computeIfAbsent(sid, key -> UnicastProcessor.create());
            LOGGER.debug("Put sid={}, session={}", sid, this.id);
            return this.outbound.sendObject(frames).then().doOnError(th -> {
                UnicastProcessor<ServiceMessage> processor = this.inboundProcessors.remove(sid);
                if (processor != null) {
                    processor.onError(th);
                }
            });
        });
    }

    /** Decodes one inbound buffer and dispatches it to the processor registered for its stream id. */
    private void onInboundFrame(ByteBuf byteBuf) {
        ServiceMessage msg;
        try {
            msg = this.codec.decode(byteBuf);
        } catch (Exception ex) {
            // Trailing throwable argument keeps the stack trace in the log.
            LOGGER.error("Response decoder failed: {}", ex.toString(), ex);
            return;
        }
        if (!msg.headers().containsKey(STREAM_ID)) {
            LOGGER.error("Ignore response: {} with null sid, session={}", msg, this.id);
            releaseData(msg);
            return;
        }
        String sidHeader = msg.header(STREAM_ID);
        long sid;
        try {
            sid = Long.parseLong(sidHeader);
        } catch (NumberFormatException ex) {
            // A throw from this callback would cancel the inbound subscription for the whole
            // session, so a malformed sid only drops the offending message.
            LOGGER.error("Ignore response: {} with malformed sid={}, session={}", msg, sidHeader, this.id);
            releaseData(msg);
            return;
        }
        UnicastProcessor<ServiceMessage> processor = this.inboundProcessors.get(sid);
        if (processor == null) {
            LOGGER.error("Can't find processor by sid={} for response: {}, session={}", sid, msg, this.id);
            releaseData(msg);
            return;
        }
        this.handleResponse(msg, processor::onNext, processor::onError, processor::onComplete);
    }

    /** Releases the message payload, if any, via {@code ReferenceCountUtil.safestRelease}. */
    private static void releaseData(ServiceMessage message) {
        Object data = message.data();
        if (data != null) {
            ReferenceCountUtil.safestRelease(data);
        }
    }

    /**
     * Translates a response into exactly one of the given callbacks: a message without a signal
     * header goes to {@code onNext}; {@code COMPLETE} runs {@code onComplete}; {@code ERROR}
     * decodes the payload as {@link ErrorData} and passes the mapped exception to {@code onError}.
     * Any other signal is ignored. Exceptions raised while handling are routed to {@code onError}.
     */
    private void handleResponse(ServiceMessage response, Consumer<ServiceMessage> onNext, Consumer<Throwable> onError, Runnable onComplete) {
        LOGGER.debug("Handle response: {}, session={}", response, this.id);
        try {
            Signal signal = Optional.ofNullable(response.header(SIGNAL)).map(Signal::from).orElse(null);
            if (signal == null) {
                onNext.accept(response);
                return;
            }
            if (signal == Signal.COMPLETE) {
                onComplete.run();
            } else if (signal == Signal.ERROR) {
                ServiceMessage errorMessage = this.codec.decodeData(response, ErrorData.class);
                Throwable e = DefaultErrorMapper.INSTANCE.toError(errorMessage);
                String sid = response.header(STREAM_ID);
                LOGGER.error("Received error response: sid={}, error={}", sid, e);
                onError.accept(e);
            }
        } catch (Exception e) {
            onError.accept(e);
        }
    }

    @Override
    public String toString() {
        return "gateway.client.transport.WebsocketSession{" + "id='" + this.id + '\'' + '}';
    }
}

