package space.chensheng.wsmessenger.server.reliable;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import space.chensheng.wsmessenger.common.component.Messenger;
import space.chensheng.wsmessenger.common.executor.TaskExecutor;
import space.chensheng.wsmessenger.common.reliable.PendingMessageProcessor;
import space.chensheng.wsmessenger.message.component.WsMessage;
import space.chensheng.wsmessenger.server.component.ServerContext;

/* loaded from: input_file:space/chensheng/wsmessenger/server/reliable/ServerPendingMessageProcessor.class */
public class ServerPendingMessageProcessor implements PendingMessageProcessor<WsMessage<?>> {
    private static final Logger logger = LoggerFactory.getLogger(ServerPendingMessageProcessor.class);
    private ConcurrentHashMap<String, ConcurrentLinkedQueue<WsMessage<?>>> pendingMsgMap = new ConcurrentHashMap<>();
    private DelayQueue<TimeoutInfo> timeoutInfoQueue = new DelayQueue<>();
    private volatile boolean timeoutCheckerStarted = false;
    private Semaphore timeoutCheckerSemaphore = new Semaphore(1);
    private ServerContext serverContext;
    private Messenger<WsMessage<?>> messenger;
    private TaskExecutor taskExecutor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:space/chensheng/wsmessenger/server/reliable/ServerPendingMessageProcessor$TimeoutChecker.class */
    public class TimeoutChecker implements Runnable {
        private static final int BATCH_SIZE = 50;

        private TimeoutChecker() {
        }

        @Override // java.lang.Runnable
        public void run() {
            int i = 0;
            ServerPendingMessageProcessor.logger.info("Pending client timeout checker begin checking.");
            for (int i2 = 0; i2 < BATCH_SIZE; i2++) {
                TimeoutInfo timeoutInfo = (TimeoutInfo) ServerPendingMessageProcessor.this.timeoutInfoQueue.poll();
                if (timeoutInfo != null) {
                    ConcurrentLinkedQueue concurrentLinkedQueue = (ConcurrentLinkedQueue) ServerPendingMessageProcessor.this.pendingMsgMap.remove(timeoutInfo.getPendingId());
                    if (concurrentLinkedQueue != null) {
                        concurrentLinkedQueue.clear();
                    }
                    i++;
                }
            }
            ServerPendingMessageProcessor.logger.info("Pending client timeout checker finish checking, {} pending clients were removed.", Integer.valueOf(i));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:space/chensheng/wsmessenger/server/reliable/ServerPendingMessageProcessor$TimeoutInfo.class */
    public class TimeoutInfo implements Delayed {
        private String pendingId;
        private long trigger;

        public TimeoutInfo(ServerPendingMessageProcessor serverPendingMessageProcessor, String str) {
            this(str, serverPendingMessageProcessor.serverContext.getPendingClientTimeoutMillis());
        }

        public TimeoutInfo(String str, long j) {
            this.pendingId = str;
            this.trigger = System.currentTimeMillis() + j;
        }

        public String getPendingId() {
            return this.pendingId;
        }

        @Override // java.lang.Comparable
        public int compareTo(Delayed delayed) {
            TimeoutInfo timeoutInfo = (TimeoutInfo) delayed;
            return this.trigger > timeoutInfo.trigger ? 1 : this.trigger < timeoutInfo.trigger ? -1 : 0;
        }

        @Override // java.util.concurrent.Delayed
        public long getDelay(TimeUnit timeUnit) {
            return timeUnit.convert(this.trigger - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ServerPendingMessageProcessor(ServerContext serverContext, Messenger<WsMessage<?>> messenger, TaskExecutor taskExecutor) {
        if (serverContext == null) {
            throw new NullPointerException("serverContext may not be null");
        }
        if (messenger == null) {
            throw new NullPointerException("messenger may not be null");
        }
        if (taskExecutor == null) {
            throw new NullPointerException("taskExecutor may not be null");
        }
        this.serverContext = serverContext;
        this.messenger = messenger;
        this.taskExecutor = taskExecutor;
    }

    public void deliverPendingMessages(String str) {
        String createPendingId;
        ConcurrentLinkedQueue<WsMessage<?>> remove;
        if (str == null || (remove = this.pendingMsgMap.remove((createPendingId = createPendingId(str)))) == null || remove.size() <= 0) {
            return;
        }
        logger.info("start to deliver pending message to client {}", createPendingId);
        int size = remove.size();
        WsMessage<?> poll = remove.poll();
        while (true) {
            WsMessage<?> wsMessage = poll;
            if (wsMessage == null) {
                logger.info("finish to deliver {} pending message to client {}.", Integer.valueOf(size), createPendingId);
                return;
            } else {
                this.messenger.sendMessage(wsMessage, str);
                poll = remove.poll();
            }
        }
    }

    public void addPendingMessage(String str, WsMessage<?> wsMessage) {
        if (str == null || wsMessage == null) {
            return;
        }
        doAddMessage(createPendingId(str), wsMessage);
    }

    public void shutdown() {
        this.timeoutInfoQueue.clear();
        this.pendingMsgMap.clear();
    }

    public boolean isShutdown() {
        return true;
    }

    private String createPendingId(String str) {
        return str;
    }

    private void doAddMessage(final String str, WsMessage<?> wsMessage) {
        startTimeoutCheckerInNeed();
        ConcurrentLinkedQueue<WsMessage<?>> computeIfAbsent = this.pendingMsgMap.computeIfAbsent(str, new Function<String, ConcurrentLinkedQueue<WsMessage<?>>>() { // from class: space.chensheng.wsmessenger.server.reliable.ServerPendingMessageProcessor.1
            @Override // java.util.function.Function
            public ConcurrentLinkedQueue<WsMessage<?>> apply(String str2) {
                if (ServerPendingMessageProcessor.this.pendingMsgMap.size() >= ServerPendingMessageProcessor.this.serverContext.getPendingClientMaxCount()) {
                    ServerPendingMessageProcessor.logger.error("pending clients exceed {}, new pending client will not be accepted.", Integer.valueOf(ServerPendingMessageProcessor.this.serverContext.getPendingClientMaxCount()));
                    return null;
                }
                ServerPendingMessageProcessor.this.timeoutInfoQueue.put((DelayQueue) new TimeoutInfo(ServerPendingMessageProcessor.this, str));
                return new ConcurrentLinkedQueue<>();
            }
        });
        if (computeIfAbsent == null) {
            return;
        }
        if (computeIfAbsent.size() >= this.serverContext.getPendingClientMaxCount()) {
            logger.error("pending clients exceed {}, new pending client will not be accepted.", Integer.valueOf(this.serverContext.getPendingClientMaxCount()));
        } else {
            computeIfAbsent.offer(wsMessage);
            logger.info("add pending message for client {}, message:{}.", str, wsMessage);
        }
    }

    private void startTimeoutCheckerInNeed() {
        if (this.timeoutCheckerStarted || !this.timeoutCheckerSemaphore.tryAcquire()) {
            return;
        }
        try {
            if (!this.timeoutCheckerStarted) {
                this.timeoutCheckerStarted = true;
                this.taskExecutor.scheduleTaskAtFixedRate(new TimeoutChecker(), 1L, this.serverContext.getPendingClientTimeoutCheckerIntervalMinutes(), TimeUnit.MINUTES);
            }
        } finally {
            this.timeoutCheckerSemaphore.release();
        }
    }
}
