package io.kroxylicious.test.client;

import io.kroxylicious.test.Request;
import io.kroxylicious.test.Response;
import io.kroxylicious.test.ResponsePayload;
import io.kroxylicious.test.codec.DecodedRequestFrame;
import io.kroxylicious.test.codec.DecodedResponseFrame;
import io.kroxylicious.test.codec.KafkaRequestEncoder;
import io.kroxylicious.test.codec.KafkaResponseDecoder;
import io.kroxylicious.test.codec.RequestFrame;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.kafka.common.message.RequestHeaderData;

/* loaded from: input_file:io/kroxylicious/test/client/KafkaClient.class */
public final class KafkaClient implements AutoCloseable {
    private final String host;
    private final int port;
    private final AtomicReference<CompletableFuture<Channel>> connected = new AtomicReference<>();
    private final EventGroupConfig eventGroupConfig = EventGroupConfig.create();
    private final EventLoopGroup bossGroup = this.eventGroupConfig.newBossGroup();
    private final CorrelationManager correlationManager = new CorrelationManager();
    private final KafkaClientHandler kafkaClientHandler = new KafkaClientHandler();
    private static final AtomicInteger correlationId = new AtomicInteger(1);

    public KafkaClient(String str, int i) {
        this.host = str;
        this.port = i;
    }

    private static DecodedRequestFrame<?> toApiRequest(Request request) {
        RequestHeaderData requestApiVersion = new RequestHeaderData().setRequestApiKey(request.apiKeys().messageType.apiKey()).setRequestApiVersion(request.apiVersion());
        requestApiVersion.setClientId(request.clientIdHeader());
        requestApiVersion.setCorrelationId(correlationId.incrementAndGet());
        return new DecodedRequestFrame<>(requestApiVersion.requestApiVersion(), requestApiVersion.correlationId(), requestApiVersion, request.message());
    }

    public CompletableFuture<Response> get(Request request) {
        DecodedRequestFrame<?> apiRequest = toApiRequest(request);
        return ensureChannel(this.correlationManager, this.kafkaClientHandler).thenApply(KafkaClient::checkChannelOpen).thenCompose((Function<? super U, ? extends CompletionStage<U>>) channel -> {
            return this.kafkaClientHandler.sendRequest(apiRequest);
        }).thenApply(KafkaClient::toResponse);
    }

    public CompletableFuture<Response> get(RequestFrame requestFrame) {
        return ensureChannel(this.correlationManager, this.kafkaClientHandler).thenApply(KafkaClient::checkChannelOpen).thenCompose((Function<? super U, ? extends CompletionStage<U>>) channel -> {
            return this.kafkaClientHandler.sendRequest(requestFrame);
        }).thenApply(KafkaClient::toResponse);
    }

    public Response getSync(Request request) {
        try {
            return get(request).get(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e2) {
            throw new RuntimeException(e2);
        }
    }

    private CompletableFuture<Channel> ensureChannel(final CorrelationManager correlationManager, final KafkaClientHandler kafkaClientHandler) {
        CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
        if (!this.connected.compareAndSet(null, completableFuture)) {
            return this.connected.get();
        }
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(this.bossGroup).channel(this.eventGroupConfig.clientChannelClass()).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() { // from class: io.kroxylicious.test.client.KafkaClient.1
            public void initChannel(SocketChannel socketChannel) {
                ChannelPipeline pipeline = socketChannel.pipeline();
                pipeline.addLast(new ChannelHandler[]{new KafkaRequestEncoder(correlationManager)});
                pipeline.addLast(new ChannelHandler[]{new KafkaResponseDecoder(correlationManager)});
                pipeline.addLast(new ChannelHandler[]{kafkaClientHandler});
            }
        });
        ChannelFuture connect = bootstrap.connect(this.host, this.port);
        connect.addListeners(new GenericFutureListener[]{channelFuture -> {
            completableFuture.complete(channelFuture.channel());
        }});
        connect.channel().closeFuture().addListener(future -> {
            correlationManager.onChannelClose();
        });
        return completableFuture;
    }

    public boolean isOpen() {
        Channel now;
        CompletableFuture<Channel> completableFuture = this.connected.get();
        return (completableFuture == null || (now = completableFuture.getNow(null)) == null || !now.isOpen()) ? false : true;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        CompletableFuture<Channel> completableFuture = this.connected.get();
        if (completableFuture != null) {
            completableFuture.thenApply((v0) -> {
                return v0.close();
            });
        }
        this.bossGroup.shutdownGracefully();
    }

    private static Channel checkChannelOpen(Channel channel) {
        if (channel.isOpen()) {
            return channel;
        }
        throw new RuntimeException("Channel is already closed");
    }

    /* JADX WARN: Type inference failed for: r6v1, types: [org.apache.kafka.common.protocol.ApiMessage] */
    private static Response toResponse(SequencedResponse sequencedResponse) {
        DecodedResponseFrame<?> frame = sequencedResponse.frame();
        return new Response(new ResponsePayload(frame.apiKey(), frame.apiVersion(), frame.body()), sequencedResponse.sequenceNumber());
    }
}
