package net.kuujo.catalyst.transport;

import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import net.kuujo.catalyst.buffer.Buffer;
import net.kuujo.catalyst.util.Assert;
import net.kuujo.catalyst.util.Listener;
import net.kuujo.catalyst.util.Listeners;
import net.kuujo.catalyst.util.ReferenceCounted;
import net.kuujo.catalyst.util.concurrent.Context;
import net.kuujo.catalyst.util.concurrent.Futures;

/* loaded from: input_file:net/kuujo/catalyst/transport/LocalConnection.class */
public class LocalConnection implements Connection {
    private final UUID id;
    private final Context context;
    private final Set<LocalConnection> connections;
    private LocalConnection connection;
    private final Map<Class, HandlerHolder> handlers;
    private final Listeners<Throwable> exceptionListeners;
    private final Listeners<Connection> closeListeners;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:net/kuujo/catalyst/transport/LocalConnection$HandlerHolder.class */
    public static class HandlerHolder {
        private final MessageHandler handler;
        private final Context context;

        private HandlerHolder(MessageHandler messageHandler, Context context) {
            this.handler = messageHandler;
            this.context = context;
        }
    }

    public LocalConnection(UUID uuid, Context context) {
        this(uuid, context, null);
    }

    public LocalConnection(UUID uuid, Context context, Set<LocalConnection> set) {
        this.handlers = new ConcurrentHashMap();
        this.exceptionListeners = new Listeners<>();
        this.closeListeners = new Listeners<>();
        this.id = uuid;
        this.context = context;
        this.connections = set;
    }

    public LocalConnection connect(LocalConnection localConnection) {
        this.connection = localConnection;
        return this;
    }

    public UUID id() {
        return this.id;
    }

    private Context getContext() {
        Context currentContext = Context.currentContext();
        Assert.state(currentContext != null, "not on a Catalyst thread", new Object[0]);
        return currentContext;
    }

    public <T, U> CompletableFuture<U> send(T t) {
        Assert.notNull(t, "request");
        Context context = getContext();
        CompletableFuture<U> completableFuture = new CompletableFuture<>();
        this.connection.receive(context.serializer().writeObject(t).flip()).whenCompleteAsync((buffer, th) -> {
            if (buffer.readByte() == 1) {
                completableFuture.complete(context.serializer().readObject(buffer));
            } else {
                completableFuture.completeExceptionally((Throwable) context.serializer().readObject(buffer));
            }
            buffer.release();
        }, context.executor());
        if (t instanceof ReferenceCounted) {
            ((ReferenceCounted) t).release();
        }
        return completableFuture;
    }

    private CompletableFuture<Buffer> receive(Buffer buffer) {
        Context context = getContext();
        Object readObject = context.serializer().readObject(buffer);
        buffer.release();
        HandlerHolder handlerHolder = this.handlers.get(readObject.getClass());
        if (handlerHolder == null) {
            return Futures.exceptionalFuture(new TransportException("no handler registered"));
        }
        MessageHandler messageHandler = handlerHolder.handler;
        CompletableFuture<Buffer> completableFuture = new CompletableFuture<>();
        handlerHolder.context.executor().execute(() -> {
            messageHandler.handle(readObject).whenCompleteAsync((obj, th) -> {
                Buffer allocate = context.serializer().allocate();
                if (th == null) {
                    allocate.writeByte(1);
                    context.serializer().writeObject(obj, allocate);
                } else {
                    allocate.writeByte(0);
                    context.serializer().writeObject(th, allocate);
                }
                completableFuture.complete(allocate.flip());
                if (obj instanceof ReferenceCounted) {
                    ((ReferenceCounted) obj).release();
                }
            }, context.executor());
        });
        return completableFuture;
    }

    public <T, U> Connection handler(Class<T> cls, MessageHandler<T, U> messageHandler) {
        Assert.notNull(cls, "type");
        if (messageHandler != null) {
            this.handlers.put(cls, new HandlerHolder(messageHandler, getContext()));
        } else {
            this.handlers.remove(cls);
        }
        return this;
    }

    public Listener<Throwable> exceptionListener(Consumer<Throwable> consumer) {
        return this.exceptionListeners.add((Consumer) Assert.notNull(consumer, "listener"));
    }

    public Listener<Connection> closeListener(Consumer<Connection> consumer) {
        return this.closeListeners.add((Consumer) Assert.notNull(consumer, "listener"));
    }

    public CompletableFuture<Void> close() {
        doClose();
        this.connection.doClose();
        return getContext().execute(() -> {
            return null;
        });
    }

    private void doClose() {
        if (this.connections != null) {
            this.connections.remove(this);
        }
        Iterator it = this.closeListeners.iterator();
        while (it.hasNext()) {
            Consumer consumer = (Consumer) it.next();
            this.context.executor().execute(() -> {
                consumer.accept(this);
            });
        }
    }
}
