package kafka.network;

import java.io.EOFException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import kafka.api.RequestKeys$;
import kafka.utils.Time;
import kafka.utils.Utils$;
import org.apache.log4j.Logger;
import scala.Function1;
import scala.Function2;
import scala.Option;
import scala.Predef$;
import scala.ScalaObject;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: SocketServer.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005=b!C\u0001\u0003\t\u0003\u0005\t\u0011\u0001\u0003\u0007\u0005%\u0001&o\\2fgN|'O\u0003\u0002\u0004\t\u00059a.\u001a;x_J\\'\"A\u0003\u0002\u000b-\fgm[1\u0014\u0007\u000191\u0002\u0005\u0002\t\u00135\t!!\u0003\u0002\u000b\u0005\t!\u0012IY:ue\u0006\u001cGoU3sm\u0016\u0014H\u000b\u001b:fC\u0012\u0004\"\u0001D\b\u000e\u00035Q\u0011AD\u0001\u0006g\u000e\fG.Y\u0005\u0003!5\u00111bU2bY\u0006|%M[3di\"A!\u0003\u0001BC\u0002\u0013\u0005A#\u0001\biC:$G.\u001a:NCB\u0004\u0018N\\4\u0004\u0001U\tQ\u0003\u0005\u0002\u001739\u0011\u0001bF\u0005\u00031\t\tq\u0001S1oI2,'/\u0003\u0002\u001b7\tq\u0001*\u00198eY\u0016\u0014X*\u00199qS:<'B\u0001\r\u0003\u0011!i\u0002A!A!\u0002\u0013)\u0012a\u00045b]\u0012dWM]'baBLgn\u001a\u0011\t\u0011}\u0001!Q1A\u0005\u0002\u0001\nA\u0001^5nKV\t\u0011\u0005\u0005\u0002#K5\t1E\u0003\u0002%\t\u0005)Q\u000f^5mg&\u0011ae\t\u0002\u0005)&lW\r\u0003\u0005)\u0001\t\u0005\t\u0015!\u0003\"\u0003\u0015!\u0018.\\3!\u0011!Q\u0003A!b\u0001\n\u0003Y\u0013!B:uCR\u001cX#\u0001\u0017\u0011\u0005!i\u0013B\u0001\u0018\u0003\u0005E\u0019vnY6fiN+'O^3s'R\fGo\u001d\u0005\ta\u0001\u0011\t\u0011)A\u0005Y\u000511\u000f^1ug\u0002B\u0001B\r\u0001\u0003\u0006\u0004%\taM\u0001\u000f[\u0006D(+Z9vKN$8+\u001b>f+\u0005!\u0004C\u0001\u00076\u0013\t1TBA\u0002J]RD\u0001\u0002\u000f\u0001\u0003\u0002\u0003\u0006I\u0001N\u0001\u0010[\u0006D(+Z9vKN$8+\u001b>fA!)!\b\u0001C\u0001w\u00051A(\u001b8jiz\"R\u0001P\u001f?\u007f\u0001\u0003\"\u0001\u0003\u0001\t\u000bII\u0004\u0019A\u000b\t\u000b}I\u0004\u0019A\u0011\t\u000b)J\u0004\u0019\u0001\u0017\t\u000bIJ\u0004\u0019\u0001\u001b\t\u000f\t\u0003!\u0019!C\u0005\u0007\u0006qa.Z<D_:tWm\u0019;j_:\u001cX#\u0001#\u0011\u0007\u0015ce*D\u0001G\u0015\t9\u0005*\u0001\u0006d_:\u001cWO\u001d:f]RT!!\u0013&\u0002\tU$\u0018\u000e\u001c\u0006\u0002\u0017\u0006!!.\u0019<b\u0013\tieIA\u000bD_:\u001cWO\u001d:f]Rd\u0015N\\6fIF+X-^3\u0011\u0005=#V\"\u0001)\u000b\u0005E\u0013\u0016\u0001C2iC:tW\r\\:\u000b\u0005MS\u0015a\u00018j_
&\u0011Q\u000b\u0015\u0002\u000e'>\u001c7.\u001a;DQ\u0006tg.\u001a7\t\r]\u0003\u0001\u0015!\u0003E\u0003=qWm^\"p]:,7\r^5p]N\u0004\u0003bB-\u0001\u0005\u0004%IAW\u0001\u000ee\u0016\fX/Z:u\u0019><w-\u001a:\u0016\u0003m\u0003\"\u0001X2\u000e\u0003uS!AX0\u0002\u000b1|w\r\u000e6\u000b\u0005\u0001\f\u0017AB1qC\u000eDWMC\u0001c\u0003\ry'oZ\u0005\u0003Iv\u0013a\u0001T8hO\u0016\u0014\bB\u00024\u0001A\u0003%1,\u0001\bsKF,Xm\u001d;M_\u001e<WM\u001d\u0011\t\u000b!\u0004A\u0011I5\u0002\u0007I,h\u000eF\u0001k!\ta1.\u0003\u0002m\u001b\t!QK\\5u\u0011\u0015q\u0007\u0001\"\u0003p\u0003\u0015\u0019Gn\\:f)\tQ\u0007\u000fC\u0003r[\u0002\u0007!/A\u0002lKf\u0004\"aT:\n\u0005Q\u0004&\u0001D*fY\u0016\u001cG/[8o\u0017\u0016L\b\"\u0002<\u0001\t\u00039\u0018AB1dG\u0016\u0004H\u000f\u0006\u0002kq\")\u00110\u001ea\u0001\u001d\u0006i1o\\2lKR\u001c\u0005.\u00198oK2DQa\u001f\u0001\u0005\n%\fqcY8oM&<WO]3OK^\u001cuN\u001c8fGRLwN\\:\t\u000bu\u0004A\u0011\u0002@\u0002\r!\fg\u000e\u001a7f)\u0015y\u00181BA\u0007!\u0015a\u0011\u0011AA\u0003\u0013\r\t\u0019!\u0004\u0002\u0007\u001fB$\u0018n\u001c8\u0011\u0007!\t9!C\u0002\u0002\n\t\u0011AaU3oI\")\u0011\u000f a\u0001e\"9\u0011q\u0002?A\u0002\u0005E\u0011a\u0002:fcV,7\u000f\u001e\t\u0004\u0011\u0005M\u0011bAA\u000b\u0005\t9!+Z2fSZ,\u0007bBA\r\u0001\u0011\u0005\u00111D\u0001\u0005e\u0016\fG\rF\u0002k\u0003;Aa!]A\f\u0001\u0004\u0011\bbBA\u0011\u0001\u0011\u0005\u00111E\u0001\u0006oJLG/\u001a\u000b\u0004U\u0006\u0015\u0002BB9\u0002 \u0001\u0007!\u000fC\u0004\u0002*\u0001!I!a\u000b\u0002\u0015\rD\u0017M\u001c8fY\u001a{'\u000fF\u0002O\u0003[Aa!]A\u0014\u0001\u0004\u0011\b")
/* loaded from: input_file:kafka/network/Processor.class */
/*
 * Selector-driven worker thread for the socket server.
 *
 * New connections are handed over through accept(SocketChannel) and registered
 * for reads on this thread's selector. The run() loop then reads requests,
 * dispatches each completed request through handlerMapping, and writes back any
 * response the handler produced. selector(), isRunning(), logger(),
 * startupComplete() and shutdownComplete() are not defined here; they are
 * inherited.
 */
