package org.jppf.client.balancer;

import java.io.IOException;
import java.io.NotSerializableException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import org.jppf.JPPFException;
import org.jppf.client.JPPFClientConnection;
import org.jppf.client.JPPFClientConnectionStatus;
import org.jppf.client.JPPFJob;
import org.jppf.client.event.ClientConnectionStatusEvent;
import org.jppf.client.event.ClientConnectionStatusListener;
import org.jppf.load.balancer.BundlerHelper;
import org.jppf.node.protocol.BundleParameter;
import org.jppf.node.protocol.Task;
import org.jppf.node.protocol.TaskBundle;
import org.jppf.serialization.ObjectSerializer;
import org.jppf.utils.ExceptionUtils;
import org.jppf.utils.LoggingUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/jppf-client-6.1-alpha.jar:org/jppf/client/balancer/ChannelWrapperRemoteAsync.class */
public class ChannelWrapperRemoteAsync extends AbstractChannelWrapperRemote {
    /** Logger for this class. */
    private static final Logger log = LoggerFactory.getLogger(ChannelWrapperRemoteAsync.class);
    /** Whether debug-level logging is enabled; evaluated once when the class is initialized. */
    private static final boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
    /** Bundles passed to {@code submit()} that the sender task has not yet taken. */
    private final BlockingQueue<ClientTaskBundle> bundleQueue;
    /** Responses still awaiting results, keyed by the id of the bundle that was sent. */
    private final Map<Long, RemoteResponse> responseMap;
    /** Set once the sender and receiver tasks have been started; reset by {@code close()}. */
    private boolean initDone;
    /** Futures of the sender and receiver tasks, cancelled by {@code close()}. */
    private final List<Future<?>> futures;
    /** Monitor that {@code awaitStatus()} waits on and that status changes notify. */
    private final Object statusLock;
    /** Connection status listener registered in the constructor and removed in {@code close()}. */
    private final StatusListener listener;

    /* loaded from: input_file:WEB-INF/lib/jppf-client-6.1-alpha.jar:org/jppf/client/balancer/ChannelWrapperRemoteAsync$RemoteReceiver.class */
    /**
     * Task that loops for as long as the channel is open, reading result headers and
     * tasks from the channel and routing them to the {@link RemoteResponse} registered
     * under the bundle id carried by the header.
     */
    private class RemoteReceiver implements Runnable {
        /** Logger named after this nested class. */
        private final Logger thisLog;
        /** Whether debug logging is enabled for {@link #thisLog}. */
        private final boolean thisDebugEnabled;

        private RemoteReceiver() {
            this.thisLog = LoggerFactory.getLogger(RemoteReceiver.class);
            this.thisDebugEnabled = LoggingUtils.isDebugEnabled(this.thisLog);
        }

