package org.factcast.core.subscription;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Generated;
import lombok.NonNull;
import org.factcast.core.Fact;
import org.factcast.core.store.FactStore;
import org.factcast.core.subscription.observer.FactObserver;
import org.factcast.core.util.ExceptionHelper;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/factcast/core/subscription/ReconnectingFactSubscriptionWrapper.class */
public class ReconnectingFactSubscriptionWrapper implements Subscription {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ReconnectingFactSubscriptionWrapper.class);

    @NonNull
    private final FactStore store;

    @NonNull
    private final SubscriptionRequestTO originalRequest;

    @NonNull
    private final FactObserver originalObserver;

    @NonNull
    private final FactObserver observer;

    @NonNull
    private final AtomicReference<Subscription> currentSubscription = new AtomicReference<>();
    private final AtomicReference<UUID> factIdSeen = new AtomicReference<>();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final ExecutorService es = Executors.newCachedThreadPool(new ThreadFactory() { // from class: org.factcast.core.subscription.ReconnectingFactSubscriptionWrapper.1
        private final AtomicLong threadCount = new AtomicLong(0);

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(@NonNull @NotNull Runnable runnable) {
            Objects.requireNonNull(runnable, "r is marked non-null but is null");
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName("factcast-recon-sub-wrapper-" + this.threadCount.incrementAndGet());
            return thread;
        }
    });

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.closed.set(true);
        Subscription subscription = this.currentSubscription.get();
        if (subscription != null) {
            subscription.close();
        }
        this.currentSubscription.set(null);
    }

    @Override // org.factcast.core.subscription.Subscription
    public Subscription awaitCatchup() throws SubscriptionCancelledException {
        while (true) {
            assertSubscriptionStateNotClosed();
            Subscription subscription = this.currentSubscription.get();
            if (subscription != null) {
                subscription.awaitCatchup();
                return this;
            }
            sleep();
        }
    }

    private void sleep() {
        try {
            Thread.sleep(500L);
        } catch (InterruptedException e) {
        }
    }

    @Override // org.factcast.core.subscription.Subscription
    public Subscription awaitCatchup(long j) throws SubscriptionCancelledException, TimeoutException {
        long currentTimeMillis = System.currentTimeMillis();
        do {
            assertSubscriptionStateNotClosed();
            Subscription subscription = this.currentSubscription.get();
            if (subscription != null) {
                subscription.awaitCatchup(j);
                return this;
            }
            sleep();
        } while (System.currentTimeMillis() - currentTimeMillis <= j);
        throw new TimeoutException();
    }

    @Override // org.factcast.core.subscription.Subscription
    public Subscription awaitComplete() throws SubscriptionCancelledException {
        while (true) {
            assertSubscriptionStateNotClosed();
            Subscription subscription = this.currentSubscription.get();
            if (subscription != null) {
                subscription.awaitComplete();
                return this;
            }
            sleep();
        }
    }

    @Override // org.factcast.core.subscription.Subscription
    public Subscription awaitComplete(long j) throws SubscriptionCancelledException, TimeoutException {
        long currentTimeMillis = System.currentTimeMillis();
        do {
            assertSubscriptionStateNotClosed();
            Subscription subscription = this.currentSubscription.get();
            if (subscription != null) {
                subscription.awaitComplete(j);
                return this;
            }
            sleep();
        } while (System.currentTimeMillis() - currentTimeMillis <= j);
        throw new TimeoutException();
    }

    private void assertSubscriptionStateNotClosed() {
        if (this.closed.get()) {
            throw new SubscriptionCancelledException("Subscription already cancelled");
        }
    }

    public ReconnectingFactSubscriptionWrapper(@NonNull FactStore factStore, @NonNull SubscriptionRequestTO subscriptionRequestTO, @NonNull FactObserver factObserver) {
        Objects.requireNonNull(factStore, "store is marked non-null but is null");
        Objects.requireNonNull(subscriptionRequestTO, "req is marked non-null but is null");
        Objects.requireNonNull(factObserver, "obs is marked non-null but is null");
        this.store = factStore;
        this.originalObserver = factObserver;
        this.originalRequest = subscriptionRequestTO;
        this.observer = new FactObserver() { // from class: org.factcast.core.subscription.ReconnectingFactSubscriptionWrapper.2
            @Override // org.factcast.core.subscription.observer.GenericObserver
            public void onNext(@NonNull Fact fact) {
                Objects.requireNonNull(fact, "element is marked non-null but is null");
                ReconnectingFactSubscriptionWrapper.this.originalObserver.onNext(fact);
                ReconnectingFactSubscriptionWrapper.this.factIdSeen.set(fact.id());
            }

            @Override // org.factcast.core.subscription.observer.GenericObserver
            public void onCatchup() {
                ReconnectingFactSubscriptionWrapper.this.originalObserver.onCatchup();
            }

            @Override // org.factcast.core.subscription.observer.GenericObserver
            public void onComplete() {
                ReconnectingFactSubscriptionWrapper.this.originalObserver.onComplete();
            }

            @Override // org.factcast.core.subscription.observer.GenericObserver
            public void onError(@NonNull Throwable th) {
                Objects.requireNonNull(th, "exception is marked non-null but is null");
                if (th.getClass().getCanonicalName().startsWith("org.factcast")) {
                    ReconnectingFactSubscriptionWrapper.this.originalObserver.onError(th);
                    throw ExceptionHelper.toRuntime(th);
                }
                ReconnectingFactSubscriptionWrapper.log.info("Closing & Reconnecting subscription due to onError triggered.", th);
                ReconnectingFactSubscriptionWrapper.this.closeAndDetachSubscription();
                ReconnectingFactSubscriptionWrapper reconnectingFactSubscriptionWrapper = ReconnectingFactSubscriptionWrapper.this;
                CompletableFuture.runAsync(() -> {
                    reconnectingFactSubscriptionWrapper.initiateReconnect();
                }, ReconnectingFactSubscriptionWrapper.this.es);
                ReconnectingFactSubscriptionWrapper.this.originalObserver.onError(th);
            }
        };
        initiateReconnect();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void initiateReconnect() {
        SubscriptionRequestTO forFacts = SubscriptionRequestTO.forFacts(this.originalRequest);
        UUID uuid = this.factIdSeen.get();
        if (uuid != null) {
            forFacts.startingAfter(uuid);
        }
        while (this.currentSubscription.get() == null) {
            try {
                this.currentSubscription.compareAndSet(null, this.store.subscribe(forFacts, this.observer));
                return;
            } catch (Exception e) {
                sleep();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void closeAndDetachSubscription() {
        try {
            this.currentSubscription.getAndSet(null).close();
        } catch (Exception e) {
            log.warn("Ignoring Exception while closing a subscription:", e);
        }
    }
}