public class Processor extends AbstractServerThread implements ScalaObject {
    /** Maps (request type id, request) to the function that produces the optional response. */
    private final Function2<Short, Receive, Function1<Receive, Option<Send>>> handlerMapping;
    /** Clock used to time request handling. */
    private final Time time;
    /** Sink for bytes-read, bytes-written and per-request timing figures. */
    private final SocketServerStats stats;
    /** Size limit passed to every BoundedByteBufferReceive created by read(). */
    private final int maxRequestSize;
    /** Channels handed over by accept() that are not yet registered with the selector. */
    private final ConcurrentLinkedQueue<SocketChannel> newConnections = new ConcurrentLinkedQueue<>();
    /** Dedicated logger for per-request trace output. */
    private final Logger requestLogger = Logger.getLogger("kafka.request.logger");

    /** @return the request-dispatch function supplied at construction */
    public Function2<Short, Receive, Function1<Receive, Option<Send>>> handlerMapping() {
        return this.handlerMapping;
    }

    /** @return the clock supplied at construction */
    public Time time() {
        return this.time;
    }

    /** @return the statistics recorder supplied at construction */
    public SocketServerStats stats() {
        return this.stats;
    }

    /** @return the request size limit supplied at construction */
    public int maxRequestSize() {
        return this.maxRequestSize;
    }

    private ConcurrentLinkedQueue<SocketChannel> newConnections() {
        return this.newConnections;
    }

    private Logger requestLogger() {
        return this.requestLogger;
    }