        /**
         * Receives results until the channel is closed. Any throwable raised while
         * handling one response is passed to {@code handleThrowable()}; the loop then
         * re-checks the channel state.
         */
        @Override // java.lang.Runnable
        public void run() {
            while (!ChannelWrapperRemoteAsync.this.channel.isClosed()) {
                ClientTaskBundle clientTaskBundle = null;
                try {
                    ChannelWrapperRemoteAsync.this.awaitStatus();
                    TaskBundle receiveHeader = ChannelWrapperRemoteAsync.this.channel.receiveHeader(null, null);
                    if (this.thisDebugEnabled) {
                        this.thisLog.debug("received bundle {}", receiveHeader);
                    }
                    long bundleId = ((Long) receiveHeader.getParameter(BundleParameter.CLIENT_BUNDLE_ID)).longValue();
                    // The entry is removed while being processed and put back below if results are still missing.
                    RemoteResponse remoteResponse = ChannelWrapperRemoteAsync.this.responseMap.remove(Long.valueOf(bundleId));
                    if (remoteResponse == null) {
                        this.thisLog.debug("response object no longer in queue for bundleId = {}", Long.valueOf(bundleId));
                        continue;
                    }
                    boolean complete = false;
                    synchronized (remoteResponse) {
                        clientTaskBundle = remoteResponse.clientBundle;
                        List receiveTasks = ChannelWrapperRemoteAsync.this.channel.receiveTasks(receiveHeader, remoteResponse.ser, remoteResponse.cl);
                        if (this.thisDebugEnabled) {
                            this.thisLog.debug("received {} tasks for {}", Integer.valueOf(receiveTasks.size()), clientTaskBundle);
                        }
                        remoteResponse.handleResults(receiveTasks);
                        if (remoteResponse.currentCount < remoteResponse.taskCount) {
                            // More results are expected for this bundle: keep tracking it.
                            ChannelWrapperRemoteAsync.this.responseMap.put(Long.valueOf(bundleId), remoteResponse);
                        } else {
                            complete = true;
                            BundlerHelper.updateBundler(ChannelWrapperRemoteAsync.this.bundler, receiveTasks.size(), remoteResponse.elapsed);
                            ChannelWrapperRemoteAsync.this.getLoadBalancerPersistenceManager().storeBundler(ChannelWrapperRemoteAsync.this.channelID, ChannelWrapperRemoteAsync.this.bundler, ChannelWrapperRemoteAsync.this.bundlerAlgorithm);
                        }
                    }
                    // Completion is signalled after the response monitor has been released.
                    if (complete) {
                        ChannelWrapperRemoteAsync.this.handleBundleComplete(clientTaskBundle, null);
                    }
                } catch (Throwable th) {
                    ChannelWrapperRemoteAsync.this.handleThrowable(clientTaskBundle, th, false);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/jppf-client-6.1-alpha.jar:org/jppf/client/balancer/ChannelWrapperRemoteAsync$RemoteResponse.class */
    /**
     * Bookkeeping for one bundle sent over the channel: the bundle itself, the class
     * loader and serializer used for it, and how many of its tasks have been accounted
     * for so far.
     */
    public static class RemoteResponse {
        /** The bundle this response belongs to. */
        final ClientTaskBundle clientBundle;
        /** Size of the bundle's task list when this object was created. */
        final int taskCount;
        /** Class loader passed to the channel when receiving tasks for this bundle. */
        final ClassLoader cl;
        /** Serializer passed to the channel when receiving tasks for this bundle. */
        final ObjectSerializer ser;
        /** {@code System.nanoTime()}-based start timestamp supplied by the creator. */
        final long start;
        /** Nanoseconds between {@link #start} and the latest call to {@link #handleResults}. */
        long elapsed;
        /** Number of tasks accounted for so far. */
        int currentCount;

        /**
         * @param clientTaskBundle the bundle being tracked; its task list size becomes {@link #taskCount}.
         * @param i the initial value of {@link #currentCount}.
         * @param classLoader the class loader associated with the bundle, may be {@code null}.
         * @param objectSerializer the serializer associated with the bundle.
         * @param j the start timestamp, in nanoseconds.
         */
        public RemoteResponse(ClientTaskBundle clientTaskBundle, int i, ClassLoader classLoader, ObjectSerializer objectSerializer, long j) {
            this.clientBundle = clientTaskBundle;
            this.cl = classLoader;
            this.ser = objectSerializer;
            this.start = j;
            this.currentCount = i;
            this.taskCount = clientTaskBundle.getTasksL().size();
        }

        /**
         * Records a batch of received tasks: refreshes the elapsed time, adds the batch
         * size to the running count and forwards the tasks to the bundle.
         *
         * @param list the tasks that were received.
         */
        void handleResults(List<Task<?>> list) {
            final long now = System.nanoTime();
            this.elapsed = now - this.start;
            final int received = list.size();
            this.currentCount = this.currentCount + received;
            if (ChannelWrapperRemoteAsync.debugEnabled) {
                String positionInfo = "";
                if (received > 0) {
                    positionInfo = ", first position=" + list.get(0).getPosition();
                }
                ChannelWrapperRemoteAsync.log.debug("received {} tasks from server{}", Integer.valueOf(received), positionInfo);
            }
            this.clientBundle.resultsReceived(list);
        }
    }

    /* loaded from: input_file:WEB-INF/lib/jppf-client-6.1-alpha.jar:org/jppf/client/balancer/ChannelWrapperRemoteAsync$RemoteSender.class */
    /**
     * Task that loops for as long as the channel is open, taking bundles from the
     * submission queue and sending them over the channel.
     */
    private class RemoteSender implements Runnable {
        /** Logger named after this nested class. */
        private Logger thisLog;
        /** Whether debug logging is enabled for {@link #thisLog}. */
        private boolean thisDebugEnabled;

        private RemoteSender() {
            this.thisLog = LoggerFactory.getLogger((Class<?>) RemoteSender.class);
            this.thisDebugEnabled = LoggingUtils.isDebugEnabled(this.thisLog);
        }

        /**
         * Sends queued bundles until the channel is closed. Any throwable raised while
         * processing a bundle is passed to {@code handleThrowable()}.
         */
        @Override // java.lang.Runnable
        public void run() {
            while (!channel.isClosed()) {
                ClientTaskBundle current = null;
                try {
                    awaitStatus();
                    current = bundleQueue.take();
                    List<Task<?>> tasks = current.getTasksL();
                    JPPFJob job = AbstractChannelWrapperRemote.createNewJob(current, tasks);
                    if (thisDebugEnabled) {
                        thisLog.debug("{} executing {} tasks of job {} with bundleId = {}", ChannelWrapperRemoteAsync.this, Integer.valueOf(tasks.size()), job, Long.valueOf(current.getBundleId()));
                    }
                    Collection<ClassLoader> loaders = registerClassLoaders(job);
                    TaskBundle header = AbstractChannelWrapperRemote.createBundle(job, current.getBundleId());
                    header.setUuid(uuid);
                    header.setInitialTaskCount(current.getClientJob().initialTaskCount);
                    // Only the first registered class loader, if any, is used for this bundle.
                    ClassLoader loader = null;
                    if (!loaders.isEmpty()) {
                        loader = loaders.iterator().next();
                    }
                    ObjectSerializer serializer = channel.makeHelper(loader).getSerializer();
                    RemoteResponse response = new RemoteResponse(current, 0, loader, serializer, System.nanoTime());
                    synchronized (response) {
                        // Registered before the send; presumably so the receiver can already find it when results arrive.
                        if (response.currentCount < response.taskCount) {
                            responseMap.put(Long.valueOf(current.getBundleId()), response);
                        }
                        if (thisDebugEnabled) {
                            thisLog.debug("{} sending {}", ChannelWrapperRemoteAsync.this, current);
                        }
                        List notSent = channel.sendTasks(serializer, loader, header, job);
                        current.jobDispatched(ChannelWrapperRemoteAsync.this);
                        if (!notSent.isEmpty()) {
                            if (thisDebugEnabled) {
                                thisLog.debug("got {} non-serializable tasks for {}", Integer.valueOf(notSent.size()), current);
                            }
                            // Tasks returned by the send are counted as already received.
                            response.currentCount = notSent.size();
                            current.resultsReceived((List<Task<?>>) notSent);
                        }
                        if (response.currentCount >= response.taskCount) {
                            handleBundleComplete(current, null);
                        }
                    }
                } catch (Throwable th) {
                    handleThrowable(current, th, true);
                }
            }
        }
    }

    /* loaded from: input_file:WEB-INF/lib/jppf-client-6.1-alpha.jar:org/jppf/client/balancer/ChannelWrapperRemoteAsync$StatusListener.class */
    /**
     * Connection status listener that wakes up every thread waiting on
     * {@code statusLock} each time a status change is reported.
     */
    private class StatusListener implements ClientConnectionStatusListener {
        private StatusListener() {
        }

        /**
         * Notifies all threads waiting on the enclosing wrapper's status lock.
         * The event itself is not inspected.
         *
         * @param clientConnectionStatusEvent the status change event (unused).
         */
        @Override // org.jppf.client.event.ClientConnectionStatusListener
        public void statusChanged(ClientConnectionStatusEvent clientConnectionStatusEvent) {
            synchronized (ChannelWrapperRemoteAsync.this.statusLock) {
                ChannelWrapperRemoteAsync.this.statusLock.notifyAll();
            }
        }
    }

    public ChannelWrapperRemoteAsync(JPPFClientConnection jPPFClientConnection) {
        super(jPPFClientConnection);
        this.bundleQueue = new LinkedBlockingQueue();
        this.responseMap = new ConcurrentHashMap();
        this.futures = new ArrayList();
        this.statusLock = new Object();
        this.listener = new StatusListener();
        jPPFClientConnection.addClientConnectionStatusListener(this.listener);
    }

    /**
     * Delegates to the superclass and then, the first time it is called (or the first
     * time after {@code close()}), submits the sender and receiver tasks to the
     * client's executor and keeps their futures.
     */
    @Override // org.jppf.client.balancer.AbstractChannelWrapperRemote, org.jppf.client.balancer.ChannelWrapper
    public void initChannelID() {
        super.initChannelID();
        if (!this.initDone) {
            this.initDone = true;
            final ThreadPoolExecutor pool = this.channel.getClient().getExecutor();
            final Future<?> senderFuture = pool.submit(new RemoteSender());
            this.futures.add(senderFuture);
            final Future<?> receiverFuture = pool.submit(new RemoteReceiver());
            this.futures.add(receiverFuture);
        }
    }

    /**
     * Queues a bundle for the sender task. Does nothing when the channel is closed.
     * Otherwise increments the job counter, switches the status to {@code EXECUTING}
     * once the number of current jobs reaches the maximum, and enqueues the bundle.
     *
     * @param clientTaskBundle the bundle to send.
     * @return always {@code null}.
     */
    @Override // org.jppf.execute.ExecutorChannel
    public Future<?> submit(ClientTaskBundle clientTaskBundle) {
        if (!this.channel.isClosed()) {
            this.jobCount.incrementAndGet();
            if (debugEnabled) {
                log.debug("submitting {} to {}", clientTaskBundle, this);
            }
            final boolean atCapacity = getCurrentNbJobs() >= getMaxJobs();
            if (atCapacity) {
                setStatus(JPPFClientConnectionStatus.EXECUTING);
            }
            this.bundleQueue.offer(clientTaskBundle);
            if (debugEnabled) {
                log.debug("submitted {} to {}", clientTaskBundle, this);
            }
        }
        return null;
    }

    /**
     * @return always {@code false}.
     */
    @Override // org.jppf.client.balancer.AbstractChannelWrapperRemote, org.jppf.execute.ExecutorChannel
    public boolean isLocal() {
        return false;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Handles a throwable raised by the sender or receiver task.
     * <p>
     * When the channel is closed and not resetting, nothing is done. Otherwise, unless the
     * throwable is a {@link NotSerializableException}, the given bundle, every bundle still
     * in the queue and every bundle still tracked in the response map are resubmitted, the
     * job counter is reset to zero and {@code reconnect()} is called. For a
     * {@code NotSerializableException} the throwable is only reported to the given bundle.
     * <p>
     * NOTE(review): when the sender fails after registering its response, the same bundle is
     * both passed in as {@code clientTaskBundle} and present in the response map, so it
     * appears to go through {@code resubmitBundle()} twice. Confirm whether that is intended.
     *
     * @param clientTaskBundle the bundle being processed when the throwable was raised, may be {@code null}.
     * @param th the throwable to handle.
     * @param z {@code true} when called by the sender, {@code false} by the receiver; not used by this method.
     * @return {@code null} when the channel is closed and not resetting; otherwise {@code th}
     *         itself if it is an {@code Exception}, a {@code JPPFException} wrapping it if it
     *         is not, or {@code null} if {@code th} is {@code null}.
     */
    public Exception handleThrowable(ClientTaskBundle clientTaskBundle, Throwable th, boolean z) {
        if (debugEnabled) {
            log.debug("handling throwable for {}:\nchannel = {}", clientTaskBundle, this, th);
        }
        final boolean isClosed = this.channel.isClosed();
        if (debugEnabled) {
            log.debug("channelClosed={}, resetting={}", Boolean.valueOf(isClosed), Boolean.valueOf(this.resetting));
        }
        if (isClosed && !this.resetting) {
            return null;
        }
        if (!isClosed) {
            String str = clientTaskBundle == null ? "" : " while handling job " + clientTaskBundle;
            if (debugEnabled) {
                log.debug("Throwable was raised{} on channel {}\n{}", str, this, ExceptionUtils.getStackTrace(th));
            } else {
                log.warn("Throwable was raised{} on channel {} : {}", str, this, ExceptionUtils.getMessage(th));
            }
        }
        final Exception jPPFException = th == null ? null : th instanceof Exception ? (Exception) th : new JPPFException(th);
        try {
            if (!(th instanceof NotSerializableException)) {
                if (clientTaskBundle != null) {
                    if (debugEnabled) {
                        log.debug("resubmitting {}", clientTaskBundle);
                    }
                    resubmitBundle(clientTaskBundle, jPPFException);
                }
                if (debugEnabled) {
                    log.debug("{} resubmitting all queued jobs", this);
                }
                // The counter is only a sizing hint and may be negative, since handleBundleComplete()
                // decrements it unconditionally; HashSet rejects a negative initial capacity.
                final int sizeHint = Math.max(0, this.jobCount.get());
                final Collection<ClientTaskBundle> queued = new HashSet<>(sizeHint);
                final Collection<Long> queuedIds = new HashSet<>(sizeHint);
                this.bundleQueue.drainTo(queued);
                for (ClientTaskBundle queuedBundle : queued) {
                    queuedIds.add(Long.valueOf(queuedBundle.getBundleId()));
                    resubmitBundle(queuedBundle, jPPFException);
                }
                this.jobCount.set(0);
                // Work on a snapshot so the shared map can be cleared right away.
                final Map<Long, RemoteResponse> pending = new ConcurrentHashMap<>(this.responseMap);
                this.responseMap.clear();
                for (Map.Entry<Long, RemoteResponse> entry : pending.entrySet()) {
                    // Bundles that were still queued have already been resubmitted above.
                    if (!queuedIds.contains(entry.getKey())) {
                        resubmitBundle(entry.getValue().clientBundle, jPPFException);
                    }
                }
                reconnect();
            } else if (clientTaskBundle != null) {
                clientTaskBundle.resultsReceived(th);
            }
            return jPPFException;
        } finally {
            if (clientTaskBundle != null) {
                if (this.jobCount.get() > 0) {
                    this.jobCount.decrementAndGet();
                }
                if (getStatus() == JPPFClientConnectionStatus.EXECUTING && getCurrentNbJobs() < getMaxJobs()) {
                    setStatus(JPPFClientConnectionStatus.ACTIVE);
                }
            }
        }
    }

    /**
     * Marks a bundle for resubmission, reports its completion with the given exception
     * and detaches this channel from the bundle's client job.
     *
     * @param clientTaskBundle the bundle to resubmit.
     * @param exc the exception passed on to the bundle's completion callback, may be {@code null}.
     */
    private void resubmitBundle(ClientTaskBundle clientTaskBundle, Exception exc) {
        if (debugEnabled) {
            log.debug("resubmitting {} with exception {}", clientTaskBundle, exc);
        }
        clientTaskBundle.resubmit();
        clientTaskBundle.taskCompleted(exc);
        clientTaskBundle.getClientJob().removeChannel(this);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Finalizes the processing of a bundle: notifies the bundle of its completion (unless
     * the channel is closed and not resetting), detaches this channel from the bundle's
     * client job, then decrements the job counter exactly once and moves the status from
     * {@code EXECUTING} back to {@code ACTIVE} when there is capacity again.
     * <p>
     * Exceptions raised by the notification are logged and not propagated.
     *
     * @param clientTaskBundle the completed bundle, may be {@code null}.
     * @param exc the exception to report to the bundle, may be {@code null}; an
     *            {@link IOException} is reported as {@code null}.
     */
    public void handleBundleComplete(ClientTaskBundle clientTaskBundle, Exception exc) {
        try {
            boolean isClosed = this.channel.isClosed();
            if (debugEnabled) {
                log.debug("channelClosed={}, resetting={}, bundle={}, exception={}", Boolean.valueOf(isClosed), Boolean.valueOf(this.resetting), clientTaskBundle, exc);
            }
            if (clientTaskBundle != null) {
                if (!isClosed || this.resetting) {
                    clientTaskBundle.taskCompleted(exc instanceof IOException ? null : exc);
                }
                clientTaskBundle.getClientJob().removeChannel(this);
            }
        } catch (Exception e) {
            // The counter and status are updated in the finally block only; repeating that
            // here would decrement the job counter twice for a single bundle.
            log.error(e.getMessage(), (Throwable) e);
        } finally {
            this.jobCount.decrementAndGet();
            if (getStatus() == JPPFClientConnectionStatus.EXECUTING && getCurrentNbJobs() < getMaxJobs()) {
                setStatus(JPPFClientConnectionStatus.ACTIVE);
            }
        }
    }

    /**
     * Closes this wrapper: unregisters the status listener, wakes up any thread waiting
     * on the status lock, closes the superclass, then cancels (with interruption) and
     * forgets the sender and receiver futures so that they can be started again later.
     */
    @Override // org.jppf.client.balancer.ChannelWrapper, java.lang.AutoCloseable
    public void close() {
        if (debugEnabled) {
            log.debug("closing {}, resetting={}", this, Boolean.valueOf(this.resetting));
        }
        this.channel.removeClientConnectionStatusListener(this.listener);
        synchronized (this.statusLock) {
            this.statusLock.notifyAll();
        }
        super.close();
        for (Future<?> running : this.futures) {
            running.cancel(true);
        }
        this.futures.clear();
        this.initDone = false;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Blocks the calling thread until the status is either a working or a terminated
     * status, re-evaluating it each time the status lock is notified.
     *
     * @throws Exception if the wait fails, for instance when the thread is interrupted.
     */
    public void awaitStatus() throws Exception {
        synchronized (this.statusLock) {
            JPPFClientConnectionStatus current = getStatus();
            while (!current.isWorkingStatus() && !current.isTerminatedStatus()) {
                this.statusLock.wait();
                current = getStatus();
            }
        }
    }

    /**
     * @return always {@code true}.
     */
    @Override // org.jppf.execute.ExecutorChannel
    public boolean isAsynchronous() {
        return true;
    }

    /**
     * @return the maximum number of jobs, as reported by the connection pool of the wrapped channel.
     */
    @Override // org.jppf.execute.ExecutorChannel
    public int getMaxJobs() {
        return this.channel.getPool().getMaxJobs();
    }

    /**
     * Builds a diagnostic description made of the queue size, response map size, current
     * job count, resetting flag, bundler algorithm and wrapped channel. If describing the
     * channel throws, the exception's message is used in its place.
     *
     * @return the string representation of this wrapper.
     */
    @Override // org.jppf.client.balancer.AbstractChannelWrapperRemote
    public String toString() {
        final StringBuilder text = new StringBuilder(getClass().getSimpleName());
        text.append('[');
        text.append("bundleQueue=").append(this.bundleQueue.size());
        text.append(", responseMap=").append(this.responseMap.size());
        text.append(", joBCount=").append(getCurrentNbJobs());
        text.append(", resetting=").append(this.resetting);
        text.append(", bundlerAlgorithm=").append(this.bundlerAlgorithm);
        text.append(", channel=");
        try {
            text.append(this.channel);
        } catch (Exception e) {
            text.append(ExceptionUtils.getMessage(e));
        }
        text.append(']');
        return text.toString();
    }
}
