/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.netty.websocket;

import io.micronaut.context.annotation.Requires;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.http.MediaType;
import io.micronaut.http.netty.websocket.NettyWebSocketSession;
import io.micronaut.http.netty.websocket.WebSocketMessageEncoder;
import io.micronaut.http.netty.websocket.WebSocketSessionRepository;
import io.micronaut.websocket.WebSocketBroadcaster;
import io.micronaut.websocket.WebSocketSession;
import io.micronaut.websocket.exceptions.WebSocketSessionException;
import io.netty.channel.group.ChannelGroupException;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.Attribute;
import jakarta.inject.Singleton;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

@Singleton
@Requires(beans={WebSocketSessionRepository.class})
public class NettyServerWebSocketBroadcaster
implements WebSocketBroadcaster {
    private final WebSocketMessageEncoder webSocketMessageEncoder;
    private final WebSocketSessionRepository webSocketSessionRepository;

    public NettyServerWebSocketBroadcaster(WebSocketMessageEncoder webSocketMessageEncoder, WebSocketSessionRepository webSocketSessionRepository) {
        this.webSocketMessageEncoder = webSocketMessageEncoder;
        this.webSocketSessionRepository = webSocketSessionRepository;
    }

    public <T> void broadcastSync(T message, MediaType mediaType, Predicate<WebSocketSession> filter) {
        WebSocketFrame frame = this.webSocketMessageEncoder.encodeMessage(message, mediaType);
        try {
            this.webSocketSessionRepository.getChannelGroup().writeAndFlush((Object)frame, ch -> {
                Attribute attr = ch.attr(NettyWebSocketSession.WEB_SOCKET_SESSION_KEY);
                NettyWebSocketSession s = (NettyWebSocketSession)attr.get();
                return s != null && s.isOpen() && filter.test(s);
            }).sync();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new WebSocketSessionException("Broadcast Interrupted");
        }
    }

    public <T> Publisher<T> broadcast(T message, MediaType mediaType, Predicate<WebSocketSession> filter) {
        return Flux.create(emitter -> {
            try {
                WebSocketFrame frame = this.webSocketMessageEncoder.encodeMessage(message, mediaType);
                this.webSocketSessionRepository.getChannelGroup().writeAndFlush((Object)frame, ch -> {
                    Attribute attr = ch.attr(NettyWebSocketSession.WEB_SOCKET_SESSION_KEY);
                    NettyWebSocketSession s = (NettyWebSocketSession)attr.get();
                    return s != null && s.isOpen() && filter.test(s);
                }).addListener(future -> {
                    Throwable cause;
                    if (!future.isSuccess() && (cause = this.extractBroadcastFailure(future.cause())) != null) {
                        emitter.error((Throwable)new WebSocketSessionException("Broadcast Failure: " + cause.getMessage(), cause));
                        return;
                    }
                    emitter.next(message);
                    emitter.complete();
                });
            }
            catch (Throwable e) {
                emitter.error((Throwable)new WebSocketSessionException("Broadcast Failure: " + e.getMessage(), e));
            }
        }, (FluxSink.OverflowStrategy)FluxSink.OverflowStrategy.BUFFER);
    }

    @Nullable
    private Throwable extractBroadcastFailure(Throwable failure) {
        if (failure instanceof ChannelGroupException) {
            Throwable singleCause = null;
            for (Map.Entry entry : (ChannelGroupException)failure) {
                Throwable entryCause = this.extractBroadcastFailure((Throwable)entry.getValue());
                if (entryCause == null) continue;
                if (singleCause == null) {
                    singleCause = entryCause;
                    continue;
                }
                return failure;
            }
            return singleCause;
        }
        if (failure instanceof ClosedChannelException) {
            return null;
        }
        return failure;
    }
}