    /**
     * Main loop: registers pending connections, waits up to 500 ms for ready keys and
     * services each one until isRunning() turns false.
     *
     * A failure while servicing one key is logged and that connection is closed; the
     * loop then carries on with the remaining keys.
     */
    @Override // java.lang.Runnable
    public void run() {
        startupComplete();
        while (isRunning()) {
            configureNewConnections();
            if (selector().select(500L) > 0) {
                Iterator<SelectionKey> it = selector().selectedKeys().iterator();
                while (it.hasNext() && isRunning()) {
                    SelectionKey selectionKey = null;
                    try {
                        selectionKey = it.next();
                        it.remove();
                        if (selectionKey.isReadable()) {
                            read(selectionKey);
                        } else if (selectionKey.isWritable()) {
                            write(selectionKey);
                        } else if (!selectionKey.isValid()) {
                            close(selectionKey);
                        } else {
                            throw new IllegalStateException("Unrecognized key state for processor thread.");
                        }
                    } catch (EOFException e) {
                        logger().info(String.format("Closing socket connection to %s.", channelFor(selectionKey).socket().getInetAddress()));
                        close(selectionKey);
                    } catch (InvalidRequestException e2) {
                        logger().info(String.format("Closing socket connection to %s due to invalid request: %s", channelFor(selectionKey).socket().getInetAddress(), e2.getMessage()));
                        close(selectionKey);
                    } catch (Throwable th) {
                        if (selectionKey == null) {
                            // it.next() itself failed, so there is no channel to report or close.
                            // Dereferencing the key here would throw out of run() and skip
                            // shutdownComplete(); abandon this batch and select again instead.
                            logger().error("Error while retrieving the next selected key", th);
                            break;
                        }
                        logger().error("Closing socket for " + channelFor(selectionKey).socket().getInetAddress() + " because of error", th);
                        close(selectionKey);
                    }
                }
            }
        }
        logger().debug("Closing selector.");
        // NOTE(review): the closure bodies are compiled separately and not visible here;
        // presumably this closes the selector, with failures swallowed and logged - confirm.
        Utils$.MODULE$.swallow(new Processor$$anonfun$run$6(this), new Processor$$anonfun$run$3(this));
        shutdownComplete();
    }

    /**
     * Tears down the connection behind the given key and clears its attachment.
     *
     * @param selectionKey key whose channel is being closed
     */
    private void close(SelectionKey selectionKey) {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        if (logger().isDebugEnabled()) {
            logger().debug("Closing connection from " + socketChannel.socket().getRemoteSocketAddress());
        }
        // NOTE(review): the three swallow(...) closures are compiled separately; presumably
        // they close the socket, close the channel and cancel the key - confirm.
        Utils$.MODULE$.swallow(new Processor$$anonfun$close$4(this), new Processor$$anonfun$close$1(this, socketChannel));
        Utils$.MODULE$.swallow(new Processor$$anonfun$close$5(this), new Processor$$anonfun$close$2(this, socketChannel));
        selectionKey.attach(null);
        Utils$.MODULE$.swallow(new Processor$$anonfun$close$6(this), new Processor$$anonfun$close$3(this, selectionKey));
    }

    /**
     * Queues a new connection for this processor and wakes the selector so that the
     * run() loop picks it up.
     *
     * @param socketChannel the connection to hand over
     */
    public void accept(SocketChannel socketChannel) {
        newConnections().add(socketChannel);
        selector().wakeup();
    }

    /** Registers every queued connection with the selector for read events. */
    private void configureNewConnections() {
        SocketChannel channel;
        // Drain with poll() rather than testing size(): size() walks the whole queue.
        while ((channel = newConnections().poll()) != null) {
            if (logger().isDebugEnabled()) {
                logger().debug("Listening to new connection from " + channel.socket().getRemoteSocketAddress());
            }
            channel.register(selector(), SelectionKey.OP_READ);
        }
    }

    /**
     * Dispatches a fully received request to its handler and records how long it took.
     *
     * @param selectionKey key of the connection the request arrived on (used for logging)
     * @param receive the completed request; its leading short is the request type id
     * @return the handler's optional response
     * @throws InvalidRequestException if handlerMapping yields no handler, or if trace
     *         logging is enabled and the request type id is not a known one
     */
    private Option<Send> handle(SelectionKey selectionKey, Receive receive) {
        short requestTypeId = receive.buffer().getShort();
        if (requestLogger().isTraceEnabled()) {
            // requestTypeName is evaluated first, so an unknown id is rejected before
            // the channel is touched.
            requestLogger().trace("Handling " + requestTypeName(requestTypeId) + " request from " + channelFor(selectionKey).socket().getRemoteSocketAddress());
        }
        Function1<Receive, Option<Send>> handler = handlerMapping().mo3468apply(BoxesRunTime.boxToShort(requestTypeId), receive);
        if (handler == null) {
            throw new InvalidRequestException("No handler found for request");
        }
        long startNs = time().nanoseconds();
        Option<Send> response = handler.mo913apply(receive);
        stats().recordRequest(requestTypeId, time().nanoseconds() - startNs);
        return response;
    }

