/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.gateway.websocket;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.scalecube.gateway.websocket.message.GatewayMessage;
import io.scalecube.gateway.websocket.message.GatewayMessageCodec;
import io.scalecube.gateway.websocket.message.Signal;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.ExceptionProcessor;
import java.util.Map;
import java.util.Optional;
import org.jctools.maps.NonBlockingHashMapLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.netty.NettyPipeline;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;

/**
 * Server-side view of a single websocket connection.
 *
 * <p>Wraps the reactor-netty inbound/outbound pair of one connection, encodes outgoing
 * {@link GatewayMessage}s with the supplied {@link GatewayMessageCodec}, and keeps a registry of
 * per-stream subscriptions (keyed by stream id) that are disposed when the connection is disposed.
 */
public final class WebsocketSession {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketSession.class);
    private static final String DEFAULT_CONTENT_TYPE = "application/json";

    /** Active subscriptions of this session, keyed by stream id. */
    private final Map<Long, Disposable> subscriptions = new NonBlockingHashMapLong<>(1024);
    private final WebsocketInbound inbound;
    private final WebsocketOutbound outbound;
    private final GatewayMessageCodec messageCodec;
    private final String id;
    private final String contentType;

    /**
     * Creates a session bound to the given websocket connection.
     *
     * @param messageCodec codec used to encode outgoing gateway messages
     * @param httpRequest the HTTP request of the connection; only its headers are read here, to
     *     pick up the {@code Content-Type} (falls back to {@code application/json} when absent)
     * @param inbound websocket inbound; a hook is installed on its connection so that all
     *     registered subscriptions are disposed when the connection is disposed
     * @param outbound websocket outbound; configured with the flush-on-each send option
     */
    public WebsocketSession(GatewayMessageCodec messageCodec, HttpServerRequest httpRequest, WebsocketInbound inbound, WebsocketOutbound outbound) {
        this.messageCodec = messageCodec;
        // Identity hash is used as a short id for log output; it is not guaranteed to be unique.
        this.id = Integer.toHexString(System.identityHashCode(this));

        HttpHeaders httpHeaders = httpRequest.requestHeaders();
        String requestedContentType = httpHeaders.get(HttpHeaderNames.CONTENT_TYPE);
        this.contentType = requestedContentType != null ? requestedContentType : DEFAULT_CONTENT_TYPE;

        // The casts are kept from the original: the fluent calls are presumably declared on the
        // parent inbound/outbound interfaces and return those parent types.
        this.inbound = (WebsocketInbound) inbound.withConnection(connection -> connection.onDispose(this::clearSubscriptions));
        this.outbound = (WebsocketOutbound) outbound.options(NettyPipeline.SendOptions::flushOnEach);
    }

    /**
     * Returns the session id used in log output.
     *
     * @return hex string derived from this object's identity hash code
     */
    public String id() {
        return id;
    }

    /**
     * Returns the content type of this session.
     *
     * @return value of the request's {@code Content-Type} header, or {@code application/json}
     */
    public String contentType() {
        return contentType;
    }

    /**
     * Returns the stream of incoming payloads with websocket frames aggregated.
     *
     * <p>Every emitted buffer has been retained ({@code retain()} is applied to the flux), so the
     * subscriber is presumably responsible for releasing it - confirm against callers.
     *
     * @return flux of retained inbound buffers
     */
    public Flux<ByteBuf> receive() {
        return inbound.aggregateFrames().receive().retain();
    }

    /**
     * Encodes the given message and sends it as a text websocket frame.
     *
     * @param response message to send
     * @return mono that terminates when the send terminates; the outcome is logged
     */
    public Mono<Void> send(GatewayMessage response) {
        Mono<ByteBuf> encoded = Mono.just(response).map(messageCodec::encode);
        return send(encoded).doOnSuccessOrError((avoid, th) -> logSend(response, th));
    }

    /**
     * Converts the given error to an {@link Signal#ERROR} gateway message and sends it.
     *
     * @param err error to report to the client
     * @param sid stream id to put on the error message; may be {@code null}, in which case no
     *     stream id is set
     * @return mono that terminates when the send terminates; the outcome is logged
     */
    public Mono<Void> send(Throwable err, Long sid) {
        Mono<ByteBuf> encoded =
            Mono.just(err)
                .map(ExceptionProcessor::toMessage)
                .map(msg -> toErrorMessage(sid, msg))
                .map(messageCodec::encode);
        return send(encoded).doOnSuccessOrError((avoid, th) -> logSend(err, sid, th));
    }

    /** Wraps the encoded payload in a text frame and writes it to the outbound. */
    private Mono<Void> send(Mono<ByteBuf> publisher) {
        return publisher.map(TextWebSocketFrame::new).flatMap(frame -> outbound.sendObject(frame).then());
    }

    /** Logs the outcome of sending a regular message; {@code th} is {@code null} on success. */
    private void logSend(GatewayMessage response, Throwable th) {
        if (th == null) {
            LOGGER.debug("<< SEND success: {}, session={}", response, id);
        } else {
            LOGGER.warn("<< SEND failed: {}, session={}, cause: {}", response, id, th);
        }
    }

    /** Logs the outcome of sending an error message; {@code th} is {@code null} on success. */
    private void logSend(Throwable err, Long sid, Throwable th) {
        if (th == null) {
            LOGGER.debug("<< SEND success: {}, sid={}, session={}", err, sid, id);
        } else {
            LOGGER.warn("<< SEND failed: {}, sid={}, session={}, cause: {}", err, sid, id, th);
        }
    }

    /** Builds an ERROR-signal gateway message from {@code msg}, tagging it with {@code sid} if given. */
    private GatewayMessage toErrorMessage(Long sid, ServiceMessage msg) {
        GatewayMessage.Builder builder = GatewayMessage.from(msg);
        Optional.ofNullable(sid).ifPresent(builder::streamId);
        return builder.signal(Signal.ERROR).build();
    }

    /**
     * Sends a websocket close frame.
     *
     * @return mono that terminates when the close has been sent
     */
    public Mono<Void> close() {
        return outbound.sendClose().then();
    }

    /**
     * Registers {@code disposable} to be disposed together with the connection and exposes the
     * connection's termination as a mono.
     *
     * @param disposable resource to dispose when the connection is disposed
     * @return mono that completes, or errors, when the connection's {@code onTerminate()} does
     */
    public Mono<Void> onClose(Disposable disposable) {
        return Mono.create(
            sink ->
                inbound.withConnection(
                    connection ->
                        connection
                            .onDispose(disposable)
                            .onTerminate()
                            .subscribe(sink::success, sink::error, sink::success)));
    }

    /**
     * Removes and disposes the subscription registered under the given stream id.
     *
     * @param streamId stream id; {@code null} is tolerated
     * @return {@code true} if a subscription was found and disposed, {@code false} otherwise
     */
    public boolean dispose(Long streamId) {
        if (streamId == null) {
            return false;
        }
        Disposable disposable = subscriptions.remove(streamId);
        if (disposable == null) {
            return false;
        }
        LOGGER.debug("Dispose subscription by sid={}, session={}", streamId, id);
        disposable.dispose();
        return true;
    }

    /**
     * Tells whether a subscription is registered under the given stream id.
     *
     * @param streamId stream id; {@code null} is tolerated
     * @return {@code true} if {@code streamId} is non-null and currently registered
     */
    public boolean containsSid(Long streamId) {
        return streamId != null && subscriptions.containsKey(streamId);
    }

    /**
     * Registers a subscription under the given stream id unless one is already present.
     *
     * <p>A {@code null} stream id is rejected with {@code false}, consistent with
     * {@link #dispose(Long)} and {@link #containsSid(Long)}; previously it caused a
     * {@link NullPointerException} from the underlying map.
     *
     * @param streamId stream id to register under
     * @param disposable subscription handle; not registered if it is already disposed
     * @return {@code true} if the subscription was stored, {@code false} if the stream id is
     *     {@code null}, the disposable is already disposed, or the stream id is already taken
     */
    public boolean register(Long streamId, Disposable disposable) {
        if (streamId == null || disposable.isDisposed()) {
            return false;
        }
        boolean registered = subscriptions.putIfAbsent(streamId, disposable) == null;
        if (registered) {
            LOGGER.debug("Registered subscription with sid={}, session={}", streamId, id);
        }
        return registered;
    }

    /**
     * Disposes every registered subscription; invoked when the connection is disposed.
     *
     * <p>Each entry is removed from the map before it is disposed, so an entry that is added while
     * this method runs is never discarded without being disposed (the former
     * "dispose all, then clear()" sequence could drop such an entry silently).
     */
    private void clearSubscriptions() {
        // Read the size once so the branch taken and the number logged cannot disagree.
        int count = subscriptions.size();
        if (count > 1) {
            LOGGER.info("Clear all {} subscriptions on session={}", count, id);
        } else if (count == 1) {
            LOGGER.info("Clear 1 subscription on session={}", id);
        }
        for (Long sid : subscriptions.keySet()) {
            Disposable removed = subscriptions.remove(sid);
            // null means another caller (e.g. dispose(sid)) removed it first and owns its disposal
            if (removed != null) {
                removed.dispose();
            }
        }
    }

    @Override
    public String toString() {
        return "WebsocketSession{" + "id='" + id + '\'' + ", contentType='" + contentType + '\'' + '}';
    }
}

