package no.ks.eventstore2.eventstore;

import akka.ConfigurationException;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.cluster.ClusterEvent;
import akka.dispatch.Futures;
import akka.dispatch.OnFailure;
import com.google.common.collect.HashMultimap;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import no.ks.eventstore2.AkkaClusterInfo;
import no.ks.eventstore2.Event;
import no.ks.eventstore2.TakeBackup;
import no.ks.eventstore2.TakeSnapshot;
import no.ks.eventstore2.response.Success;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;

/* loaded from: input_file:no/ks/eventstore2/eventstore/EventStore.class */
public class EventStore extends UntypedActor {
    private static Logger log = LoggerFactory.getLogger(EventStore.class);
    private ActorRef leaderEventStore;
    private AkkaClusterInfo leaderInfo;
    private JournalStorage storage;
    private HashMultimap<String, ActorRef> aggregateSubscribers = HashMultimap.create();
    private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
    private Map<String, HashSet<ActorRef>> pendingSubscriptions = new HashMap();

    public static Props mkProps(JournalStorage journalStorage) {
        return Props.create(EventStore.class, new Object[]{journalStorage});
    }

    public EventStore(JournalStorage journalStorage) {
        this.storage = journalStorage;
    }

    public void postStop() {
        this.storage.close();
    }

    public void preStart() {
        this.leaderInfo = new AkkaClusterInfo(getContext().system());
        this.leaderInfo.subscribeToClusterEvents(self());
        updateLeaderState(null);
        log.debug("Eventstore started with adress {}", getSelf().path());
    }