    /**
     * Returns the word used in trace messages for a request type id.
     *
     * @param requestTypeId request type id read from the request buffer
     * @return one of "produce", "fetch", "multi-fetch", "multi-produce" or "offset"
     * @throws InvalidRequestException if the id matches none of the known request keys
     */
    private static String requestTypeName(short requestTypeId) {
        RequestKeys$ keys = RequestKeys$.MODULE$;
        if (requestTypeId == keys.Produce()) {
            return "produce";
        }
        if (requestTypeId == keys.Fetch()) {
            return "fetch";
        }
        if (requestTypeId == keys.MultiFetch()) {
            return "multi-fetch";
        }
        if (requestTypeId == keys.MultiProduce()) {
            return "multi-produce";
        }
        if (requestTypeId == keys.Offsets()) {
            return "offset";
        }
        throw new InvalidRequestException("No mapping found for handler id " + requestTypeId);
    }

    /**
     * Reads available bytes for the request in progress on this key, starting a new
     * request if none is attached. Once the request is complete it is handled, and any
     * response is attached to the key with interest switched to writes.
     *
     * @param selectionKey a key that is ready for reading
     */
    public void read(SelectionKey selectionKey) {
        SocketChannel channel = channelFor(selectionKey);
        Receive receive = (Receive) selectionKey.attachment();
        if (receive == null) {
            receive = new BoundedByteBufferReceive(maxRequestSize());
            selectionKey.attach(receive);
        }
        int bytesRead = receive.readFrom(channel);
        // NOTE(review): a negative count is still passed to recordBytesRead before the
        // connection is closed below; confirm the stats sink tolerates that.
        stats().recordBytesRead(bytesRead);
        if (logger().isTraceEnabled()) {
            logger().trace(bytesRead + " bytes read from " + channel.socket().getRemoteSocketAddress());
        }
        if (bytesRead < 0) {
            close(selectionKey);
        } else if (receive.complete()) {
            Option<Send> response = handle(selectionKey, receive);
            selectionKey.attach(null);
            // With no response the key keeps its current interest set.
            if (response.isDefined()) {
                selectionKey.attach(response.getOrElse(new Processor$$anonfun$read$1(this)));
                selectionKey.interestOps(SelectionKey.OP_WRITE);
            }
        } else {
            // Request only partially received: keep reading.
            selectionKey.interestOps(SelectionKey.OP_READ);
            selector().wakeup();
        }
    }

    /**
     * Writes as much of the attached response as the channel accepts. When the response
     * is complete the attachment is cleared and interest returns to reads; otherwise the
     * key stays interested in writes.
     *
     * @param selectionKey a key that is ready for writing and has a Send attached
     */
    public void write(SelectionKey selectionKey) {
        Send send = (Send) selectionKey.attachment();
        SocketChannel channel = channelFor(selectionKey);
        int bytesWritten = send.writeTo(channel);
        stats().recordBytesWritten(bytesWritten);
        if (logger().isTraceEnabled()) {
            logger().trace(bytesWritten + " bytes written to " + channel.socket().getRemoteSocketAddress());
        }
        if (send.complete()) {
            selectionKey.attach(null);
            selectionKey.interestOps(SelectionKey.OP_READ);
        } else {
            selectionKey.interestOps(SelectionKey.OP_WRITE);
            selector().wakeup();
        }
    }

    /** @return the key's channel, cast to SocketChannel */
    private SocketChannel channelFor(SelectionKey selectionKey) {
        return (SocketChannel) selectionKey.channel();
    }

    /**
     * @param function2 request-dispatch function, exposed as handlerMapping()
     * @param time clock used to time request handling
     * @param socketServerStats statistics recorder
     * @param i request size limit, exposed as maxRequestSize()
     */
    public Processor(Function2<Short, Receive, Function1<Receive, Option<Send>>> function2, Time time, SocketServerStats socketServerStats, int i) {
        this.handlerMapping = function2;
        this.time = time;
        this.stats = socketServerStats;
        this.maxRequestSize = i;
    }
}
