/*
 * Decompiled with CFR 0.152.
 */
package io.janusproject.kernel.services.zeromq;

import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Service;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import io.janusproject.services.contextspace.ContextSpaceService;
import io.janusproject.services.contextspace.SpaceRepositoryListener;
import io.janusproject.services.executor.ExecutorService;
import io.janusproject.services.kerneldiscovery.KernelDiscoveryService;
import io.janusproject.services.kerneldiscovery.KernelDiscoveryServiceListener;
import io.janusproject.services.logging.LogService;
import io.janusproject.services.network.AbstractNetworkingExecutionThreadService;
import io.janusproject.services.network.EventDispatch;
import io.janusproject.services.network.EventEnvelope;
import io.janusproject.services.network.EventSerializer;
import io.janusproject.services.network.NetworkService;
import io.janusproject.services.network.NetworkServiceListener;
import io.sarl.lang.core.Event;
import io.sarl.lang.core.Scope;
import io.sarl.lang.core.Space;
import io.sarl.lang.core.SpaceID;
import java.io.EOFException;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

@Singleton
public class ZeroMQNetworkService
extends AbstractNetworkingExecutionThreadService {
    private static final long POLL_TIMEOUT = 1000L;
    private final Listener serviceListener = new Listener();
    @Inject
    private LogService logger;
    @Inject
    private KernelDiscoveryService kernelService;
    @Inject
    private ContextSpaceService spaceService;
    @Inject
    private ExecutorService executorService;
    @Inject
    private EventSerializer serializer;
    private ZContext context;
    private ZMQ.Socket sendingSocket;
    private Map<URI, ZMQ.Socket> receptionSocketsPerRemoteKernel = new ConcurrentHashMap<URI, ZMQ.Socket>();
    private final Map<SpaceID, NetworkService.NetworkEventReceivingListener> messageRecvListeners = new TreeMap<SpaceID, NetworkService.NetworkEventReceivingListener>();
    private ZMQ.Poller poller;
    private URI uriCandidate;
    private URI validatedURI;
    private Map<SpaceID, BufferedConnection> bufferedConnections = new TreeMap<SpaceID, BufferedConnection>();
    private Map<SpaceID, BufferedSpace> bufferedSpaces = new TreeMap<SpaceID, BufferedSpace>();
    private final List<NetworkServiceListener> listeners = new ArrayList<NetworkServiceListener>();

    @Inject
    public ZeroMQNetworkService(@Named(value="network.pub.uri") URI uri) {
        assert (uri != null) : "Injected URI must be not null nor empty";
        this.uriCandidate = uri;
    }

    @Override
    public Collection<Class<? extends Service>> getServiceDependencies() {
        return Arrays.asList(LogService.class, ExecutorService.class);
    }

    @Override
    public Collection<Class<? extends Service>> getServiceWeakDependencies() {
        return Arrays.asList(KernelDiscoveryService.class);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public URI getURI() {
        ZeroMQNetworkService zeroMQNetworkService = this;
        synchronized (zeroMQNetworkService) {
            return this.validatedURI;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addNetworkServiceListener(NetworkServiceListener listener) {
        List<NetworkServiceListener> list = this.listeners;
        synchronized (list) {
            this.listeners.add(listener);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void removeNetworkServiceListener(NetworkServiceListener listener) {
        List<NetworkServiceListener> list = this.listeners;
        synchronized (list) {
            this.listeners.remove(listener);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void firePeerConnected(URI peerURI, SpaceID space) {
        NetworkServiceListener[] ilisteners;
        List<NetworkServiceListener> list = this.listeners;
        synchronized (list) {
            ilisteners = new NetworkServiceListener[this.listeners.size()];
            this.listeners.toArray(ilisteners);
        }
        NetworkServiceListener[] networkServiceListenerArray = ilisteners;
        int n = ilisteners.length;
        int n2 = 0;
        while (n2 < n) {
            NetworkServiceListener listener = networkServiceListenerArray[n2];
            listener.peerConnected(peerURI, space);
            ++n2;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void firePeerDisconnected(URI peerURI, SpaceID space) {
        NetworkServiceListener[] ilisteners;
        List<NetworkServiceListener> list = this.listeners;
        synchronized (list) {
            ilisteners = new NetworkServiceListener[this.listeners.size()];
            this.listeners.toArray(ilisteners);
        }
        NetworkServiceListener[] networkServiceListenerArray = ilisteners;
        int n = ilisteners.length;
        int n2 = 0;
        while (n2 < n) {
            NetworkServiceListener listener = networkServiceListenerArray[n2];
            listener.peerDisconnected(peerURI, space);
            ++n2;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void firePeerDisconnected(URI peerURI) {
        NetworkServiceListener[] ilisteners;
        List<NetworkServiceListener> list = this.listeners;
        synchronized (list) {
            ilisteners = new NetworkServiceListener[this.listeners.size()];
            this.listeners.toArray(ilisteners);
        }
        NetworkServiceListener[] networkServiceListenerArray = ilisteners;
        int n = ilisteners.length;
        int n2 = 0;
        while (n2 < n) {
            NetworkServiceListener listener = networkServiceListenerArray[n2];
            listener.peerDisconnected(peerURI);
            ++n2;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void firePeerDiscovered(URI peerURI) {
        NetworkServiceListener[] ilisteners;
        List<NetworkServiceListener> list = this.listeners;
        synchronized (list) {
            ilisteners = new NetworkServiceListener[this.listeners.size()];
            this.listeners.toArray(ilisteners);
        }
        NetworkServiceListener[] networkServiceListenerArray = ilisteners;
        int n = ilisteners.length;
        int n2 = 0;
        while (n2 < n) {
            NetworkServiceListener listener = networkServiceListenerArray[n2];
            listener.peerDiscovered(peerURI);
            ++n2;
        }
    }

    private void send(EventEnvelope envelope) {
        this.sendingSocket.sendMore(ZeroMQNetworkService.buildFilterableHeader(envelope.getContextId()));
        this.sendingSocket.sendMore(Ints.toByteArray(envelope.getSpaceId().length));
        this.sendingSocket.sendMore(envelope.getSpaceId());
        this.sendingSocket.sendMore(Ints.toByteArray(envelope.getScope().length));
        this.sendingSocket.sendMore(envelope.getScope());
        this.sendingSocket.sendMore(Ints.toByteArray(envelope.getCustomHeaders().length));
        this.sendingSocket.sendMore(envelope.getCustomHeaders());
        this.sendingSocket.sendMore(Ints.toByteArray(envelope.getBody().length));
        this.sendingSocket.send(envelope.getBody());
    }

    private static byte[] buildFilterableHeader(byte[] contextID) {
        byte[] header = new byte[4 + contextID.length];
        byte[] length = Ints.toByteArray(contextID.length);
        System.arraycopy(length, 0, header, 0, length.length);
        System.arraycopy(contextID, 0, header, length.length, contextID.length);
        return header;
    }

    @Override
    public synchronized void publish(Scope<?> scope, Event data) throws Exception {
        if (this.validatedURI == null) {
            this.logger.debug("DISCARDED_MESSAGE", data.getSource().getSpaceId(), scope, data);
        } else if (!this.receptionSocketsPerRemoteKernel.isEmpty()) {
            SpaceID spaceID = data.getSource().getSpaceId();
            EventEnvelope env = this.serializer.serialize(new EventDispatch(spaceID, data, scope));
            this.send(env);
            this.logger.debug("PUBLISH_EVENT", spaceID, data);
        }
    }

    private static byte[] readBuffer(ByteBuffer buffer, int size) throws IOException {
        if (buffer.remaining() >= size) {
            byte[] result = new byte[size];
            buffer.get(result);
            return result;
        }
        throw new EOFException();
    }

    private static byte[] readBlock(ByteBuffer buffer) throws IOException {
        int length = Ints.fromByteArray(ZeroMQNetworkService.readBuffer(buffer, 4));
        return ZeroMQNetworkService.readBuffer(buffer, length);
    }

    private static EventEnvelope extractEnvelope(ZMQ.Socket socket) throws IOException {
        byte[] data = socket.recv(1);
        int oldSize = 0;
        while (socket.hasReceiveMore()) {
            byte[] cdata = socket.recv(1);
            oldSize = data.length;
            data = Arrays.copyOf(data, data.length + cdata.length);
            System.arraycopy(cdata, 0, data, oldSize, cdata.length);
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        byte[] contextId = ZeroMQNetworkService.readBlock(buffer);
        assert (contextId != null && contextId.length > 0);
        byte[] spaceId = ZeroMQNetworkService.readBlock(buffer);
        assert (spaceId != null && spaceId.length > 0);
        byte[] scope = ZeroMQNetworkService.readBlock(buffer);
        assert (scope != null && scope.length > 0);
        byte[] headers = ZeroMQNetworkService.readBlock(buffer);
        assert (headers != null && headers.length > 0);
        byte[] body = ZeroMQNetworkService.readBlock(buffer);
        assert (body != null && body.length > 0);
        return new EventEnvelope(contextId, spaceId, scope, headers, body);
    }

    @Override
    public synchronized void connectToRemoteSpaces(URI peerUri, SpaceID space, NetworkService.NetworkEventReceivingListener listener) throws Exception {
        if (this.validatedURI == null) {
            assert (this.bufferedConnections != null);
            this.bufferedConnections.put(space, new BufferedConnection(peerUri, space, listener));
        } else {
            ZMQ.Socket receptionSocket = this.receptionSocketsPerRemoteKernel.get(peerUri);
            if (receptionSocket == null) {
                this.logger.debug("PEER_CONNECTION", peerUri, space);
                receptionSocket = this.context.createSocket(2);
                assert (receptionSocket != null);
                this.receptionSocketsPerRemoteKernel.put(peerUri, receptionSocket);
                receptionSocket.connect(peerUri.toString());
                this.poller.register(receptionSocket, 1);
                this.logger.debug("PEER_CONNECTED", peerUri);
            }
            assert (receptionSocket != null);
            NetworkService.NetworkEventReceivingListener old = this.messageRecvListeners.get(space);
            if (old == null) {
                assert (listener != null);
                this.messageRecvListeners.put(space, listener);
            }
            byte[] header = ZeroMQNetworkService.buildFilterableHeader(this.serializer.serializeContextID(space.getContextID()));
            receptionSocket.subscribe(header);
            this.logger.debug("PEER_SUBSCRIPTION", peerUri, space);
        }
    }

    @Override
    public synchronized void disconnectFromRemoteSpace(URI peer, SpaceID space) throws Exception {
        ZMQ.Socket socket = this.receptionSocketsPerRemoteKernel.get(peer);
        if (socket != null) {
            this.logger.debug("PEER_UNSUBSCRIPTION ", peer, space);
            byte[] header = ZeroMQNetworkService.buildFilterableHeader(this.serializer.serializeContextID(space.getContextID()));
            socket.unsubscribe(header);
        }
    }

    @Override
    public synchronized void disconnectPeer(URI peer) throws Exception {
        ZMQ.Socket socket = this.receptionSocketsPerRemoteKernel.remove(peer);
        if (socket != null) {
            this.logger.debug("PEER_DISCONNECTION", peer);
            this.poller.unregister(socket);
            socket.close();
            this.logger.debug("PEER_DISCONNECTED", peer);
        }
    }

    protected synchronized void receive(EventEnvelope env) throws Exception {
        this.logger.debug("ENVELOPE_RECEIVED", this.validatedURI, env);
        EventDispatch dispatch = this.serializer.deserialize(env);
        this.logger.debug("DISPATCH_RECEIVED", dispatch);
        SpaceID spaceID = dispatch.getSpaceID();
        NetworkService.NetworkEventReceivingListener space = this.messageRecvListeners.get(spaceID);
        if (space != null) {
            this.executorService.submit(new AsyncRunner(space, spaceID, dispatch.getScope(), dispatch.getEvent()));
        } else {
            this.logger.debug("UNKNOWN_SPACE", spaceID, dispatch.getEvent());
        }
    }

    @Override
    protected void run() throws Exception {
        while (this.isRunning()) {
            try {
                int signaled;
                if (this.poller.getSize() > 0 && (signaled = this.poller.poll(1000L)) > 0) {
                    int i = 0;
                    while (i < this.poller.getSize()) {
                        if (this.poller.pollin(i)) {
                            this.logger.debug("POLLING", new Integer(i));
                            EventEnvelope ev = ZeroMQNetworkService.extractEnvelope(this.poller.getSocket(i));
                            assert (ev != null);
                            try {
                                this.receive(ev);
                            }
                            catch (Throwable e) {
                                this.logger.log(Level.FINE, ZeroMQNetworkService.class, "CANNOT_RECEIVE_EVENT", e);
                            }
                        } else if (this.poller.pollerr(i)) {
                            final int poolerIdx = i;
                            this.logger.warning("POLLING_ERROR", new LogService.LogParam(){

                                @Override
                                public String toString() {
                                    return ZeroMQNetworkService.this.poller.getSocket(poolerIdx).toString();
                                }
                            });
                        }
                        ++i;
                    }
                }
            }
            catch (Throwable e) {
                this.logger.log(Level.SEVERE, ZeroMQNetworkService.class, "UNEXPECTED_EXCEPTION", e);
            }
            Thread.yield();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void startUp() throws Exception {
        Map<SpaceID, BufferedConnection> connections;
        ZeroMQNetworkService zeroMQNetworkService = this;
        synchronized (zeroMQNetworkService) {
            super.startUp();
            this.context = new ZContext();
            this.sendingSocket = this.context.createSocket(1);
            String strUri = this.uriCandidate.toString();
            if (this.uriCandidate.getPort() == -1) {
                int port = this.sendingSocket.bindToRandomPort(strUri);
                this.validatedURI = port != -1 && this.uriCandidate.getPort() == -1 ? new URI(this.uriCandidate.getScheme(), this.uriCandidate.getUserInfo(), this.uriCandidate.getHost(), port, this.uriCandidate.getPath(), this.uriCandidate.getQuery(), this.uriCandidate.getFragment()) : this.uriCandidate;
            } else {
                this.sendingSocket.bind(strUri);
                this.validatedURI = this.uriCandidate;
            }
            System.setProperty("network.pub.uri", this.validatedURI.toString());
            this.logger.debug("ZEROMQ_BINDED", this.validatedURI);
            this.uriCandidate = null;
            connections = this.bufferedConnections;
            this.bufferedConnections = null;
            this.poller = new ZMQ.Poller(1);
            this.kernelService.addKernelDiscoveryServiceListener(this.serviceListener);
            this.spaceService.addSpaceRepositoryListener(this.serviceListener);
        }
        for (BufferedConnection t : connections.values()) {
            this.connectToRemoteSpaces(t.getPeerURI(), t.getSpaceID(), t.getListener());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void shutDown() throws Exception {
        ZeroMQNetworkService zeroMQNetworkService = this;
        synchronized (zeroMQNetworkService) {
            this.kernelService.removeKernelDiscoveryServiceListener(this.serviceListener);
            this.spaceService.removeSpaceRepositoryListener(this.serviceListener);
            this.context.destroy();
        }
        this.logger.fineInfo("ZEROMQ_SHUTDOWN", new Object[0]);
    }

    private class AsyncRunner
    implements Runnable {
        private final NetworkService.NetworkEventReceivingListener space;
        private final SpaceID spaceID;
        private final Scope<?> scope;
        private final Event event;

        AsyncRunner(NetworkService.NetworkEventReceivingListener space, SpaceID spaceID, Scope<?> scope, Event event) {
            this.space = space;
            this.spaceID = spaceID;
            this.scope = scope;
            this.event = event;
        }

        @Override
        public void run() {
            this.space.eventReceived(this.spaceID, this.scope, this.event);
        }
    }

    private static class BufferedConnection {
        private final URI peerURI;
        private final SpaceID spaceID;
        private final NetworkService.NetworkEventReceivingListener listener;

        BufferedConnection(URI peerURI, SpaceID spaceID, NetworkService.NetworkEventReceivingListener listener) {
            this.peerURI = peerURI;
            this.spaceID = spaceID;
            this.listener = listener;
        }

        public URI getPeerURI() {
            return this.peerURI;
        }

        public SpaceID getSpaceID() {
            return this.spaceID;
        }

        public NetworkService.NetworkEventReceivingListener getListener() {
            return this.listener;
        }
    }

    private static class BufferedSpace {
        private final SpaceID spaceID;
        private final NetworkService.NetworkEventReceivingListener listener;

        BufferedSpace(SpaceID spaceID, NetworkService.NetworkEventReceivingListener listener) {
            this.spaceID = spaceID;
            this.listener = listener;
        }

        public SpaceID getSpaceID() {
            return this.spaceID;
        }

        public NetworkService.NetworkEventReceivingListener getListener() {
            return this.listener;
        }
    }

    private class Listener
    implements SpaceRepositoryListener,
    KernelDiscoveryServiceListener {
        Listener() {
        }

        private void magicConnect(URI peer, Collection<SpaceID> spaceIDs, Collection<BufferedSpace> ibufferedSpaces, Space space) {
            if (space != null) {
                try {
                    ZeroMQNetworkService.this.connectToRemoteSpaces(peer, space.getSpaceID(), (NetworkService.NetworkEventReceivingListener)((Object)space));
                }
                catch (Exception e) {
                    ZeroMQNetworkService.this.logger.error(ZeroMQNetworkService.class, "UNEXPECTED_EXCEPTION", e);
                }
            }
            for (SpaceID sid : spaceIDs) {
                try {
                    ZeroMQNetworkService.this.connectToRemoteSpaces(peer, sid, null);
                }
                catch (Exception e) {
                    ZeroMQNetworkService.this.logger.error(ZeroMQNetworkService.class, "UNEXPECTED_EXCEPTION", e);
                }
            }
            for (BufferedSpace sp : ibufferedSpaces) {
                try {
                    ZeroMQNetworkService.this.connectToRemoteSpaces(peer, sp.getSpaceID(), sp.getListener());
                }
                catch (Exception e) {
                    ZeroMQNetworkService.this.logger.error(ZeroMQNetworkService.class, "UNEXPECTED_EXCEPTION", e);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void spaceCreated(Space space, boolean isLocalCreation) {
            ZeroMQNetworkService zeroMQNetworkService = ZeroMQNetworkService.this;
            synchronized (zeroMQNetworkService) {
                URI localUri = ZeroMQNetworkService.this.getURI();
                try {
                    boolean isUsed = false;
                    ArrayList<SpaceID> spaceIDs = new ArrayList<SpaceID>(ZeroMQNetworkService.this.messageRecvListeners.keySet());
                    ArrayList<BufferedSpace> spaces = new ArrayList<BufferedSpace>(ZeroMQNetworkService.this.bufferedSpaces.values());
                    for (URI peer : ZeroMQNetworkService.this.kernelService.getKernels()) {
                        if (peer.equals(localUri)) continue;
                        if (space instanceof NetworkService.NetworkEventReceivingListener) {
                            this.magicConnect(peer, spaceIDs, spaces, space);
                            isUsed = true;
                            continue;
                        }
                        ZeroMQNetworkService.this.logger.error(ZeroMQNetworkService.class, "NOT_DISTRIBUTABLE_SPACE", space);
                    }
                    if (!isUsed) {
                        if (space instanceof NetworkService.NetworkEventReceivingListener) {
                            ZeroMQNetworkService.this.bufferedSpaces.put(space.getSpaceID(), new BufferedSpace(space.getSpaceID(), (NetworkService.NetworkEventReceivingListener)((Object)space)));
                        } else {
                            ZeroMQNetworkService.this.logger.error(ZeroMQNetworkService.class, "NOT_DISTRIBUTABLE_SPACE", space);
                        }
                    } else {
                        ZeroMQNetworkService.this.bufferedSpaces.clear();
                    }
                }
                catch (Exception e) {
                    ZeroMQNetworkService.this.logger.error(ZeroMQNetworkService.class, "UNEXPECTED_EXCEPTION", e);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void spaceDestroyed(Space space, boolean isLocalDestruction) {
            ZeroMQNetworkService zeroMQNetworkService = ZeroMQNetworkService.this;
            synchronized (zeroMQNetworkService) {
                URI localUri = ZeroMQNetworkService.this.getURI();
                try {
                    for (URI peer : ZeroMQNetworkService.this.kernelService.getKernels()) {
                        if (peer.equals(localUri)) continue;
                        ZeroMQNetworkService.this.disconnectFromRemoteSpace(peer, space.getSpaceID());
                    }
                    ZeroMQNetworkService.this.messageRecvListeners.remove(space.getSpaceID());
                    if (ZeroMQNetworkService.this.bufferedConnections != null) {
                        ZeroMQNetworkService.this.bufferedConnections.remove(space.getSpaceID());
                    }
                    ZeroMQNetworkService.this.bufferedSpaces.remove(space.getSpaceID());
                }
                catch (Exception e) {
                    ZeroMQNetworkService.this.logger.error(ZeroMQNetworkService.class, "UNEXPECTED_EXCEPTION", e);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void kernelDiscovered(URI peerURI) {
            ZeroMQNetworkService zeroMQNetworkService = ZeroMQNetworkService.this;
            synchronized (zeroMQNetworkService) {
                URI localUri = ZeroMQNetworkService.this.getURI();
                ArrayList<SpaceID> spaceIDs = new ArrayList<SpaceID>(ZeroMQNetworkService.this.messageRecvListeners.keySet());
                ArrayList<BufferedSpace> spaces = new ArrayList<BufferedSpace>(ZeroMQNetworkService.this.bufferedSpaces.values());
                if (!spaceIDs.isEmpty() || !spaces.isEmpty()) {
                    boolean cleanBuffers = false;
                    for (URI peer : ZeroMQNetworkService.this.kernelService.getKernels()) {
                        if (peer.equals(localUri)) continue;
                        this.magicConnect(peer, spaceIDs, spaces, null);
                        cleanBuffers = true;
                    }
                    if (cleanBuffers) {
                        ZeroMQNetworkService.this.bufferedSpaces.clear();
                    }
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void kernelDisconnected(URI peerURI) {
            ZeroMQNetworkService zeroMQNetworkService = ZeroMQNetworkService.this;
            synchronized (zeroMQNetworkService) {
                try {
                    URI localUri = ZeroMQNetworkService.this.getURI();
                    if (!peerURI.equals(localUri)) {
                        ZeroMQNetworkService.this.disconnectPeer(peerURI);
                    }
                }
                catch (Exception e) {
                    ZeroMQNetworkService.this.logger.error(ZeroMQNetworkService.class, "UNEXPECTED_EXCEPTION", e);
                }
            }
        }
    }
}

