/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.services.gateway.transport.websocket;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.scalecube.reactor.RetryNonSerializedEmitFailureHandler;
import io.scalecube.services.api.ErrorData;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.gateway.transport.GatewayClientCodec;
import io.scalecube.services.gateway.transport.websocket.Signal;
import io.scalecube.services.transport.api.ReferenceCountUtil;
import java.lang.reflect.Type;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.StringJoiner;
import org.jctools.maps.NonBlockingHashMapLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.netty.Connection;
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;

/**
 * Client-side session bound to a single websocket {@link Connection}.
 *
 * <p>Inbound frames are decoded with the supplied codec and routed, by the value of their
 * {@code sid} header, to the sink that was registered for that stream id through
 * {@link #newMonoProcessor(long)} or {@link #newUnicastProcessor(long)}. When the connection is
 * disposed, every sink still registered is failed with a {@link ClosedChannelException}.
 */
public final class WebsocketGatewayClientSession {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketGatewayClientSession.class);

    /** Shared instance emitted to every registered sink when the connection is disposed. */
    private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();

    /** Header carrying the stream id used to correlate responses with registered sinks. */
    private static final String STREAM_ID = "sid";

    /** Header carrying the signal code (see {@link Signal}). */
    private static final String SIGNAL = "sig";

    private final String id;
    private final GatewayClientCodec<ByteBuf> codec;
    private final Connection connection;

    /** Registered sinks keyed by stream id; values are {@code Sinks.One} or {@code Sinks.Many}. */
    private final Map<Long, Object> inboundProcessors = new NonBlockingHashMapLong<>(1024);

    /**
     * Creates the session, subscribes to the connection's inbound frames and registers a dispose
     * hook that fails all registered sinks.
     *
     * @param codec codec used to decode inbound buffers and encode outbound messages
     * @param connection underlying connection; its inbound is cast to {@link WebsocketInbound}
     */
    WebsocketGatewayClientSession(GatewayClientCodec<ByteBuf> codec, Connection connection) {
        this.id = Integer.toHexString(System.identityHashCode(this));
        this.codec = codec;
        this.connection = connection;

        WebsocketInbound inbound = (WebsocketInbound) connection.inbound();
        // An explicit error consumer keeps an inbound failure from surfacing as an anonymous
        // "error callback not implemented" and ties the log record to this session.
        inbound.receive().retain().subscribe(
            this::handleInbound,
            th -> LOGGER.error("Inbound stream failed, session={}", this.id, th));

        connection.onDispose(
            () -> this.inboundProcessors.forEach(
                (sid, processor) -> emitError(processor, CLOSED_CHANNEL_EXCEPTION)));
    }

    /**
     * Returns the sink registered for {@code sid}, registering a new {@code Sinks.One} if none is
     * present.
     *
     * @param sid stream id
     * @param <T> element type expected by the caller (unchecked)
     * @return the sink mapped to {@code sid}
     * @throws ClassCastException if a {@code Sinks.Many} is already registered under {@code sid}
     */
    @SuppressWarnings("unchecked") // the map stores sinks as Object; the caller chooses T
    <T> Sinks.One<T> newMonoProcessor(long sid) {
        return (Sinks.One<T>) this.inboundProcessors.computeIfAbsent(sid, this::newMonoProcessor0);
    }

    /**
     * Returns the sink registered for {@code sid}, registering a new unicast {@code Sinks.Many}
     * if none is present.
     *
     * @param sid stream id
     * @param <T> element type expected by the caller (unchecked)
     * @return the sink mapped to {@code sid}
     * @throws ClassCastException if a {@code Sinks.One} is already registered under {@code sid}
     */
    @SuppressWarnings("unchecked") // the map stores sinks as Object; the caller chooses T
    <T> Sinks.Many<T> newUnicastProcessor(long sid) {
        return (Sinks.Many<T>) this.inboundProcessors.computeIfAbsent(sid, this::newUnicastProcessor0);
    }

    /** Mapping function for {@link #newMonoProcessor(long)}: logs and creates the sink. */
    private Sinks.One<Object> newMonoProcessor0(long sid) {
        LOGGER.debug("Put sid={}, session={}", sid, this.id);
        return Sinks.one();
    }

    /** Mapping function for {@link #newUnicastProcessor(long)}: logs and creates the sink. */
    private Sinks.Many<Object> newUnicastProcessor0(long sid) {
        LOGGER.debug("Put sid={}, session={}", sid, this.id);
        return Sinks.many().unicast().onBackpressureBuffer();
    }

    /**
     * Unregisters the sink mapped to {@code sid}, if any. The sink itself is not signalled.
     *
     * @param sid stream id
     */
    void removeProcessor(long sid) {
        if (this.inboundProcessors.remove(sid) != null) {
            LOGGER.debug("Removed sid={}, session={}", sid, this.id);
        }
    }

    /**
     * Wraps the buffer in a {@link TextWebSocketFrame} and sends it on the connection's outbound.
     *
     * @param byteBuf encoded message
     * @return the {@code then()} of the outbound send
     */
    Mono<Void> send(ByteBuf byteBuf) {
        return this.connection.outbound().sendObject(new TextWebSocketFrame(byteBuf)).then();
    }

    /**
     * Encodes and sends a message carrying the given qualifier, the stream id and the
     * {@link Signal#CANCEL} code. The send is subscribed immediately; a failure is only logged.
     *
     * @param sid stream id to cancel
     * @param qualifier qualifier put on the cancel message
     */
    void cancel(long sid, String qualifier) {
        ByteBuf byteBuf = this.codec.encode(
            ServiceMessage.builder()
                .qualifier(qualifier)
                .header(STREAM_ID, (Object) sid)
                .header(SIGNAL, Signal.CANCEL.codeAsString())
                .build());
        this.send(byteBuf).subscribe(
            null,
            th -> LOGGER.error("Exception occurred on sending CANCEL signal for session={}", this.id, th));
    }

    /**
     * Sends a websocket close frame.
     *
     * @return the {@code then()} of the outbound {@code sendClose()}
     */
    public Mono<Void> close() {
        return ((WebsocketOutbound) this.connection.outbound()).sendClose().then();
    }

    /**
     * @return the connection's {@code onDispose()} mono
     */
    public Mono<Void> onClose() {
        return this.connection.onDispose();
    }

    /**
     * Handles one retained inbound buffer: decodes it, resolves the target sink by {@code sid}
     * and dispatches the message. Messages that cannot be routed are dropped and their data is
     * released.
     */
    private void handleInbound(ByteBuf byteBuf) {
        if (!byteBuf.isReadable()) {
            ReferenceCountUtil.safestRelease(byteBuf);
            return;
        }

        ServiceMessage message;
        try {
            message = this.codec.decode(byteBuf);
        } catch (Exception ex) {
            // NOTE(review): the buffer is not released on this path (unchanged behavior);
            // presumably the codec takes ownership of it — confirm against the codec.
            LOGGER.error("Response decoder failed:", ex);
            return;
        }

        if (!message.headers().containsKey(STREAM_ID)) {
            LOGGER.error("Ignore response: {} with null sid, session={}", message, this.id);
            releaseData(message);
            return;
        }

        final long sid;
        try {
            sid = Long.parseLong(message.header(STREAM_ID));
        } catch (NumberFormatException ex) {
            // A malformed (or null-valued) sid must not escape the subscriber: that would cancel
            // the inbound subscription and stop routing for every other stream of this session.
            LOGGER.error("Ignore response: {} with malformed sid, session={}", message, this.id, ex);
            releaseData(message);
            return;
        }

        Object processor = this.inboundProcessors.get(sid);
        if (processor == null) {
            // Nobody is registered for this stream id: drop silently.
            releaseData(message);
            return;
        }

        handleResponse(message, processor);
    }

    /** Releases the message data if it is non-null. */
    private static void releaseData(ServiceMessage message) {
        Object data = message.data();
        if (data != null) {
            ReferenceCountUtil.safestRelease(data);
        }
    }

    /**
     * Dispatches a routed response to its sink according to the {@code sig} header: no signal
     * emits the message as a value, {@code COMPLETE} completes the sink, {@code ERROR} decodes
     * the data as {@link ErrorData} and emits the result as a value. Other signals are ignored.
     * Any exception raised while doing so is emitted to the sink as an error.
     */
    private void handleResponse(ServiceMessage response, Object processor) {
        LOGGER.debug("Handle response: {}, session={}", response, this.id);
        try {
            String header = response.header(SIGNAL);
            Signal signal = header != null ? Signal.from(header) : null;

            if (signal == null) {
                emitNext(processor, response);
            } else if (signal == Signal.COMPLETE) {
                emitComplete(processor);
            } else if (signal == Signal.ERROR) {
                emitNext(processor, this.codec.decodeData(response, ErrorData.class));
            }
        } catch (Exception e) {
            emitError(processor, e);
        }
    }

    /** Emits {@code message} as a value to a {@code Sinks.One} or {@code Sinks.Many}. */
    @SuppressWarnings("unchecked") // sinks are stored untyped; element type is erased anyway
    private static void emitNext(Object processor, ServiceMessage message) {
        if (processor instanceof Sinks.One) {
            ((Sinks.One<Object>) processor)
                .emitValue(message, RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED);
        } else if (processor instanceof Sinks.Many) {
            ((Sinks.Many<Object>) processor)
                .emitNext(message, RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED);
        }
    }

    /** Completes a {@code Sinks.One} (as empty) or a {@code Sinks.Many}. */
    private static void emitComplete(Object processor) {
        if (processor instanceof Sinks.One) {
            ((Sinks.One<?>) processor)
                .emitEmpty(RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED);
        } else if (processor instanceof Sinks.Many) {
            ((Sinks.Many<?>) processor)
                .emitComplete(RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED);
        }
    }

    /** Fails a {@code Sinks.One} or {@code Sinks.Many} with {@code e}. */
    private static void emitError(Object processor, Exception e) {
        if (processor instanceof Sinks.One) {
            ((Sinks.One<?>) processor)
                .emitError(e, RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED);
        } else if (processor instanceof Sinks.Many) {
            ((Sinks.Many<?>) processor)
                .emitError(e, RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED);
        }
    }

    @Override
    public String toString() {
        return new StringJoiner(", ", WebsocketGatewayClientSession.class.getSimpleName() + "[", "]")
            .add("id=" + this.id)
            .toString();
    }
}

