/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.services.gateway.transport.websocket;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.scalecube.services.api.ErrorData;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.ConnectionClosedException;
import io.scalecube.services.exceptions.DefaultErrorMapper;
import io.scalecube.services.gateway.transport.GatewayClientCodec;
import io.scalecube.services.gateway.transport.websocket.Signal;
import io.scalecube.services.transport.api.ReferenceCountUtil;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.function.Consumer;
import org.jctools.maps.NonBlockingHashMapLong;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.UnicastProcessor;
import reactor.netty.Connection;
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;

public final class WebsocketSession {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketSession.class);
    private static final String STREAM_ID = "sid";
    private static final String SIGNAL = "sig";
    private final String id;
    private final GatewayClientCodec<ByteBuf> codec;
    private final Connection connection;
    private final Map<Long, Processor<ServiceMessage, ServiceMessage>> inboundProcessors = new NonBlockingHashMapLong(1024);

    WebsocketSession(GatewayClientCodec<ByteBuf> codec, Connection connection) {
        this.id = Integer.toHexString(System.identityHashCode(this));
        this.codec = codec;
        this.connection = connection;
        WebsocketInbound inbound = (WebsocketInbound)connection.inbound();
        inbound.aggregateFrames().receiveFrames().filter(f -> !(f instanceof PongWebSocketFrame) && !(f instanceof PingWebSocketFrame)).map(f -> f.retain().content()).subscribe(byteBuf -> {
            ServiceMessage message;
            try {
                message = codec.decode((ByteBuf)byteBuf);
            }
            catch (Exception ex) {
                LOGGER.error("Response decoder failed: " + ex);
                return;
            }
            if (!message.headers().containsKey(STREAM_ID)) {
                LOGGER.error("Ignore response: {} with null sid, session={}", (Object)message, (Object)this.id);
                Optional.ofNullable(message.data()).ifPresent(ReferenceCountUtil::safestRelease);
                return;
            }
            long sid = Long.parseLong(message.header(STREAM_ID));
            Processor<ServiceMessage, ServiceMessage> processor = this.inboundProcessors.get(sid);
            if (processor == null) {
                Optional.ofNullable(message.data()).ifPresent(ReferenceCountUtil::safestRelease);
                return;
            }
            this.handleResponse(message, arg_0 -> processor.onNext(arg_0), arg_0 -> processor.onError(arg_0), () -> processor.onComplete());
        });
        connection.onDispose(() -> this.inboundProcessors.forEach((k, resp) -> resp.onError((Throwable)new ConnectionClosedException("Connection closed"))));
    }

    public String id() {
        return this.id;
    }

    MonoProcessor<ServiceMessage> newMonoProcessor(long sid) {
        return (MonoProcessor)this.inboundProcessors.computeIfAbsent(sid, key -> {
            LOGGER.debug("Put sid={}, session={}", (Object)sid, (Object)this.id);
            return MonoProcessor.create();
        });
    }

    UnicastProcessor<ServiceMessage> newUnicastProcessor(long sid) {
        return (UnicastProcessor)this.inboundProcessors.computeIfAbsent(sid, key -> {
            LOGGER.debug("Put sid={}, session={}", (Object)sid, (Object)this.id);
            return UnicastProcessor.create();
        });
    }

    void removeProcessor(long sid) {
        if (this.inboundProcessors.remove(sid) != null) {
            LOGGER.debug("Removed sid={}, session={}", (Object)sid, (Object)this.id);
        }
    }

    Mono<Void> send(ByteBuf byteBuf, long sid) {
        return Mono.defer(() -> this.connection.outbound().sendObject((Publisher)Mono.just((Object)byteBuf).map(TextWebSocketFrame::new), f -> true).then().doOnError(th -> {
            Processor<ServiceMessage, ServiceMessage> processor = this.inboundProcessors.remove(sid);
            if (processor != null) {
                processor.onError(th);
            }
        }));
    }

    public Mono<Void> close() {
        return ((WebsocketOutbound)this.connection.outbound()).sendClose().then();
    }

    public Mono<Void> onClose() {
        return this.connection.onDispose();
    }

    private void handleResponse(ServiceMessage response, Consumer<ServiceMessage> onNext, Consumer<Throwable> onError, Runnable onComplete) {
        LOGGER.debug("Handle response: {}, session={}", (Object)response, (Object)this.id);
        try {
            Optional<Signal> signalOptional = Optional.ofNullable(response.header(SIGNAL)).map(Signal::from);
            if (signalOptional.isPresent()) {
                Signal signal = signalOptional.get();
                if (signal == Signal.COMPLETE) {
                    onComplete.run();
                }
                if (signal == Signal.ERROR) {
                    ServiceMessage errorMessage = this.codec.decodeData(response, (Type)((Object)ErrorData.class));
                    Throwable error = DefaultErrorMapper.INSTANCE.toError(errorMessage);
                    String sid = response.header(STREAM_ID);
                    LOGGER.error("Received error response: sid={}, error={}", (Object)sid, (Object)error);
                    onError.accept(error);
                }
            } else {
                onNext.accept(response);
            }
        }
        catch (Exception e) {
            onError.accept(e);
        }
    }

    public String toString() {
        return new StringJoiner(", ", WebsocketSession.class.getSimpleName() + "[", "]").add("id=" + this.id).toString();
    }
}

