package org.onosproject.store.atomix.primitives.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.AbstractAccumulator;
import org.onlab.util.BoundedThreadPool;
import org.onlab.util.KryoNamespace;
import org.onlab.util.SlidingWindowCounter;
import org.onlab.util.Tools;
import org.onosproject.cluster.NodeId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.LogicalTimestamp;
import org.onosproject.store.Timestamp;
import org.onosproject.store.atomix.primitives.impl.MapValue;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.WallClockTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/onosproject/store/atomix/primitives/impl/EventuallyConsistentMapImpl.class */
public class EventuallyConsistentMapImpl<K, V> implements EventuallyConsistentMap<K, V> {
    private static final String ERROR_DESTROYED = " map is already destroyed";
    private static final String ERROR_NULL_KEY = "Key cannot be null";
    private static final String ERROR_NULL_VALUE = "Null values are not allowed";
    private static final int WINDOW_SIZE = 5;
    private static final int HIGH_LOAD_THRESHOLD = 2;
    private static final int LOAD_WINDOW = 2;
    private final Map<K, MapValue<V>> items;
    private final ClusterCommunicationService clusterCommunicator;
    private final Serializer serializer;
    private final PersistenceService persistenceService;
    private final BiFunction<K, V, Timestamp> timestampProvider;
    private final MessageSubject bootstrapMessageSubject;
    private final MessageSubject initializeMessageSubject;
    private final MessageSubject updateMessageSubject;
    private final MessageSubject antiEntropyAdvertisementSubject;
    private final MessageSubject updateRequestSubject;
    private final ExecutorService executor;
    private final ScheduledExecutorService backgroundExecutor;
    private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
    private final ExecutorService communicationExecutor;
    private final Map<NodeId, EventuallyConsistentMapImpl<K, V>.EventAccumulator> senderPending;
    private final String mapName;
    private final String destroyedMessage;
    private final boolean lightweightAntiEntropy;
    private final boolean tombstonesDisabled;
    private final boolean persistent;
    private final Supplier<List<NodeId>> peersSupplier;
    private final Supplier<List<NodeId>> bootstrapPeersSupplier;
    private final NodeId localNodeId;
    private long previousTombstonePurgeTime;
    private static final int DEFAULT_MAX_EVENTS = 1000;
    private static final int DEFAULT_MAX_IDLE_MS = 10;
    private static final int DEFAULT_MAX_BATCH_MS = 50;
    private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
    private static final Timer TIMER = new Timer("onos-ecm-sender-events");
    private final Set<EventuallyConsistentMapListener<K, V>> listeners = Sets.newCopyOnWriteArraySet();
    private final Map<NodeId, Long> antiEntropyTimes = Maps.newConcurrentMap();
    private final long initialDelaySec = 5;
    private volatile boolean destroyed = false;
    private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);

    /* loaded from: input_file:org/onosproject/store/atomix/primitives/impl/EventuallyConsistentMapImpl$EventAccumulator.class */
    private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
        private final NodeId peer;

        private EventAccumulator(NodeId nodeId) {
            super(EventuallyConsistentMapImpl.TIMER, EventuallyConsistentMapImpl.DEFAULT_MAX_EVENTS, EventuallyConsistentMapImpl.DEFAULT_MAX_BATCH_MS, EventuallyConsistentMapImpl.DEFAULT_MAX_IDLE_MS);
            this.peer = nodeId;
        }

        public void processItems(List<UpdateEntry<K, V>> list) {
            HashMap newHashMap = Maps.newHashMap();
            list.forEach(updateEntry -> {
                newHashMap.compute(updateEntry.key(), (obj, updateEntry) -> {
                    return updateEntry.isNewerThan(updateEntry) ? updateEntry : updateEntry;
                });
            });
            EventuallyConsistentMapImpl.this.communicationExecutor.execute(() -> {
                try {
                    ClusterCommunicationService clusterCommunicationService = EventuallyConsistentMapImpl.this.clusterCommunicator;
                    ImmutableList copyOf = ImmutableList.copyOf(newHashMap.values());
                    MessageSubject messageSubject = EventuallyConsistentMapImpl.this.updateMessageSubject;
                    Serializer serializer = EventuallyConsistentMapImpl.this.serializer;
                    Objects.requireNonNull(serializer);
                    clusterCommunicationService.unicast(copyOf, messageSubject, (v1) -> {
                        return r3.encode(v1);
                    }, this.peer).whenComplete((r6, th) -> {
                        if (th != null) {
                            EventuallyConsistentMapImpl.log.debug("Failed to send to {}", this.peer, th);
                        }
                    });
                } catch (Exception e) {
                    EventuallyConsistentMapImpl.log.warn("Failed to send to {}", this.peer, e);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventuallyConsistentMapImpl(NodeId nodeId, String str, ClusterCommunicationService clusterCommunicationService, KryoNamespace kryoNamespace, BiFunction<K, V, Timestamp> biFunction, BiFunction<K, V, Collection<NodeId>> biFunction2, ExecutorService executorService, ExecutorService executorService2, ScheduledExecutorService scheduledExecutorService, boolean z, long j, TimeUnit timeUnit, boolean z2, boolean z3, PersistenceService persistenceService, Supplier<List<NodeId>> supplier, Supplier<List<NodeId>> supplier2) {
        this.localNodeId = nodeId;
        this.mapName = str;
        this.serializer = createSerializer(kryoNamespace);
        this.persistenceService = persistenceService;
        this.persistent = z3;
        if (z3) {
            this.items = this.persistenceService.persistentMapBuilder().withName(str).withSerializer(this.serializer).build();
        } else {
            this.items = Maps.newConcurrentMap();
        }
        this.senderPending = Maps.newConcurrentMap();
        this.destroyedMessage = str + " map is already destroyed";
        this.clusterCommunicator = clusterCommunicationService;
        this.timestampProvider = biFunction;
        this.peersSupplier = supplier;
        this.bootstrapPeersSupplier = supplier2;
        if (biFunction2 != null) {
            this.peerUpdateFunction = biFunction2.andThen(collection -> {
                Stream stream = ((List) supplier.get()).stream();
                Objects.requireNonNull(collection);
                return (Collection) stream.filter((v1) -> {
                    return r1.contains(v1);
                }).collect(Collectors.toList());
            });
        } else {
            this.peerUpdateFunction = (obj, obj2) -> {
                return (Collection) supplier.get();
            };
        }
        if (executorService != null) {
            this.executor = executorService;
        } else {
            this.executor = Executors.newFixedThreadPool(8, Tools.groupedThreads("onos/ecm", str + "-fg-%d", log));
        }
        if (executorService2 != null) {
            this.communicationExecutor = executorService2;
        } else {
            this.communicationExecutor = BoundedThreadPool.newFixedThreadPool(8, Tools.groupedThreads("onos/ecm", str + "-publish-%d", log));
        }
        if (scheduledExecutorService != null) {
            this.backgroundExecutor = scheduledExecutorService;
        } else {
            this.backgroundExecutor = Executors.newSingleThreadScheduledExecutor(Tools.groupedThreads("onos/ecm", str + "-bg-%d", log));
        }
        this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement, 5L, j, timeUnit);
        this.bootstrapMessageSubject = new MessageSubject("ecm-" + str + "-bootstrap");
        MessageSubject messageSubject = this.bootstrapMessageSubject;
        Serializer serializer = this.serializer;
        Objects.requireNonNull(serializer);
        Function function = serializer::decode;
        Function function2 = this::handleBootstrap;
        Serializer serializer2 = this.serializer;
        Objects.requireNonNull(serializer2);
        clusterCommunicationService.addSubscriber(messageSubject, function, function2, (v1) -> {
            return r4.encode(v1);
        });
        this.initializeMessageSubject = new MessageSubject("ecm-" + str + "-initialize");
        MessageSubject messageSubject2 = this.initializeMessageSubject;
        Serializer serializer3 = this.serializer;
        Objects.requireNonNull(serializer3);
        Function function3 = serializer3::decode;
        Function function4 = collection2 -> {
            processUpdates(collection2);
            return null;
        };
        Serializer serializer4 = this.serializer;
        Objects.requireNonNull(serializer4);
        clusterCommunicationService.addSubscriber(messageSubject2, function3, function4, (v1) -> {
            return r4.encode(v1);
        }, this.executor);
        this.updateMessageSubject = new MessageSubject("ecm-" + str + "-update");
        MessageSubject messageSubject3 = this.updateMessageSubject;
        Serializer serializer5 = this.serializer;
        Objects.requireNonNull(serializer5);
        clusterCommunicationService.addSubscriber(messageSubject3, serializer5::decode, this::processUpdates, this.executor);
        this.antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + str + "-anti-entropy");
        MessageSubject messageSubject4 = this.antiEntropyAdvertisementSubject;
        Serializer serializer6 = this.serializer;
        Objects.requireNonNull(serializer6);
        Function function5 = serializer6::decode;
        Function function6 = this::handleAntiEntropyAdvertisement;
        Serializer serializer7 = this.serializer;
        Objects.requireNonNull(serializer7);
        clusterCommunicationService.addSubscriber(messageSubject4, function5, function6, (v1) -> {
            return r4.encode(v1);
        }, this.backgroundExecutor);
        this.updateRequestSubject = new MessageSubject("ecm-" + str + "-update-request");
        MessageSubject messageSubject5 = this.updateRequestSubject;
        Serializer serializer8 = this.serializer;
        Objects.requireNonNull(serializer8);
        clusterCommunicationService.addSubscriber(messageSubject5, serializer8::decode, this::handleUpdateRequests, this.backgroundExecutor);
        if (!z) {
            this.previousTombstonePurgeTime = 0L;
            this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones, 5L, j, TimeUnit.SECONDS);
        }
        this.tombstonesDisabled = z;
        this.lightweightAntiEntropy = !z2;
        bootstrap();
    }

    private Serializer createSerializer(KryoNamespace kryoNamespace) {
        return Serializer.using(KryoNamespace.newBuilder().register(kryoNamespace).nextId(600).register(KryoNamespaces.BASIC).register(new Class[]{LogicalTimestamp.class}).register(new Class[]{WallClockTimestamp.class}).register(new Class[]{AntiEntropyAdvertisement.class}).register(new Class[]{AntiEntropyResponse.class}).register(new Class[]{UpdateEntry.class}).register(new Class[]{MapValue.class}).register(new Class[]{MapValue.Digest.class}).register(new Class[]{UpdateRequest.class}).build(name() + "-ecmap"));
    }

    public String name() {
        return this.mapName;
    }

    public int size() {
        Preconditions.checkState(!this.destroyed, this.destroyedMessage);
        return Maps.filterValues(this.items, (v0) -> {
            return v0.isAlive();
        }).size();
    }

    public boolean isEmpty() {
        Preconditions.checkState(!this.destroyed, this.destroyedMessage);
        return size() == 0;
    }

    public boolean containsKey(K k) {
        Preconditions.checkState(!this.destroyed, this.destroyedMessage);
        Preconditions.checkNotNull(k, ERROR_NULL_KEY);
        return get(k) != null;
    }

    public boolean containsValue(V v) {
        Preconditions.checkState(!this.destroyed, this.destroyedMessage);
        Preconditions.checkNotNull(v, ERROR_NULL_VALUE);
        return this.items.values().stream().filter((v0) -> {
            return v0.isAlive();
        }).anyMatch(mapValue -> {
            return v.equals(mapValue.get());
        });
    }

    public V get(K k) {
        Preconditions.checkState(!this.destroyed, this.destroyedMessage);
        Preconditions.checkNotNull(k, ERROR_NULL_KEY);
        MapValue<V> mapValue = this.items.get(k);
        if (mapValue == null || mapValue.isTombstone()) {
            return null;
        }
        return mapValue.get();
    }

    public void put(K k, V v) {
        Preconditions.checkState(!this.destroyed, this.destroyedMessage);
        Preconditions.checkNotNull(k, ERROR_NULL_KEY);
        Preconditions.checkNotNull(v, ERROR_NULL_VALUE);
        MapValue<V> mapValue = new MapValue<>(v, this.timestampProvider.apply(k, v));
        if (putInternal(k, mapValue)) {
            notifyPeers(new UpdateEntry<>(k, mapValue), this.peerUpdateFunction.apply(k, v));
            notifyListeners(new EventuallyConsistentMapEvent<>(this.mapName, EventuallyConsistentMapEvent.Type.PUT, k, v));
        }
    }

    public V remove(K k) {
        Preconditions.checkState(!this.destroyed, this.destroyedMessage);
        Preconditions.checkNotNull(k, ERROR_NULL_KEY);
        return removeAndNotify(k, null);
    }

    public void remove(K k, V v) {
        Preconditions.checkState(!this.destroyed, this.destroyedMessage);
        Preconditions.checkNotNull(k, ERROR_NULL_KEY);
        Preconditions.checkNotNull(v, ERROR_NULL_VALUE);
        removeAndNotify(k, v);
    }

    private V removeAndNotify(K k, V v) {
        Timestamp apply = this.timestampProvider.apply(k, v);
        Optional<MapValue<V>> empty = (this.tombstonesDisabled || apply == null) ? Optional.empty() : Optional.of(MapValue.tombstone(apply));
        MapValue<V> removeInternal = removeInternal(k, Optional.ofNullable(v), empty);
        if (removeInternal != null) {
            notifyPeers(new UpdateEntry<>(k, empty.orElse(null)), this.peerUpdateFunction.apply(k, removeInternal.get()));
            if (removeInternal.isAlive()) {
                notifyListeners(new EventuallyConsistentMapEvent<>(this.mapName, EventuallyConsistentMapEvent.Type.REMOVE, k, removeInternal.get()));
            }
        }
        if (removeInternal != null) {
            return removeInternal.get();
        }
        return null;
    }

    private MapValue<V> removeInternal(K k, Optional<V> optional, Optional<MapValue<V>> optional2) {
        Preconditions.checkState(!this.destroyed, this.destroyedMessage);
        Preconditions.checkNotNull(k, ERROR_NULL_KEY);
        Preconditions.checkNotNull(optional, ERROR_NULL_VALUE);
        optional2.ifPresent(mapValue -> {
            Preconditions.checkState(mapValue.isTombstone());
        });
        this.counter.incrementCount();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AtomicReference atomicReference = new AtomicReference();
        this.items.compute(k, (obj, mapValue2) -> {
            boolean z = true;
            if (optional.isPresent() && mapValue2 != null && mapValue2.isAlive()) {
                z = Objects.equals(optional.get(), mapValue2.get());
            }
            if (mapValue2 == null) {
                log.trace("ECMap Remove: Existing value for key {} is already null", obj);
            }
            if (z) {
                if (mapValue2 == null) {
                    atomicBoolean.set(optional2.isPresent());
                } else {
                    atomicBoolean.set(!optional2.isPresent() || ((MapValue) optional2.get()).isNewerThan(mapValue2));
                }
            }
            if (!atomicBoolean.get()) {
                return mapValue2;
            }
            atomicReference.set(mapValue2);
            return (MapValue) optional2.orElse(null);
        });
        return (MapValue) atomicReference.get();
    }

    /* JADX WARN: Multi-variable type inference failed */
    public V compute(K k, BiFunction<K, V, V> biFunction) {
        Preconditions.checkState(!this.destroyed, this.destroyedMessage);
        Preconditions.checkNotNull(k, ERROR_NULL_KEY);
        Preconditions.checkNotNull(biFunction, "Recompute function cannot be null");
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AtomicReference atomicReference = new AtomicReference();
        MapValue mapValue = (MapValue) this.items.compute(this.serializer.copy(k), (obj, mapValue2) -> {
            atomicReference.set(mapValue2);
            Object apply = biFunction.apply(k, mapValue2 == null ? null : mapValue2.get());
            if (mapValue2 != null && Objects.equals(apply, mapValue2.get())) {
                return mapValue2;
            }
            MapValue mapValue2 = new MapValue(apply, (Timestamp) this.timestampProvider.apply(k, apply));
            if (mapValue2 != null && !mapValue2.isNewerThan(mapValue2)) {
                return mapValue2;
            }
            atomicBoolean.set(true);
            return (MapValue) this.serializer.copy(mapValue2);
        });
        if (atomicBoolean.get()) {
            notifyPeers(new UpdateEntry<>(k, mapValue), (Collection) this.peerUpdateFunction.apply(k, mapValue.get()));
            EventuallyConsistentMapEvent.Type type = mapValue.isTombstone() ? EventuallyConsistentMapEvent.Type.REMOVE : EventuallyConsistentMapEvent.Type.PUT;
            Object obj2 = mapValue.isTombstone() ? atomicReference.get() == null ? null : ((MapValue) atomicReference.get()).get() : mapValue.get();
            if (obj2 != null) {
                notifyListeners(new EventuallyConsistentMapEvent<>(this.mapName, type, k, obj2));
            }
        }
        return (V) mapValue.get();
    }

    public void putAll(Map<? extends K, ? extends V> map) {
        Preconditions.checkState(!this.destroyed, this.destroyedMessage);
        map.forEach(this::put);
    }

    public void clear() {
        Preconditions.checkState(!this.destroyed, this.destroyedMessage);
        Maps.filterValues(this.items, (v0) -> {
            return v0.isAlive();
        }).forEach((obj, mapValue) -> {
            remove(obj);
        });
    }

    public Set<K> keySet() {
        Preconditions.checkState(!this.destroyed, this.destroyedMessage);
        return Maps.filterValues(this.items, (v0) -> {
            return v0.isAlive();
        }).keySet();
    }

    public Collection<V> values() {
        Preconditions.checkState(!this.destroyed, this.destroyedMessage);
        return Collections2.transform(Maps.filterValues(this.items, (v0) -> {
            return v0.isAlive();
        }).values(), (v0) -> {
            return v0.get();
        });
    }

    public Set<Map.Entry<K, V>> entrySet() {
        Preconditions.checkState(!this.destroyed, this.destroyedMessage);
        return (Set) Maps.filterValues(this.items, (v0) -> {
            return v0.isAlive();
        }).entrySet().stream().map(entry -> {
            return Pair.of(entry.getKey(), ((MapValue) entry.getValue()).get());
        }).collect(Collectors.toSet());
    }

    private boolean putInternal(K k, MapValue<V> mapValue) {
        Preconditions.checkState(!this.destroyed, this.destroyedMessage);
        Preconditions.checkNotNull(k, ERROR_NULL_KEY);
        Preconditions.checkNotNull(mapValue, ERROR_NULL_VALUE);
        Preconditions.checkState(mapValue.isAlive());
        this.counter.incrementCount();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.items.compute(k, (obj, mapValue2) -> {
            if (mapValue2 != null && !mapValue.isNewerThan(mapValue2)) {
                return mapValue2;
            }
            atomicBoolean.set(true);
            return mapValue;
        });
        return atomicBoolean.get();
    }

    public void addListener(EventuallyConsistentMapListener<K, V> eventuallyConsistentMapListener) {
        Preconditions.checkState(!this.destroyed, this.destroyedMessage);
        this.listeners.add((EventuallyConsistentMapListener) Preconditions.checkNotNull(eventuallyConsistentMapListener));
        this.items.forEach((obj, mapValue) -> {
            if (mapValue.isAlive()) {
                eventuallyConsistentMapListener.event(new EventuallyConsistentMapEvent(this.mapName, EventuallyConsistentMapEvent.Type.PUT, obj, mapValue.get()));
            }
        });
    }

    public void removeListener(EventuallyConsistentMapListener<K, V> eventuallyConsistentMapListener) {
        Preconditions.checkState(!this.destroyed, this.destroyedMessage);
        this.listeners.remove(Preconditions.checkNotNull(eventuallyConsistentMapListener));
    }

    public CompletableFuture<Void> destroy() {
        this.destroyed = true;
        this.executor.shutdown();
        this.backgroundExecutor.shutdown();
        this.communicationExecutor.shutdown();
        this.listeners.clear();
        this.clusterCommunicator.removeSubscriber(this.bootstrapMessageSubject);
        this.clusterCommunicator.removeSubscriber(this.initializeMessageSubject);
        this.clusterCommunicator.removeSubscriber(this.updateMessageSubject);
        this.clusterCommunicator.removeSubscriber(this.updateRequestSubject);
        this.clusterCommunicator.removeSubscriber(this.antiEntropyAdvertisementSubject);
        return CompletableFuture.completedFuture(null);
    }

    private void notifyListeners(EventuallyConsistentMapEvent<K, V> eventuallyConsistentMapEvent) {
        this.listeners.forEach(eventuallyConsistentMapListener -> {
            eventuallyConsistentMapListener.event(eventuallyConsistentMapEvent);
        });
    }

    private void notifyPeers(UpdateEntry<K, V> updateEntry, Collection<NodeId> collection) {
        queueUpdate(updateEntry, collection);
    }

    private void queueUpdate(UpdateEntry<K, V> updateEntry, Collection<NodeId> collection) {
        if (collection == null) {
            return;
        }
        collection.forEach(nodeId -> {
            this.senderPending.computeIfAbsent(nodeId, nodeId -> {
                return new EventAccumulator(nodeId);
            }).add(updateEntry);
        });
    }

    private boolean underHighLoad() {
        return this.counter.get(2) > 2;
    }

    private void sendAdvertisement() {
        try {
            if (underHighLoad() || this.destroyed) {
                return;
            }
            pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
        } catch (Exception e) {
            log.error("Exception thrown while sending advertisement", e);
        }
    }

    private Optional<NodeId> pickRandomActivePeer() {
        List<NodeId> list = this.peersSupplier.get();
        Collections.shuffle(list);
        return list.stream().findFirst();
    }

    private void sendAdvertisementToPeer(NodeId nodeId) {
        long currentTimeMillis = System.currentTimeMillis();
        AntiEntropyAdvertisement<K> createAdvertisement = createAdvertisement();
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        MessageSubject messageSubject = this.antiEntropyAdvertisementSubject;
        Serializer serializer = this.serializer;
        Objects.requireNonNull(serializer);
        Function function = (v1) -> {
            return r3.encode(v1);
        };
        Serializer serializer2 = this.serializer;
        Objects.requireNonNull(serializer2);
        clusterCommunicationService.sendAndReceive(createAdvertisement, messageSubject, function, serializer2::decode, nodeId).whenComplete((obj, th) -> {
            if (th != null) {
                log.debug("Failed to send anti-entropy advertisement to {}: {}", nodeId, th.getMessage());
            } else if (obj == AntiEntropyResponse.PROCESSED) {
                this.antiEntropyTimes.put(nodeId, Long.valueOf(currentTimeMillis));
            }
        });
    }

    private void sendUpdateRequestToPeer(NodeId nodeId, Set<K> set) {
        UpdateRequest updateRequest = new UpdateRequest(this.localNodeId, set);
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        MessageSubject messageSubject = this.updateRequestSubject;
        Serializer serializer = this.serializer;
        Objects.requireNonNull(serializer);
        clusterCommunicationService.unicast(updateRequest, messageSubject, (v1) -> {
            return r3.encode(v1);
        }, nodeId).whenComplete((r6, th) -> {
            if (th != null) {
                log.debug("Failed to send update request to {}: {}", nodeId, th.getMessage());
            }
        });
    }

    private AntiEntropyAdvertisement<K> createAdvertisement() {
        return new AntiEntropyAdvertisement<>(this.localNodeId, ImmutableMap.copyOf(Maps.transformValues(this.items, (v0) -> {
            return v0.digest();
        })));
    }

    private AntiEntropyResponse handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> antiEntropyAdvertisement) {
        if (this.destroyed || underHighLoad()) {
            return AntiEntropyResponse.IGNORED;
        }
        try {
            if (log.isTraceEnabled()) {
                log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it", new Object[]{antiEntropyAdvertisement.sender(), this.mapName, Integer.valueOf(antiEntropyAdvertisement.digest().size())});
            }
            antiEntropyCheckLocalItems(antiEntropyAdvertisement).forEach(this::notifyListeners);
            return AntiEntropyResponse.PROCESSED;
        } catch (Exception e) {
            log.warn("Error handling anti-entropy advertisement", e);
            return AntiEntropyResponse.FAILED;
        }
    }

    private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(AntiEntropyAdvertisement<K> antiEntropyAdvertisement) {
        LinkedList newLinkedList = Lists.newLinkedList();
        NodeId sender = antiEntropyAdvertisement.sender();
        ImmutableList of = ImmutableList.of(sender);
        Set<K> hashSet = new HashSet<>();
        HashSet hashSet2 = new HashSet(antiEntropyAdvertisement.digest().keySet());
        this.items.forEach((obj, mapValue) -> {
            hashSet2.remove(obj);
            MapValue.Digest digest = antiEntropyAdvertisement.digest().get(obj);
            if (digest == null || mapValue.isNewerThan(digest.timestamp())) {
                queueUpdate(new UpdateEntry<>(obj, mapValue), of);
                return;
            }
            if (!digest.isNewerThan(mapValue.digest()) || !digest.isTombstone()) {
                if (digest.isNewerThan(mapValue.digest())) {
                    hashSet.add(obj);
                }
            } else {
                MapValue<V> removeInternal = removeInternal(obj, Optional.empty(), Optional.of(MapValue.tombstone(digest.timestamp())));
                if (removeInternal == null || !removeInternal.isAlive()) {
                    return;
                }
                newLinkedList.add(new EventuallyConsistentMapEvent(this.mapName, EventuallyConsistentMapEvent.Type.REMOVE, obj, removeInternal.get()));
            }
        });
        hashSet.addAll(hashSet2);
        sendUpdateRequestToPeer(sender, hashSet);
        return newLinkedList;
    }

    private void handleUpdateRequests(UpdateRequest<K> updateRequest) {
        Set<K> keys = updateRequest.keys();
        ImmutableList of = ImmutableList.of(updateRequest.sender());
        keys.forEach(obj -> {
            queueUpdate(new UpdateEntry<>(obj, this.items.get(obj)), of);
        });
    }

    private void purgeTombstones() {
        long longValue = ((Long) this.peersSupplier.get().stream().map(nodeId -> {
            return this.antiEntropyTimes.getOrDefault(nodeId, 0L);
        }).reduce((v0, v1) -> {
            return Math.min(v0, v1);
        }).orElse(0L)).longValue();
        if (longValue == this.previousTombstonePurgeTime) {
            return;
        }
        List list = (List) this.items.entrySet().stream().filter(entry -> {
            return ((MapValue) entry.getValue()).isTombstone();
        }).filter(entry2 -> {
            return ((MapValue) entry2.getValue()).creationTime() <= longValue;
        }).collect(Collectors.toList());
        this.previousTombstonePurgeTime = longValue;
        list.forEach(entry3 -> {
            this.items.remove(entry3.getKey(), entry3.getValue());
        });
    }

    private void processUpdates(Collection<UpdateEntry<K, V>> collection) {
        if (this.destroyed) {
            return;
        }
        collection.forEach(updateEntry -> {
            Object key = updateEntry.key();
            MapValue<V> copy = updateEntry.value() == null ? null : updateEntry.value().copy();
            if (copy != null && !copy.isTombstone()) {
                if (putInternal(key, copy)) {
                    notifyListeners(new EventuallyConsistentMapEvent(this.mapName, EventuallyConsistentMapEvent.Type.PUT, key, copy.get()));
                }
            } else {
                MapValue removeInternal = removeInternal(key, Optional.empty(), Optional.ofNullable(copy));
                if (removeInternal == null || !removeInternal.isAlive()) {
                    return;
                }
                notifyListeners(new EventuallyConsistentMapEvent(this.mapName, EventuallyConsistentMapEvent.Type.REMOVE, key, removeInternal.get()));
            }
        });
    }

    private void bootstrap() {
        List<NodeId> list = this.bootstrapPeersSupplier.get();
        if (list.isEmpty()) {
            return;
        }
        try {
            requestBootstrapFromPeers(list).get(15000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            log.debug("Failed to bootstrap ec map {}: {}", this.mapName, ExceptionUtils.getStackTrace(e));
        }
    }

    private CompletableFuture<Void> requestBootstrapFromPeers(List<NodeId> list) {
        if (list.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        int size = list.size();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicReference atomicReference = new AtomicReference();
        Iterator<NodeId> it = list.iterator();
        while (it.hasNext()) {
            requestBootstrapFromPeer(it.next()).whenComplete((r9, th) -> {
                Throwable th;
                if (th != null) {
                    if (atomicBoolean.get() || atomicInteger.incrementAndGet() != size) {
                        atomicReference.set(th);
                        return;
                    } else {
                        completableFuture.completeExceptionally(th);
                        return;
                    }
                }
                if (atomicBoolean.compareAndSet(false, true)) {
                    completableFuture.complete(null);
                } else {
                    if (atomicInteger.incrementAndGet() != size || (th = (Throwable) atomicReference.get()) == null) {
                        return;
                    }
                    completableFuture.completeExceptionally(th);
                }
            });
        }
        return completableFuture;
    }

    private CompletableFuture<Void> requestBootstrapFromPeer(NodeId nodeId) {
        log.trace("Sending bootstrap request to {} for {}", nodeId, this.mapName);
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        NodeId nodeId2 = this.localNodeId;
        MessageSubject messageSubject = this.bootstrapMessageSubject;
        Serializer serializer = this.serializer;
        Objects.requireNonNull(serializer);
        Function function = (v1) -> {
            return r3.encode(v1);
        };
        Serializer serializer2 = this.serializer;
        Objects.requireNonNull(serializer2);
        return clusterCommunicationService.sendAndReceive(nodeId2, messageSubject, function, serializer2::decode, nodeId).whenComplete((r6, th) -> {
            if (th != null) {
                log.debug("Bootstrap request to {} failed: {}", nodeId, th.getMessage());
            }
        });
    }

    private CompletableFuture<Void> handleBootstrap(NodeId nodeId) {
        log.trace("Received bootstrap request from {} for {}", nodeId, this.bootstrapMessageSubject);
        Function function = list -> {
            log.trace("Initializing {} with {} entries", nodeId, Integer.valueOf(list.size()));
            ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
            ImmutableList copyOf = ImmutableList.copyOf(list);
            MessageSubject messageSubject = this.initializeMessageSubject;
            Serializer serializer = this.serializer;
            Objects.requireNonNull(serializer);
            Function function2 = (v1) -> {
                return r3.encode(v1);
            };
            Serializer serializer2 = this.serializer;
            Objects.requireNonNull(serializer2);
            return clusterCommunicationService.sendAndReceive(copyOf, messageSubject, function2, serializer2::decode, nodeId).whenComplete((r6, th) -> {
                if (th != null) {
                    log.debug("Failed to initialize {}", nodeId, th);
                }
            });
        };
        ArrayList newArrayList = Lists.newArrayList();
        ArrayList newArrayList2 = Lists.newArrayList();
        for (Map.Entry<K, MapValue<V>> entry : this.items.entrySet()) {
            K key = entry.getKey();
            MapValue<V> value = entry.getValue();
            if (value.isAlive()) {
                newArrayList2.add(new UpdateEntry(key, value));
                if (newArrayList2.size() == DEFAULT_MAX_EVENTS) {
                    newArrayList.add((CompletableFuture) function.apply(newArrayList2));
                    newArrayList2 = new ArrayList();
                }
            }
        }
        if (!newArrayList2.isEmpty()) {
            newArrayList.add((CompletableFuture) function.apply(newArrayList2));
        }
        return CompletableFuture.allOf((CompletableFuture[]) newArrayList.toArray(new CompletableFuture[newArrayList.size()]));
    }
}