    public void postRestart(Throwable th) throws Exception {
        super.postRestart(th);
        log.warn("Restarted eventstore, restarting storage");
        this.storage.close();
        if (this.leaderInfo.isLeader()) {
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
            }
            this.storage.open();
        }
    }

    private void updateLeaderState(ClusterEvent.LeaderChanged leaderChanged) {
        try {
            this.leaderInfo.updateLeaderState(leaderChanged);
            this.leaderEventStore = getContext().actorFor(this.leaderInfo.getLeaderAdress() + "/user/eventstore");
            log.debug("LeaderEventStore is {}", this.leaderEventStore);
            if (!this.leaderInfo.isLeader() && this.leaderInfo.amIUp()) {
                for (String str : this.aggregateSubscribers.keySet()) {
                    this.leaderEventStore.tell(new SubscriptionRefresh(str, this.aggregateSubscribers.get(str)), self());
                }
            }
            if (this.leaderInfo.isLeader()) {
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                }
                log.info("opening journal store");
                this.storage.open();
            } else {
                log.info("closing journal store");
                this.storage.close();
            }
        } catch (ConfigurationException e2) {
            log.debug("Not cluster system");
        }
    }

    public void onReceive(Object obj) throws Exception {
        if (!(obj instanceof Subscription)) {
            fillPendingSubscriptions();
        }
        if (obj instanceof ClusterEvent.MemberRemoved) {
            ClusterEvent.MemberRemoved memberRemoved = (ClusterEvent.MemberRemoved) obj;
            log.info("Member removed: {} status {}", memberRemoved.member(), memberRemoved.previousStatus());
            for (String str : this.aggregateSubscribers.keySet()) {
                HashSet hashSet = new HashSet();
                for (ActorRef actorRef : this.aggregateSubscribers.get(str)) {
                    if (actorRef.path().address().equals(memberRemoved.member().address())) {
                        hashSet.add(actorRef);
                        log.debug("removeing actorref {}", actorRef);
                    }
                }
                Iterator it = hashSet.iterator();
                while (it.hasNext()) {
                    ActorRef actorRef2 = (ActorRef) it.next();
                    this.aggregateSubscribers.get(str).remove(actorRef2);
                    log.info("Aggregate {} removeed subscriber {}", str, actorRef2);
                }
            }
        }
        if (obj instanceof ClusterEvent.LeaderChanged) {
            log.info("Recieved LeaderChanged event: {}", obj);
            updateLeaderState((ClusterEvent.LeaderChanged) obj);
            return;
        }
        if (obj instanceof StoreEvents) {
            if (!this.leaderInfo.isLeader()) {
                log.info("Sending to leader {} events {}", sender(), obj);
                this.leaderEventStore.tell(obj, sender());
                return;
            }
            storeEvents((StoreEvents) obj);
            publishEvents((StoreEvents) obj);
            for (Event event : ((StoreEvents) obj).getEvents()) {
                log.info("Published event {}: {}", event, event.getLogMessage());
            }
            return;
        }
        if (obj instanceof Event) {
            if (!this.leaderInfo.isLeader()) {
                log.info("Sending to leader {} event {}", sender(), obj);
                this.leaderEventStore.tell(obj, sender());
                return;
            } else {
                storeEvent((Event) obj);
                publishEvent((Event) obj);
                log.info("Published event {}: {}", obj, ((Event) obj).getLogMessage());
                return;
            }
        }
        if (obj instanceof RetreiveAggregateEvents) {
            if (this.leaderInfo.isLeader()) {
                readAggregateEvents((RetreiveAggregateEvents) obj);
                return;
            } else {
                log.info("Sending to leader {} retrieveAggregateEvents {}", sender(), obj);
                this.leaderEventStore.tell(obj, sender());
                return;
            }
        }
        if (obj instanceof Subscription) {
            Subscription subscription = (Subscription) obj;
            addSubscriber(subscription);
            tryToFillSubscription(sender(), subscription);
            return;
        }
        if (obj instanceof SubscriptionRefresh) {
            SubscriptionRefresh subscriptionRefresh = (SubscriptionRefresh) obj;
            log.info("Refreshing subscription for {}", subscriptionRefresh);
            addSubscriber(subscriptionRefresh);
            return;
        }
        if ("ping".equals(obj)) {
            log.debug("Ping reveiced from {}", sender());
            sender().tell("pong", self());
            return;
        }
        if ("pong".equals(obj)) {
            log.debug("Pong received from {}", sender());
            return;
        }
        if ("startping".equals(obj)) {
            log.debug("starting ping sending to {} from {}", this.leaderEventStore, self());
            if (this.leaderEventStore != null) {
                this.leaderEventStore.tell("ping", self());
                return;
            }
            return;
        }
        if (obj instanceof AcknowledgePreviousEventsProcessed) {
            if (this.leaderInfo.isLeader()) {
                sender().tell(new Success(), self());
                return;
            } else {
                this.leaderEventStore.tell(obj, sender());
                return;
            }
        }
        if ((obj instanceof UpgradeAggregate) && this.leaderInfo.isLeader()) {
            UpgradeAggregate upgradeAggregate = (UpgradeAggregate) obj;
            log.info("Upgrading aggregate " + upgradeAggregate.getAggregateType());
            this.storage.upgradeFromOldStorage(upgradeAggregate.getAggregateType(), upgradeAggregate.getOldStorage());
            log.info("Upgraded aggregate " + upgradeAggregate.getAggregateType());
            return;
        }
        if (obj instanceof TakeBackup) {
            if (!this.leaderInfo.isLeader()) {
                this.leaderEventStore.tell(obj, sender());
                return;
            }
            Iterator it2 = this.aggregateSubscribers.values().iterator();
            while (it2.hasNext()) {
                ((ActorRef) it2.next()).tell(obj, self());
            }
            this.storage.doBackup(((TakeBackup) obj).getBackupdir(), "backupEventStore" + this.format.format(new Date()));
            return;
        }
        if (obj instanceof TakeSnapshot) {
            if (!this.leaderInfo.isLeader()) {
                this.leaderEventStore.tell(obj, sender());
                return;
            }
            Iterator it3 = this.aggregateSubscribers.values().iterator();
            while (it3.hasNext()) {
                ((ActorRef) it3.next()).tell(obj, self());
            }
        }
    }

    private void readAggregateEvents(RetreiveAggregateEvents retreiveAggregateEvents) {
        sender().tell(this.storage.loadEventsForAggregateId(retreiveAggregateEvents.getAggregateType(), retreiveAggregateEvents.getAggregateId(), retreiveAggregateEvents.getFromJournalId()), self());
    }

    private void tryToFillSubscription(final ActorRef actorRef, final Subscription subscription) {
        final ActorRef self = self();
        if (subscription instanceof AsyncSubscription) {
            Futures.future(new Callable<Boolean>() { // from class: no.ks.eventstore2.eventstore.EventStore.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Boolean call() {
                    EventStore.log.info("Got async subscription on {} from {}, filling subscriptions", subscription, actorRef);
                    boolean loadEvents = EventStore.this.loadEvents(actorRef, subscription);
                    if (loadEvents) {
                        EventStore.log.info("Async CompleteAsyncSubscriptionPleaseSendSyncSubscription");
                        actorRef.tell(new CompleteAsyncSubscriptionPleaseSendSyncSubscription(subscription.getAggregateType()), self);
                    } else {
                        EventStore.log.info("Async IncompleteSubscriptionPleaseSendNew");
                        actorRef.tell(new IncompleteSubscriptionPleaseSendNew(subscription.getAggregateType()), self);
                    }
                    return Boolean.valueOf(loadEvents);
                }
            }, getContext().system().dispatcher()).onFailure(new OnFailure() { // from class: no.ks.eventstore2.eventstore.EventStore.2
                public void onFailure(Throwable th) {
                    EventStore.log.error("Error in AsyncSubscribe", th);
                }
            }, getContext().system().dispatcher());
            return;
        }
        log.info("Got subscription on {} from {}, filling subscriptions", subscription, actorRef);
        if (!loadEvents(actorRef, subscription)) {
            log.info("IncompleteSubscriptionPleaseSendNew");
            actorRef.tell(new IncompleteSubscriptionPleaseSendNew(subscription.getAggregateType()), self());
        } else if (!this.leaderInfo.isLeader()) {
            log.info("Sending subscription to leader {} from {}", this.leaderEventStore.path(), sender().path());
            this.leaderEventStore.tell(subscription, actorRef);
        } else {
            log.info("CompleteSubscriptionRegistered");
            actorRef.tell(new CompleteSubscriptionRegistered(subscription.getAggregateType()), self());
            addSubscriber(subscription);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean loadEvents(final ActorRef actorRef, Subscription subscription) {
        return (subscription.getFromJournalId() == null || "".equals(subscription.getFromJournalId().trim())) ? this.storage.loadEventsAndHandle(subscription.getAggregateType(), new HandleEvent() { // from class: no.ks.eventstore2.eventstore.EventStore.3
            @Override // no.ks.eventstore2.eventstore.HandleEvent
            public void handleEvent(Event event) {
                EventStore.this.sendEvent(event, actorRef);
            }
        }) : this.storage.loadEventsAndHandle(subscription.getAggregateType(), new HandleEvent() { // from class: no.ks.eventstore2.eventstore.EventStore.4
            @Override // no.ks.eventstore2.eventstore.HandleEvent
            public void handleEvent(Event event) {
                EventStore.this.sendEvent(event, actorRef);
            }
        }, subscription.getFromJournalId());
    }

    private void fillPendingSubscriptions() {
        if (this.pendingSubscriptions.isEmpty()) {
            return;
        }
        log.info("Filling pending subscriptions {}", this.pendingSubscriptions);
        for (final String str : this.pendingSubscriptions.keySet()) {
            this.storage.loadEventsAndHandle(str, new HandleEvent() { // from class: no.ks.eventstore2.eventstore.EventStore.5
                @Override // no.ks.eventstore2.eventstore.HandleEvent
                public void handleEvent(Event event) {
                    EventStore.this.sendEvent(event, (Set<ActorRef>) EventStore.this.pendingSubscriptions.get(str));
                }
            });
        }
        this.pendingSubscriptions.clear();
        log.info("Filled pending subscriptions");
    }

    private void addPendingSubscription(ActorRef actorRef, String str) {
        if (this.pendingSubscriptions.get(str) == null) {
            this.pendingSubscriptions.put(str, new HashSet<>());
        }
        this.pendingSubscriptions.get(str).add(actorRef);
        getContext().system().scheduler().scheduleOnce(Duration.create(250L, TimeUnit.MILLISECONDS), self(), "FillPendingSubscriptions", getContext().system().dispatcher(), self());
    }

    private void addSubscriber(SubscriptionRefresh subscriptionRefresh) {
        this.aggregateSubscribers.putAll(subscriptionRefresh.getAggregateType(), subscriptionRefresh.getSubscribers());
    }

    private void publishEvents(StoreEvents storeEvents) {
        Iterator<? extends Event> it = storeEvents.getEvents().iterator();
        while (it.hasNext()) {
            publishEvent(it.next());
        }
    }

    private void publishEvent(Event event) {
        Set<ActorRef> set = this.aggregateSubscribers.get(event.getAggregateType());
        if (set == null) {
            return;
        }
        sendEvent(event, set);
    }

    private void addSubscriber(Subscription subscription) {
        this.aggregateSubscribers.put(subscription.getAggregateType(), sender());
    }

    public void storeEvent(Event event) {
        event.setCreated(new DateTime());
        this.storage.saveEvent(event);
    }

    private void storeEvents(StoreEvents storeEvents) {
        Iterator<? extends Event> it = storeEvents.getEvents().iterator();
        while (it.hasNext()) {
            it.next().setCreated(new DateTime());
        }
        this.storage.saveEvents(storeEvents.getEvents());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendEvent(Event event, ActorRef actorRef) {
        Event upgradeEvent = upgradeEvent(event);
        log.debug("Publishing event {} to {}", upgradeEvent, actorRef);
        actorRef.tell(upgradeEvent, self());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendEvent(Event event, Set<ActorRef> set) {
        Event upgradeEvent = upgradeEvent(event);
        for (ActorRef actorRef : set) {
            log.debug("Publishing event {} to {}", upgradeEvent, actorRef);
            actorRef.tell(upgradeEvent, self());
        }
    }

    private Event upgradeEvent(Event event) {
        Event event2 = event;
        Event upgrade = event2.upgrade();
        while (true) {
            Event event3 = upgrade;
            if (event3 == event2) {
                return event3;
            }
            event2 = event3;
            upgrade = event2.upgrade();
        }
    }
}
