package io.kroxylicious.test.client;

import io.kroxylicious.test.codec.RequestFrame;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kroxylicious/test/client/KafkaClientHandler.class */
/**
 * Netty inbound handler used by the Kafka test client to send {@link RequestFrame}s.
 *
 * <p>Requests handed to {@link #sendRequest(RequestFrame)} are placed on an internal queue and
 * written to the channel by a task submitted to the handler context's executor. Nothing is
 * written until {@link #channelActive(ChannelHandlerContext)} has been observed; requests
 * submitted earlier stay queued and are drained when the channel becomes active.
 *
 * <p>The methods {@code channelRegistered}, {@code channelActive}, {@code channelReadComplete}
 * and {@code exceptionCaught} are the Netty callbacks inherited from
 * {@link ChannelInboundHandlerAdapter}.
 */
public class KafkaClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaClientHandler.class);

    /** Requests accepted by {@link #sendRequest(RequestFrame)} that have not been written yet. */
    private final Deque<RequestFrame> queue = new ConcurrentLinkedDeque<>();

    /**
     * Context captured in {@link #channelRegistered(ChannelHandlerContext)}; {@code null} until
     * then. Volatile because it is written by the registration callback and read by whichever
     * thread calls {@link #sendRequest(RequestFrame)}.
     */
    private volatile ChannelHandlerContext ctx;

    /**
     * Set once {@link #channelActive(ChannelHandlerContext)} has run. It is written in that
     * callback and read only inside tasks run on {@code ctx.executor()}.
     * NOTE(review): presumably both run on the channel's event loop thread, which is why the
     * field is not volatile; confirm against the Netty threading model in use.
     */
    private boolean channelActivationSeen;

    /**
     * Remembers the handler context for later writes and forwards the event down the pipeline.
     *
     * @param channelHandlerContext the context this handler was registered with
     */
    public void channelRegistered(ChannelHandlerContext channelHandlerContext) {
        this.ctx = channelHandlerContext;
        channelHandlerContext.fireChannelRegistered();
    }

    /**
     * Marks the channel as active, schedules a drain of any queued requests and forwards the
     * event down the pipeline.
     *
     * @param channelHandlerContext the context of the channel that became active
     */
    public void channelActive(ChannelHandlerContext channelHandlerContext) {
        this.channelActivationSeen = true;
        processPendingWrites();
        channelHandlerContext.fireChannelActive();
    }

    /**
     * Flushes the context once a read batch has completed.
     *
     * @param channelHandlerContext the context to flush
     */
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) {
        channelHandlerContext.flush();
    }

    /**
     * Logs the failure at WARN level and closes the connection.
     *
     * @param channelHandlerContext the context whose channel is closed
     * @param th the exception that reached this handler
     */
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        LOGGER.warn("Kafka test client received unexpected exception, closing connection.", th);
        channelHandlerContext.close();
    }

    /**
     * Queues a request for sending and schedules a drain of the queue.
     *
     * <p>If the handler has not been registered yet, or the channel is not yet active, the
     * request remains queued and is written once {@link #channelActive(ChannelHandlerContext)}
     * has been seen.
     *
     * @param requestFrame the request to send; must not be {@code null}
     * @return the response future carried by {@code requestFrame}. This handler completes it
     *         exceptionally if the write fails, or with {@code null} if the write succeeds and
     *         the frame reports that it has no response; otherwise it is left untouched here
     *         (presumably completed elsewhere when the response arrives).
     */
    public CompletableFuture<SequencedResponse> sendRequest(RequestFrame requestFrame) {
        this.queue.addLast(requestFrame);
        processPendingWrites();
        return requestFrame.getResponseFuture();
    }

    /**
     * Submits a task to the context's executor that writes every queued request, provided the
     * channel has been seen active. Does nothing if the handler has not been registered yet.
     */
    private void processPendingWrites() {
        // Read the volatile field once so the null check and the later uses agree.
        final ChannelHandlerContext context = this.ctx;
        if (context == null) {
            // Not registered yet: leave the request queued. The enqueue happened before this
            // read, so the drain scheduled by channelActive will observe it.
            return;
        }
        context.executor().execute(() -> {
            if (!this.channelActivationSeen) {
                return;
            }
            RequestFrame frame;
            // pollFirst() removes and returns the head in one step, or null when empty.
            while ((frame = this.queue.pollFirst()) != null) {
                writeFrame(context, frame);
            }
        });
    }

    /**
     * Writes and flushes one frame, then settles its response future from the write outcome:
     * failed exceptionally on a write error, completed with {@code null} when the write
     * succeeded and the frame has no response, and left alone otherwise.
     */
    private static void writeFrame(ChannelHandlerContext context, RequestFrame frame) {
        context.writeAndFlush(frame).addListener(future -> {
            CompletableFuture<SequencedResponse> responseFuture = frame.getResponseFuture();
            Throwable cause = future.cause();
            if (cause != null) {
                responseFuture.completeExceptionally(cause);
            } else if (!frame.hasResponse()) {
                responseFuture.complete(null);
            }
        });
    }
}
