/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.test;

import io.fluxcapacitor.common.Guarantee;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.ThrowingConsumer;
import io.fluxcapacitor.common.ThrowingFunction;
import io.fluxcapacitor.common.api.Data;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.common.application.PropertySource;
import io.fluxcapacitor.common.application.SimplePropertySource;
import io.fluxcapacitor.common.handling.HandlerFilter;
import io.fluxcapacitor.common.handling.HandlerInvoker;
import io.fluxcapacitor.common.reflection.ReflectionUtils;
import io.fluxcapacitor.common.serialization.JsonUtils;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.common.ClientUtils;
import io.fluxcapacitor.javaclient.common.IdentityProvider;
import io.fluxcapacitor.javaclient.common.Message;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingMessage;
import io.fluxcapacitor.javaclient.configuration.DefaultFluxCapacitor;
import io.fluxcapacitor.javaclient.configuration.FluxCapacitorBuilder;
import io.fluxcapacitor.javaclient.configuration.client.Client;
import io.fluxcapacitor.javaclient.configuration.client.InMemoryClient;
import io.fluxcapacitor.javaclient.persisting.search.Search;
import io.fluxcapacitor.javaclient.publishing.DispatchInterceptor;
import io.fluxcapacitor.javaclient.publishing.client.MessageDispatch;
import io.fluxcapacitor.javaclient.scheduling.DefaultScheduler;
import io.fluxcapacitor.javaclient.scheduling.Schedule;
import io.fluxcapacitor.javaclient.scheduling.ScheduledCommandHandler;
import io.fluxcapacitor.javaclient.scheduling.Scheduler;
import io.fluxcapacitor.javaclient.scheduling.client.InMemorySchedulingClient;
import io.fluxcapacitor.javaclient.scheduling.client.SchedulingClient;
import io.fluxcapacitor.javaclient.test.Given;
import io.fluxcapacitor.javaclient.test.PredictableIdFactory;
import io.fluxcapacitor.javaclient.test.ResultValidator;
import io.fluxcapacitor.javaclient.test.TestClient;
import io.fluxcapacitor.javaclient.test.TestFluxCapacitor;
import io.fluxcapacitor.javaclient.test.TestUserProvider;
import io.fluxcapacitor.javaclient.test.Then;
import io.fluxcapacitor.javaclient.test.When;
import io.fluxcapacitor.javaclient.tracking.BatchInterceptor;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.ErrorHandler;
import io.fluxcapacitor.javaclient.tracking.Tracker;
import io.fluxcapacitor.javaclient.tracking.handling.HandleSelf;
import io.fluxcapacitor.javaclient.tracking.handling.HandlerInterceptor;
import io.fluxcapacitor.javaclient.tracking.handling.authentication.User;
import io.fluxcapacitor.javaclient.tracking.handling.authentication.UserProvider;
import io.fluxcapacitor.javaclient.web.WebRequest;
import java.beans.ConstructorProperties;
import java.lang.reflect.Executable;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestFixture
implements Given,
When {
    private static final Logger log = LoggerFactory.getLogger(TestFixture.class);
    public static Duration defaultResultTimeout = Duration.ofSeconds(10L);
    public static Duration defaultConsumerTimeout = Duration.ofSeconds(30L);
    private final FluxCapacitor fluxCapacitor;
    private final FluxCapacitorBuilder fluxCapacitorBuilder;
    private Duration resultTimeout = defaultResultTimeout;
    private Duration consumerTimeout = defaultConsumerTimeout;
    private final boolean synchronous;
    private Registration registration = Registration.noOp();
    private volatile Message tracedMessage;
    private final Map<ActiveConsumer, List<Message>> consumers = new ConcurrentHashMap<ActiveConsumer, List<Message>>();
    private final List<Message> commands = new CopyOnWriteArrayList<Message>();
    private final List<Message> events = new CopyOnWriteArrayList<Message>();
    private final List<Message> webRequests = new CopyOnWriteArrayList<Message>();
    private final List<Message> webResponses = new CopyOnWriteArrayList<Message>();
    private final List<Message> metrics = new CopyOnWriteArrayList<Message>();
    private final List<Schedule> schedules = new CopyOnWriteArrayList<Schedule>();
    private final CopyOnWriteArrayList<Throwable> errors = new CopyOnWriteArrayList();
    private final Map<String, String> testProperties = new HashMap<String, String>();
    private volatile boolean collectingResults;
    private final List<ThrowingConsumer<TestFixture>> modifiers = new CopyOnWriteArrayList<ThrowingConsumer<TestFixture>>();
    private static final ThreadLocal<List<TestFixture>> activeFixtures = ThreadLocal.withInitial(ArrayList::new);
    private static final Executor shutdownExecutor = Executors.newFixedThreadPool(16);

    public static TestFixture create(Object ... handlers) {
        return TestFixture.create((FluxCapacitorBuilder)DefaultFluxCapacitor.builder(), handlers);
    }

    public static TestFixture create(FluxCapacitorBuilder fluxCapacitorBuilder, Object ... handlers) {
        return TestFixture.create(fluxCapacitorBuilder, (FluxCapacitor fc) -> Arrays.asList(handlers));
    }

    public static TestFixture create(Function<FluxCapacitor, List<?>> handlersFactory) {
        return TestFixture.create((FluxCapacitorBuilder)DefaultFluxCapacitor.builder(), handlersFactory);
    }

    public static TestFixture create(FluxCapacitorBuilder fluxCapacitorBuilder, Function<FluxCapacitor, List<?>> handlersFactory) {
        return new TestFixture(fluxCapacitorBuilder, handlersFactory, (Client)InMemoryClient.newInstance(null), true);
    }

    public static TestFixture createAsync(Object ... handlers) {
        return TestFixture.createAsync((FluxCapacitorBuilder)DefaultFluxCapacitor.builder(), handlers);
    }

    public static TestFixture createAsync(FluxCapacitorBuilder fluxCapacitorBuilder, Object ... handlers) {
        return TestFixture.createAsync(fluxCapacitorBuilder, (FluxCapacitor fc) -> Arrays.asList(handlers));
    }

    public static TestFixture createAsync(Function<FluxCapacitor, List<?>> handlersFactory) {
        return TestFixture.createAsync((FluxCapacitorBuilder)DefaultFluxCapacitor.builder(), handlersFactory);
    }

    public static TestFixture createAsync(FluxCapacitorBuilder fluxCapacitorBuilder, Function<FluxCapacitor, List<?>> handlersFactory) {
        return new TestFixture(fluxCapacitorBuilder, handlersFactory, (Client)InMemoryClient.newInstance(null), false);
    }

    public static TestFixture createAsync(FluxCapacitorBuilder fluxCapacitorBuilder, Client client, Object ... handlers) {
        return new TestFixture(fluxCapacitorBuilder, fc -> Arrays.asList(handlers), client, false);
    }

    public static void shutDownActiveFixtures() {
        List<TestFixture> fixtures = activeFixtures.get();
        if (!fixtures.isEmpty()) {
            activeFixtures.remove();
            fixtures.forEach(fixture -> shutdownExecutor.execute(() -> {
                Optional.ofNullable(fixture.registration).ifPresent(Registration::cancel);
                fixture.fluxCapacitor.client().shutDown();
            }));
        }
    }

    protected TestFixture(FluxCapacitorBuilder fluxCapacitorBuilder, Function<FluxCapacitor, List<?>> handlerFactory, Client client, boolean synchronous) {
        activeFixtures.get().add(this);
        this.synchronous = synchronous;
        Optional<TestUserProvider> userProvider = Optional.ofNullable(UserProvider.defaultUserSupplier).map(TestUserProvider::new);
        if (userProvider.isPresent()) {
            fluxCapacitorBuilder = fluxCapacitorBuilder.registerUserProvider((UserProvider)userProvider.get());
        }
        if (synchronous) {
            fluxCapacitorBuilder.disableScheduledCommandHandler();
        }
        fluxCapacitorBuilder.addPropertySource((PropertySource)new SimplePropertySource(this.testProperties));
        GivenWhenThenInterceptor interceptor = new GivenWhenThenInterceptor();
        Arrays.stream(MessageType.values()).forEach(type -> client.getGatewayClient(type).registerMonitor((Consumer)interceptor));
        this.fluxCapacitorBuilder = fluxCapacitorBuilder = fluxCapacitorBuilder.disableShutdownHook().addDispatchInterceptor((DispatchInterceptor)interceptor, new MessageType[0]).replaceIdentityProvider(p -> p == IdentityProvider.defaultIdentityProvider ? new PredictableIdFactory() : p).addBatchInterceptor((BatchInterceptor)interceptor, new MessageType[0]).addHandlerInterceptor((HandlerInterceptor)interceptor, true, new MessageType[0]);
        this.fluxCapacitor = new TestFluxCapacitor(fluxCapacitorBuilder.build((Client)new TestClient(client)));
        this.withClock(Clock.fixed(Instant.now(), ZoneId.systemDefault()));
        ArrayList<ScheduledCommandHandler> handlers = new ArrayList<ScheduledCommandHandler>();
        if (synchronous) {
            handlers.add(new ScheduledCommandHandler());
        }
        handlers.addAll((Collection)handlerFactory.apply(this.fluxCapacitor));
        this.registerHandlers(handlers);
    }

    protected TestFixture(TestFixture currentFixture, boolean synchronous) {
        activeFixtures.get().add(this);
        this.synchronous = synchronous;
        this.fluxCapacitorBuilder = currentFixture.fluxCapacitorBuilder;
        Client currentClient = ((TestClient)currentFixture.fluxCapacitor.client()).getDelegate();
        Client newClient = currentClient instanceof InMemoryClient ? InMemoryClient.newInstance(null) : currentClient;
        this.fluxCapacitor = new TestFluxCapacitor(this.fluxCapacitorBuilder.build((Client)new TestClient(newClient)));
    }

    public TestFixture resultTimeout(Duration resultTimeout) {
        return this.modifyFixture((ThrowingConsumer<TestFixture>)((ThrowingConsumer)fixture -> {
            fixture.resultTimeout = resultTimeout;
        }));
    }

    public TestFixture consumerTimeout(Duration consumerTimeout) {
        return this.modifyFixture((ThrowingConsumer<TestFixture>)((ThrowingConsumer)fixture -> {
            fixture.consumerTimeout = consumerTimeout;
        }));
    }

    public TestFixture async() {
        if (this.synchronous) {
            TestFixture result = new TestFixture(this, false);
            this.modifiers.forEach(result::modifyFixture);
            return result;
        }
        return this;
    }

    public TestFixture sync() {
        if (!this.synchronous) {
            TestFixture result = new TestFixture(this, true);
            this.modifiers.forEach(result::modifyFixture);
            return result;
        }
        return this;
    }

    public TestFixture registerHandlers(List<?> handlers) {
        return this.modifyFixture((ThrowingConsumer<TestFixture>)((ThrowingConsumer)fixture -> {
            FluxCapacitor fc = fixture.getFluxCapacitor();
            if (handlers.isEmpty()) {
                return;
            }
            handlers.stream().collect(Collectors.toMap(Object::getClass, Function.identity(), (a, b) -> {
                log.warn("Handler of type {} is registered more than once. Please make sure this is intentional.", a.getClass());
                return a;
            }));
            if (!fixture.synchronous) {
                fixture.registration = fixture.registration.merge(fc.registerHandlers(handlers));
                return;
            }
            HandlerFilter handlerFilter = (c, e) -> true;
            Registration registration = (Registration)fc.apply(f -> handlers.stream().flatMap(h -> Stream.of(fc.commandGateway().registerHandler(h, handlerFilter), fc.queryGateway().registerHandler(h, handlerFilter), fc.eventGateway().registerHandler(h, handlerFilter), fc.eventStore().registerHandler(h, handlerFilter), fc.errorGateway().registerHandler(h, handlerFilter), fc.webRequestGateway().registerHandler(h, handlerFilter), fc.metricsGateway().registerHandler(h, handlerFilter))).reduce(Registration::merge).orElse(Registration.noOp()));
            Scheduler patt15215$temp = fc.scheduler();
            if (patt15215$temp instanceof DefaultScheduler) {
                DefaultScheduler scheduler = (DefaultScheduler)patt15215$temp;
                registration.merge(handlers.stream().flatMap(h -> Stream.of(scheduler.registerHandler(h, handlerFilter))).reduce(Registration::merge).orElse(Registration.noOp()));
            } else {
                log.warn("Could not register local schedule handlers");
            }
            fixture.registration = fixture.registration.merge(registration);
        }));
    }

    public TestFixture registerHandlers(Object ... handlers) {
        return this.registerHandlers(Arrays.asList(handlers));
    }

    @Override
    public TestFixture withClock(Clock clock) {
        return this.modifyFixture((ThrowingConsumer<TestFixture>)((ThrowingConsumer)fixture -> fixture.setClock(clock)));
    }

    @Override
    public TestFixture atFixedTime(Instant time) {
        return this.withClock(Clock.fixed(time, ZoneId.systemDefault()));
    }

    @Override
    public TestFixture withProperty(String name, Object value) {
        return this.modifyFixture((ThrowingConsumer<TestFixture>)((ThrowingConsumer)fixture -> fixture.testProperties.compute(name, (k, v) -> value == null ? null : value.toString())));
    }

    protected TestFixture modifyFixture(ThrowingConsumer<TestFixture> modifier) {
        this.modifiers.add(modifier);
        return (TestFixture)this.fluxCapacitor.apply(fc -> {
            modifier.accept((Object)this);
            return this;
        });
    }

    @Override
    public TestFixture givenCommands(Object ... commands) {
        Class callerClass = ReflectionUtils.getCallerClass();
        this.givenModification((ThrowingConsumer<TestFixture>)((ThrowingConsumer)fixture -> fixture.asMessages(callerClass, commands).forEach(c -> fixture.getDispatchResult(fixture.getFluxCapacitor().commandGateway().send(c)))));
        return this;
    }

    @Override
    public TestFixture givenCommandsByUser(User user, Object ... commands) {
        Class callerClass = ReflectionUtils.getCallerClass();
        this.givenModification((ThrowingConsumer<TestFixture>)((ThrowingConsumer)fixture -> fixture.asMessages(callerClass, commands).map(c -> fixture.addUser(user, c)).forEach(c -> fixture.getDispatchResult(fixture.getFluxCapacitor().commandGateway().send(c)))));
        return this;
    }

    @Override
    public TestFixture givenAppliedEvents(String aggregateId, Class<?> aggregateClass, Object ... events) {
        Class callerClass = ReflectionUtils.getCallerClass();
        return this.givenModification((ThrowingConsumer<TestFixture>)((ThrowingConsumer)fixture -> fixture.applyEvents(aggregateId, aggregateClass, fixture.getFluxCapacitor(), fixture.asMessages(callerClass, events).toList())));
    }

    @Override
    public TestFixture givenEvents(Object ... events) {
        Class callerClass = ReflectionUtils.getCallerClass();
        this.givenModification((ThrowingConsumer<TestFixture>)((ThrowingConsumer)fixture -> fixture.asMessages(callerClass, events).forEach(e -> fixture.getFluxCapacitor().eventGateway().publish(e))));
        return this;
    }

    @Override
    public TestFixture givenDocument(Object document, String id, String collection, Instant timestamp, Instant end) {
        return this.givenModification((ThrowingConsumer<TestFixture>)((ThrowingConsumer)fixture -> fixture.getFluxCapacitor().documentStore().index(document, (Object)id, (Object)collection, timestamp, end).get()));
    }

    @Override
    public TestFixture givenDocuments(String collection, Object ... documents) {
        return this.givenModification((ThrowingConsumer<TestFixture>)((ThrowingConsumer)fixture -> fixture.getFluxCapacitor().documentStore().index(List.of(documents), (Object)collection).get()));
    }

    @Override
    public TestFixture givenWebRequest(WebRequest webRequest) {
        return this.givenModification((ThrowingConsumer<TestFixture>)((ThrowingConsumer)fixture -> fixture.getDispatchResult(fixture.getFluxCapacitor().webRequestGateway().send(webRequest))));
    }

    @Override
    public TestFixture givenTimeAdvancedTo(Instant instant) {
        return this.givenModification((ThrowingConsumer<TestFixture>)((ThrowingConsumer)fixture -> fixture.advanceTimeTo(instant)));
    }

    @Override
    public TestFixture givenElapsedTime(Duration duration) {
        return this.givenModification((ThrowingConsumer<TestFixture>)((ThrowingConsumer)fixture -> fixture.advanceTimeBy(duration)));
    }

    @Override
    public TestFixture given(ThrowingConsumer<FluxCapacitor> condition) {
        return this.givenModification((ThrowingConsumer<TestFixture>)((ThrowingConsumer)fixture -> condition.accept((Object)fixture.getFluxCapacitor())));
    }

    protected TestFixture givenModification(ThrowingConsumer<TestFixture> modifier) {
        return this.modifyFixture((ThrowingConsumer<TestFixture>)((ThrowingConsumer)fixture -> {
            try {
                fixture.handleExpiredSchedulesLocally();
                modifier.accept(fixture);
                fixture.handleExpiredSchedulesLocally();
                fixture.waitForConsumers();
            }
            catch (Throwable e) {
                throw new IllegalStateException("Failed to execute given", e);
            }
        }));
    }

    @Override
    public Then whenCommand(Object command) {
        Message message = this.trace(command);
        return this.whenApplying(fc -> this.getDispatchResult(fc.commandGateway().send((Object)message)));
    }

    @Override
    public Then whenCommandByUser(Object command, User user) {
        return this.whenCommand(this.addUser(user, command));
    }

    @Override
    public Then whenQuery(Object query) {
        Message message = this.trace(query);
        return this.whenApplying(fc -> this.getDispatchResult(fc.queryGateway().send((Object)message)));
    }

    @Override
    public Then whenQueryByUser(Object query, User user) {
        return this.whenQuery(this.addUser(user, query));
    }

    @Override
    public Then whenEvent(Object event) {
        Message message = this.trace(event);
        return this.whenExecuting((ThrowingConsumer<FluxCapacitor>)((ThrowingConsumer)fc -> ClientUtils.runSilently(() -> fc.eventGateway().publish(message, Guarantee.STORED).get())));
    }

    @Override
    public Then whenEventsAreApplied(String aggregateId, Class<?> aggregateClass, Object ... events) {
        Class callerClass = ReflectionUtils.getCallerClass();
        return this.whenExecuting((ThrowingConsumer<FluxCapacitor>)((ThrowingConsumer)fc -> this.applyEvents(aggregateId, aggregateClass, (FluxCapacitor)fc, this.asMessages(callerClass, events).collect(Collectors.toList()))));
    }

    @Override
    public Then whenSearching(String collection, UnaryOperator<Search> searchQuery) {
        return this.whenApplying(fc -> ((Search)searchQuery.apply(fc.documentStore().search((Object)collection, new Object[0]))).fetchAll());
    }

    @Override
    public Then whenWebRequest(WebRequest request) {
        return this.whenApplying(fc -> request.getMethod().isWebsocket() ? fc.webRequestGateway().sendAndForget(Guarantee.STORED, new WebRequest[]{(WebRequest)this.trace(request)}) : this.getDispatchResult(fc.webRequestGateway().send((WebRequest)this.trace(request))));
    }

    @Override
    public Then whenScheduleExpires(Object schedule) {
        Message message = this.trace(schedule);
        return this.whenExecuting((ThrowingConsumer<FluxCapacitor>)((ThrowingConsumer)fc -> fc.scheduler().schedule((Object)message, this.getCurrentTime())));
    }

    @Override
    public Then whenTimeElapses(Duration duration) {
        return this.whenExecuting((ThrowingConsumer<FluxCapacitor>)((ThrowingConsumer)fc -> this.advanceTimeBy(duration)));
    }

    @Override
    public Then whenTimeAdvancesTo(Instant instant) {
        return this.whenExecuting((ThrowingConsumer<FluxCapacitor>)((ThrowingConsumer)fc -> this.advanceTimeTo(instant)));
    }

    @Override
    public Then whenApplying(ThrowingFunction<FluxCapacitor, ?> action) {
        return (Then)this.fluxCapacitor.apply(fc -> {
            try {
                Object result;
                this.handleExpiredSchedulesLocally();
                this.waitForConsumers();
                this.resetMocks();
                this.collectingResults = true;
                try {
                    result = action.apply(fc);
                    if (result instanceof CompletableFuture) {
                        CompletableFuture future = (CompletableFuture)result;
                        result = this.getDispatchResult(future);
                    }
                }
                catch (Throwable e) {
                    this.registerError(e);
                    result = e;
                }
                this.waitForConsumers();
                Then then = this.getResultValidator(result, this.commands, this.events, this.schedules, this.getFutureSchedules(), this.errors, this.metrics);
                return then;
            }
            finally {
                this.handleExpiredSchedulesLocally();
            }
        });
    }

    protected Then getResultValidator(Object result, List<Message> commands, List<Message> events, List<Schedule> schedules, List<Schedule> allSchedules, List<Throwable> errors, List<Message> metrics) {
        return new ResultValidator(this.getFluxCapacitor(), result, events, commands, this.webRequests, this.webResponses, metrics, schedules, allSchedules.stream().filter(s -> s.getDeadline().isAfter(this.getCurrentTime())).collect(Collectors.toList()), errors);
    }

    protected void applyEvents(String aggregateId, Class<?> aggregateClass, FluxCapacitor fc, List<Message> events) {
        fc.aggregateRepository().load((Object)aggregateId, aggregateClass).apply(events.stream().map(e -> e.withMetadata(e.getMetadata().with(new Object[]{"$aggregateId", aggregateId, "$aggregateType", aggregateClass.getName()}))).toList());
    }

    protected void handleExpiredSchedulesLocally() {
        SchedulingClient schedulingClient;
        if (this.synchronous && (schedulingClient = this.getFluxCapacitor().client().getSchedulingClient()) instanceof InMemorySchedulingClient) {
            List expiredSchedules = ((InMemorySchedulingClient)schedulingClient).removeExpiredSchedules(this.getFluxCapacitor().serializer());
            Scheduler scheduler = this.getFluxCapacitor().scheduler();
            if (scheduler instanceof DefaultScheduler) {
                DefaultScheduler scheduler2 = (DefaultScheduler)scheduler;
                expiredSchedules.forEach(arg_0 -> ((DefaultScheduler)scheduler2).handleLocally(arg_0));
            }
        }
    }

    protected List<Schedule> getFutureSchedules() {
        SchedulingClient schedulingClient = this.getFluxCapacitor().client().getSchedulingClient();
        if (schedulingClient instanceof InMemorySchedulingClient) {
            return ((InMemorySchedulingClient)schedulingClient).getSchedules(this.getFluxCapacitor().serializer()).stream().filter(s -> s.getDeadline().isAfter(this.getCurrentTime())).collect(Collectors.toList());
        }
        return Collections.emptyList();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void waitForConsumers() {
        if (this.synchronous) {
            return;
        }
        Map<ActiveConsumer, List<Message>> map = this.consumers;
        synchronized (map) {
            if (!this.checkConsumers()) {
                try {
                    this.consumers.wait(this.consumerTimeout.toMillis());
                }
                catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                }
            }
            if (!this.checkConsumers()) {
                log.warn("Some consumers in the test fixture did not finish processing all messages. This may cause your test to fail. Waiting consumers: {}", this.consumers.entrySet().stream().filter(e -> !((List)e.getValue()).isEmpty()).map(e -> ((ActiveConsumer)e.getKey()).getName() + " : " + ((List)e.getValue()).stream().map(m -> m.getPayload() == null ? "Void" : m.getPayload().getClass().getSimpleName()).collect(Collectors.joining(", "))).collect(Collectors.toList()));
            }
        }
    }

    protected void resetMocks() {
        ((TestClient)this.fluxCapacitor.client()).resetMocks();
        ((TestFluxCapacitor)this.fluxCapacitor).resetMocks();
    }

    protected void advanceTimeBy(Duration duration) {
        this.advanceTimeTo(this.getCurrentTime().plus(duration));
    }

    protected void advanceTimeTo(Instant instant) {
        this.setClock(Clock.fixed(instant, ZoneId.systemDefault()));
    }

    protected void setClock(Clock clock) {
        this.getFluxCapacitor().withClock(clock);
        SchedulingClient schedulingClient = this.getFluxCapacitor().client().getSchedulingClient();
        if (schedulingClient instanceof InMemorySchedulingClient) {
            ((InMemorySchedulingClient)schedulingClient).setClock(clock);
        } else {
            log.warn("Could not update clock of scheduling client. Timing tests may not work.");
        }
    }

    protected void registerCommand(Message command) {
        this.commands.add(command);
    }

    protected void registerMetric(Message metric) {
        this.metrics.add(metric);
    }

    protected void registerEvent(Message event) {
        this.events.add(event);
    }

    protected void registerWebRequest(Message request) {
        this.webRequests.add(request);
    }

    protected void registerWebResponse(Message response) {
        this.webResponses.add(response);
    }

    protected void registerSchedule(Schedule schedule) {
        this.schedules.add(schedule);
    }

    protected void registerError(Throwable e) {
        this.errors.addIfAbsent(e);
    }

    protected Object getDispatchResult(CompletableFuture<?> dispatchResult) {
        try {
            return this.synchronous ? dispatchResult.get(0L, TimeUnit.MILLISECONDS) : dispatchResult.get(this.resultTimeout.toMillis(), TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw e.getCause();
        }
        catch (TimeoutException e) {
            throw new TimeoutException("Test fixture did not receive a dispatch result in time. Perhaps some messages did not have handlers?");
        }
    }

    protected Stream<Message> asMessages(Class<?> callerClass, Object ... messages) {
        return Arrays.stream(messages).flatMap(c -> {
            if (c == null) {
                return Stream.empty();
            }
            if (c instanceof Collection) {
                return ((Collection)c).stream();
            }
            if (c.getClass().isArray()) {
                return Arrays.stream((Object[])c);
            }
            return Stream.of(c);
        }).flatMap(c -> {
            Object parsed = this.parsePayload(c, callerClass);
            return parsed == null ? Stream.empty() : (parsed instanceof Collection ? ((Collection)parsed).stream() : (parsed.getClass().isArray() ? Arrays.stream((Object[])parsed) : Stream.of(parsed)));
        }).map(Message::asMessage);
    }

    protected Message trace(Object object) {
        Class callerClass = ReflectionUtils.getCallerClass();
        this.tracedMessage = (Message)this.fluxCapacitor.apply(fc -> Message.asMessage((Object)this.parsePayload(object, callerClass)));
        return this.tracedMessage;
    }

    public Message addUser(User user, Object value) {
        Class callerClass = ReflectionUtils.getCallerClass();
        return (Message)this.fluxCapacitor.apply(fc -> Message.asMessage((Object)this.parsePayload(value, callerClass)).addUser(user));
    }

    public Object parsePayload(Object object, Class<?> callerClass) {
        if ((object = TestFixture.parseObject(object, callerClass)) instanceof Data) {
            Data data = (Data)object;
            Data eventBytes = this.fluxCapacitor.serializer().serialize((Object)data);
            object = this.fluxCapacitor.serializer().deserialize(eventBytes);
        }
        return object;
    }

    public static Object parseObject(Object object, Class<?> callerClass) {
        if (object instanceof String && ((String)object).endsWith(".json")) {
            object = JsonUtils.fromFile(callerClass, (String)((String)object));
        }
        return object;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean checkConsumers() {
        if (this.synchronous) {
            return true;
        }
        Map<ActiveConsumer, List<Message>> map = this.consumers;
        synchronized (map) {
            if (this.consumers.values().stream().allMatch(l -> l.stream().allMatch(m -> m instanceof Schedule && ((Schedule)m).getDeadline().isAfter(this.getCurrentTime())))) {
                this.consumers.notifyAll();
                return true;
            }
        }
        return false;
    }

    @Override
    public FluxCapacitor getFluxCapacitor() {
        return this.fluxCapacitor;
    }

    protected class GivenWhenThenInterceptor
    implements DispatchInterceptor,
    BatchInterceptor,
    HandlerInterceptor,
    Consumer<MessageDispatch> {
        private final List<Schedule> publishedSchedules = new CopyOnWriteArrayList<Schedule>();
        private final Set<String> interceptedMessageIds = new CopyOnWriteArraySet<String>();

        protected GivenWhenThenInterceptor() {
        }

        @Override
        public void accept(MessageDispatch messageDispatch) {
            if (TestFixture.this.collectingResults) {
                try {
                    TestFixture.this.fluxCapacitor.serializer().deserializeMessages(messageDispatch.getMessages().stream().filter(m -> !this.interceptedMessageIds.contains(m.getMessageId())), messageDispatch.getMessageType()).map(DeserializingMessage::toMessage).forEach(m -> this.interceptDispatch((Message)m, messageDispatch.getMessageType()));
                }
                catch (Exception ignored) {
                    log.warn("Failed to intercept a published message. This may cause your test to fail.");
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public Message interceptDispatch(Message message, MessageType messageType) {
            if (TestFixture.this.collectingResults) {
                this.interceptedMessageIds.add(message.getMessageId());
            }
            if (messageType == MessageType.SCHEDULE) {
                this.addMessage(this.publishedSchedules, (Schedule)message);
            }
            if (ClientUtils.getHandleSelfAnnotation((Class)message.getPayloadClass()).map(HandleSelf::logMessage).orElse(true).booleanValue()) {
                Map<ActiveConsumer, List<Message>> map = TestFixture.this.consumers;
                synchronized (map) {
                    TestFixture.this.consumers.entrySet().stream().filter(t -> {
                        ActiveConsumer configuration = (ActiveConsumer)t.getKey();
                        return configuration.getMessageType() == messageType && Optional.ofNullable(configuration.getTypeFilter()).map(f -> message.getPayload().getClass().getName().matches((String)f)).orElse(true) != false;
                    }).forEach(e -> this.addMessage((List)e.getValue(), message));
                }
            }
            if (this.captureMessage(message).booleanValue()) {
                switch (messageType) {
                    case COMMAND: {
                        TestFixture.this.registerCommand(message);
                        break;
                    }
                    case EVENT: {
                        TestFixture.this.registerEvent(message);
                        break;
                    }
                    case SCHEDULE: {
                        TestFixture.this.registerSchedule((Schedule)message);
                        break;
                    }
                    case WEBREQUEST: {
                        TestFixture.this.registerWebRequest(message);
                        break;
                    }
                    case WEBRESPONSE: {
                        TestFixture.this.registerWebResponse(message);
                        break;
                    }
                    case METRICS: {
                        TestFixture.this.registerMetric(message);
                    }
                }
            }
            return message;
        }

        protected Boolean captureMessage(Message message) {
            return TestFixture.this.collectingResults && Optional.ofNullable(TestFixture.this.tracedMessage).map(t -> !Objects.equals(t.getMessageId(), message.getMessageId())).orElse(true) != false;
        }

        protected <T extends Message> void addMessage(List<T> messages, T message) {
            if (message instanceof Schedule) {
                messages.removeIf(m -> m instanceof Schedule && ((Schedule)m).getScheduleId().equals(((Schedule)message).getScheduleId()));
            }
            messages.add(message);
        }

        public Consumer<MessageBatch> intercept(Consumer<MessageBatch> consumer, Tracker tracker) {
            List messages = TestFixture.this.consumers.computeIfAbsent(new ActiveConsumer(tracker.getConfiguration(), tracker.getMessageType()), c -> (c.getMessageType() == MessageType.SCHEDULE ? this.publishedSchedules : Collections.emptyList()).stream().filter(m -> Optional.ofNullable(c.getTypeFilter()).map(f -> m.getPayload().getClass().getName().matches((String)f)).orElse(true)).collect(Collectors.toCollection(CopyOnWriteArrayList::new)));
            return b -> {
                consumer.accept((MessageBatch)b);
                Collection messageIds = b.getMessages().stream().map(SerializedMessage::getMessageId).collect(Collectors.toSet());
                messages.removeIf(m -> messageIds.contains(m.getMessageId()));
                TestFixture.this.checkConsumers();
            };
        }

        public Function<DeserializingMessage, Object> interceptHandling(Function<DeserializingMessage, Object> function, HandlerInvoker invoker) {
            return m -> {
                try {
                    Object r = function.apply((DeserializingMessage)m);
                    return r;
                }
                catch (Exception e2) {
                    TestFixture.this.registerError(e2);
                    throw e2;
                }
                finally {
                    if (m.getMessageType().isRequest() && ClientUtils.getLocalHandlerAnnotation(invoker.getTarget().getClass(), (Executable)invoker.getMethod()).map(l -> !l.logMessage()).orElse(false).booleanValue()) {
                        Map<ActiveConsumer, List<Message>> map = TestFixture.this.consumers;
                        synchronized (map) {
                            TestFixture.this.consumers.entrySet().stream().filter(t -> ((ActiveConsumer)t.getKey()).getMessageType() == m.getMessageType()).forEach(e -> ((List)e.getValue()).removeIf(m2 -> m2.getMessageId().equals(m.getMessageId())));
                        }
                        TestFixture.this.checkConsumers();
                    }
                }
            };
        }

        public void shutdown(Tracker tracker) {
            TestFixture.this.consumers.remove(new ActiveConsumer(tracker.getConfiguration(), tracker.getMessageType()));
            TestFixture.this.checkConsumers();
        }
    }

    protected static final class ActiveConsumer {
        private final ConsumerConfiguration configuration;
        private final MessageType messageType;

        @ConstructorProperties(value={"configuration", "messageType"})
        public ActiveConsumer(ConsumerConfiguration configuration, MessageType messageType) {
            this.configuration = configuration;
            this.messageType = messageType;
        }

        public ConsumerConfiguration getConfiguration() {
            return this.configuration;
        }

        public MessageType getMessageType() {
            return this.messageType;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof ActiveConsumer)) {
                return false;
            }
            ActiveConsumer other = (ActiveConsumer)o;
            ConsumerConfiguration this$configuration = this.getConfiguration();
            ConsumerConfiguration other$configuration = other.getConfiguration();
            if (this$configuration == null ? other$configuration != null : !this$configuration.equals(other$configuration)) {
                return false;
            }
            MessageType this$messageType = this.getMessageType();
            MessageType other$messageType = other.getMessageType();
            return !(this$messageType == null ? other$messageType != null : !this$messageType.equals(other$messageType));
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            ConsumerConfiguration $configuration = this.getConfiguration();
            result = result * 59 + ($configuration == null ? 43 : $configuration.hashCode());
            MessageType $messageType = this.getMessageType();
            result = result * 59 + ($messageType == null ? 43 : $messageType.hashCode());
            return result;
        }

        public String toString() {
            return "TestFixture.ActiveConsumer(configuration=" + this.getConfiguration() + ", messageType=" + this.getMessageType() + ")";
        }

        public ConsumerConfiguration.Builder toBuilder() {
            return this.getConfiguration().toBuilder();
        }

        public String getName() {
            return this.getConfiguration().getName();
        }

        public Predicate<Object> getHandlerFilter() {
            return this.getConfiguration().getHandlerFilter();
        }

        public ErrorHandler getErrorHandler() {
            return this.getConfiguration().getErrorHandler();
        }

        public int getThreads() {
            return this.getConfiguration().getThreads();
        }

        public String getTypeFilter() {
            return this.getConfiguration().getTypeFilter();
        }

        public int getMaxFetchSize() {
            return this.getConfiguration().getMaxFetchSize();
        }

        public Duration getMaxWaitDuration() {
            return this.getConfiguration().getMaxWaitDuration();
        }

        public List<BatchInterceptor> getBatchInterceptors() {
            return this.getConfiguration().getBatchInterceptors();
        }

        public List<HandlerInterceptor> getHandlerInterceptors() {
            return this.getConfiguration().getHandlerInterceptors();
        }

        public boolean filterMessageTarget() {
            return this.getConfiguration().filterMessageTarget();
        }

        public boolean ignoreSegment() {
            return this.getConfiguration().ignoreSegment();
        }

        public boolean singleTracker() {
            return this.getConfiguration().singleTracker();
        }

        public boolean clientControlledIndex() {
            return this.getConfiguration().clientControlledIndex();
        }

        public Long getMinIndex() {
            return this.getConfiguration().getMinIndex();
        }

        public Long getMaxIndexExclusive() {
            return this.getConfiguration().getMaxIndexExclusive();
        }

        public boolean exclusive() {
            return this.getConfiguration().exclusive();
        }

        public boolean passive() {
            return this.getConfiguration().passive();
        }

        public Function<Client, String> getTrackerIdFactory() {
            return this.getConfiguration().getTrackerIdFactory();
        }

        public Duration getPurgeDelay() {
            return this.getConfiguration().getPurgeDelay();
        }
    }
}

