/*
 * Decompiled with CFR 0.152.
 */
package io.druid.segment.data;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryEngine;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class IncrementalIndexTest {
    /**
     * Aggregators used when no explicit set is supplied: a single
     * {@link CountAggregatorFactory} named {@code "count"}.
     */
    private static final AggregatorFactory[] defaultAggregatorFactories = {new CountAggregatorFactory("count")};

    /** Factory that builds the index flavour exercised by this parameterized run. */
    private final IndexCreator indexCreator;

    /**
     * Creates one test instance per entry returned by {@link #constructorFeeder()}.
     *
     * @param indexCreator factory the test methods use to obtain the index under test
     */
    public IncrementalIndexTest(IndexCreator indexCreator) {
        this.indexCreator = indexCreator;
    }

    @Parameterized.Parameters
    public static Collection<?> constructorFeeder() throws IOException {
        return Arrays.asList({new IndexCreator(){

            @Override
            public IncrementalIndex createIndex(AggregatorFactory[] factories) {
                return IncrementalIndexTest.createIndex(true, factories);
            }
        }}, {new IndexCreator(){

            @Override
            public IncrementalIndex createIndex(AggregatorFactory[] factories) {
                return IncrementalIndexTest.createIndex(false, factories);
            }
        }});
    }

    /**
     * Builds an empty index for tests.
     *
     * @param offheap             {@code true} for an {@link OffheapIncrementalIndex} backed by
     *                            {@code TestQueryRunners.pool}; {@code false} for an
     *                            {@link OnheapIncrementalIndex}
     * @param aggregatorFactories metrics for the index; {@code null} selects
     *                            {@code defaultAggregatorFactories}
     * @return the newly created index
     */
    public static IncrementalIndex createIndex(boolean offheap, AggregatorFactory[] aggregatorFactories) {
        final AggregatorFactory[] metrics =
            aggregatorFactories != null ? aggregatorFactories : defaultAggregatorFactories;
        if (!offheap) {
            return new OnheapIncrementalIndex(0L, QueryGranularity.NONE, metrics, 1000000);
        }
        // 100 * 1024 * 1024 == 0x6400000; presumably a size limit in bytes -- confirm against
        // the OffheapIncrementalIndex constructor.
        return new OffheapIncrementalIndex(
            0L, QueryGranularity.NONE, metrics, TestQueryRunners.pool, true, 100 * 1024 * 1024);
    }

    /**
     * Adds two rows to {@code index}, both stamped with {@code timestamp} and both carrying
     * the dimensions {@code dim1} and {@code dim2}: first {@code ("1", "2")}, then {@code ("3", "4")}.
     *
     * @param timestamp timestamp assigned to both rows
     * @param index     index that receives the rows
     * @throws IndexSizeExceededException if {@code index.add} reports it
     */
    public static void populateIndex(long timestamp, IncrementalIndex index) throws IndexSizeExceededException {
        final String[][] valuePairs = {{"1", "2"}, {"3", "4"}};
        for (String[] pair : valuePairs) {
            final Map<String, Object> event = ImmutableMap.<String, Object>of("dim1", pair[0], "dim2", pair[1]);
            index.add(new MapBasedInputRow(timestamp, Arrays.asList("dim1", "dim2"), event));
        }
    }

    /**
     * Builds a row with dimensions {@code Dim_0 .. Dim_(dimensionCount-1)}, where the value of
     * each dimension is its own name followed by {@code rowID}.
     *
     * @param timestamp      timestamp of the row
     * @param rowID          suffix appended to every dimension value
     * @param dimensionCount number of dimensions to generate
     * @return the generated row
     */
    public static MapBasedInputRow getRow(long timestamp, int rowID, int dimensionCount) {
        final List<String> names = new ArrayList<String>(dimensionCount);
        final ImmutableMap.Builder<String, Object> event = ImmutableMap.builder();
        for (int dim = 0; dim < dimensionCount; dim++) {
            final String name = String.format("Dim_%d", dim);
            names.add(name);
            event.put(name, name + rowID);
        }
        return new MapBasedInputRow(timestamp, names, event.build());
    }

    /**
     * Builds a row with dimensions {@code Dim_0 .. Dim_(dimensionCount-1)}, each holding the
     * {@code Long} value 1.
     *
     * @param timestamp      timestamp of the row
     * @param rowID          not used by this method
     * @param dimensionCount number of dimensions to generate
     * @return the generated row
     */
    private static MapBasedInputRow getLongRow(long timestamp, int rowID, int dimensionCount) {
        final List<String> names = new ArrayList<String>(dimensionCount);
        final ImmutableMap.Builder<String, Object> event = ImmutableMap.builder();
        for (int dim = 0; dim < dimensionCount; dim++) {
            final String name = String.format("Dim_%d", dim);
            names.add(name);
            event.put(name, Long.valueOf(1L));
        }
        return new MapBasedInputRow(timestamp, names, event.build());
    }

    /**
     * Populates a fresh index via {@link #populateIndex(long, IncrementalIndex)} and checks the
     * reported dimensions, the row count, and the timestamp and dimension values of both rows
     * in iteration order.
     */
    @Test
    public void testCaseSensitivity() throws Exception {
        final long now = System.currentTimeMillis();
        final IncrementalIndex index = indexCreator.createIndex(defaultAggregatorFactories);
        populateIndex(now, index);

        Assert.assertEquals(Arrays.asList("dim1", "dim2"), index.getDimensions());
        Assert.assertEquals(2, index.size());

        // Expected {dim1, dim2} values of each row, in iteration order.
        final String[][] expectedValues = {{"1", "2"}, {"3", "4"}};
        final Iterator<Row> rows = index.iterator();
        for (String[] expected : expectedValues) {
            final Row row = rows.next();
            Assert.assertEquals(now, row.getTimestampFromEpoch());
            Assert.assertEquals(Arrays.asList(expected[0]), row.getDimension("dim1"));
            Assert.assertEquals(Arrays.asList(expected[1]), row.getDimension("dim2"));
        }
    }

    /**
     * Ingests rows from several writer tasks while reader tasks concurrently run a timeseries
     * query over the same index, then verifies the final aggregates once all tasks are done.
     *
     * <p>Every writer task adds {@code elementsPerThread} rows at timestamps
     * {@code timestamp .. timestamp + elementsPerThread - 1}, each dimension holding the value 1.
     * The final query is expected to report {@code elementsPerThread} rows and, for every
     * dimension, long and double sums equal to {@code elementsPerThread * taskCount}.
     *
     * <p>The test also fails if no reader ever observed a writer in progress, because in that
     * case concurrency was not actually exercised.
     *
     * @throws InterruptedException if interrupted while waiting for the tasks
     * @throws ExecutionException   if any writer or reader task failed
     */
    @Test(timeout=60000L)
    public void testConcurrentAddRead() throws InterruptedException, ExecutionException {
        final int dimensionCount = 5;
        final int taskCount = 30;
        final int concurrentThreads = 3;
        final int elementsPerThread = 100;

        // Ingest-time aggregators read the raw "Dim_i" columns ...
        final List<AggregatorFactory> ingestAggregatorFactories = new ArrayList<AggregatorFactory>(2 * dimensionCount + 1);
        ingestAggregatorFactories.add(new CountAggregatorFactory("rows"));
        for (int i = 0; i < dimensionCount; ++i) {
            ingestAggregatorFactories.add(new LongSumAggregatorFactory(String.format("sumResult%s", i), String.format("Dim_%s", i)));
            ingestAggregatorFactories.add(new DoubleSumAggregatorFactory(String.format("doubleSumResult%s", i), String.format("Dim_%s", i)));
        }
        // ... while query-time aggregators read the metrics produced at ingest time.
        final List<AggregatorFactory> queryAggregatorFactories = new ArrayList<AggregatorFactory>(2 * dimensionCount + 1);
        queryAggregatorFactories.add(new CountAggregatorFactory("rows"));
        for (int i = 0; i < dimensionCount; ++i) {
            queryAggregatorFactories.add(new LongSumAggregatorFactory(String.format("sumResult%s", i), String.format("sumResult%s", i)));
            queryAggregatorFactories.add(new DoubleSumAggregatorFactory(String.format("doubleSumResult%s", i), String.format("doubleSumResult%s", i)));
        }

        final IncrementalIndex index = this.indexCreator.createIndex(
            ingestAggregatorFactories.toArray(new AggregatorFactory[ingestAggregatorFactories.size()]));

        final ListeningExecutorService indexExecutor = MoreExecutors.listeningDecorator(
            Executors.newFixedThreadPool(
                concurrentThreads,
                new ThreadFactoryBuilder().setDaemon(false).setNameFormat("index-executor-%d").setPriority(Thread.MIN_PRIORITY).build()));
        final ListeningExecutorService queryExecutor = MoreExecutors.listeningDecorator(
            Executors.newFixedThreadPool(
                concurrentThreads,
                new ThreadFactoryBuilder().setDaemon(false).setNameFormat("query-executor-%d").build()));

        final long timestamp = System.currentTimeMillis();
        final Interval queryInterval = new Interval("1900-01-01T00:00:00Z/2900-01-01T00:00:00Z");
        final Segment incrementalIndexSegment = new IncrementalIndexSegment(index, null);
        final TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
            new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
            new TimeseriesQueryEngine(),
            QueryRunnerTestHelper.NOOP_QUERYWATCHER);

        final AtomicInteger currentlyRunning = new AtomicInteger(0); // writer tasks in progress
        final AtomicBoolean concurrentlyRan = new AtomicBoolean(false); // a reader saw a writer in progress
        final AtomicInteger someoneRan = new AtomicInteger(0); // add() calls started so far

        final List<ListenableFuture<?>> indexFutures = new LinkedList<ListenableFuture<?>>();
        final List<ListenableFuture<?>> queryFutures = new LinkedList<ListenableFuture<?>>();

        try {
            for (int j = 0; j < taskCount; ++j) {
                indexFutures.add(indexExecutor.submit(new Runnable() {
                    @Override
                    public void run() {
                        currentlyRunning.incrementAndGet();
                        try {
                            for (int i = 0; i < elementsPerThread; ++i) {
                                someoneRan.incrementAndGet();
                                index.add(IncrementalIndexTest.getLongRow(timestamp + i, i, dimensionCount));
                            }
                        }
                        catch (IndexSizeExceededException e) {
                            throw Throwables.propagate(e);
                        }
                        finally {
                            // Keep the in-progress counter accurate even when add() fails.
                            currentlyRunning.decrementAndGet();
                        }
                    }
                }));
                queryFutures.add(queryExecutor.submit(new Runnable() {
                    @Override
                    public void run() {
                        final List<Result<TimeseriesResultValue>> results = IncrementalIndexTest.runTimeseriesQuery(
                            factory, incrementalIndexSegment, queryInterval, queryAggregatorFactories);
                        for (Result<TimeseriesResultValue> result : results) {
                            final Integer ranCount = someoneRan.get();
                            if (ranCount > 0) {
                                // Writers are still running, so only a range can be checked here:
                                // every row contributes 1 to Dim_0 and someoneRan is bumped before
                                // each add. Exact totals are verified after all tasks finish.
                                final Double sumResult = result.getValue().getDoubleMetric("doubleSumResult0");
                                Assert.assertTrue(
                                    String.format("%d >= %g >= 0 violated", ranCount, sumResult),
                                    sumResult >= 0.0 && sumResult <= (double) ranCount.intValue());
                            }
                        }
                        if (currentlyRunning.get() > 0) {
                            concurrentlyRan.set(true);
                        }
                    }
                }));
            }
            final List<ListenableFuture<?>> allFutures =
                new ArrayList<ListenableFuture<?>>(queryFutures.size() + indexFutures.size());
            allFutures.addAll(queryFutures);
            allFutures.addAll(indexFutures);
            Futures.allAsList(allFutures).get();
        }
        finally {
            // The pools use non-daemon threads; always release them, including on failure.
            queryExecutor.shutdown();
            indexExecutor.shutdown();
        }
        Assert.assertTrue("Did not hit concurrency, please try again", concurrentlyRan.get());

        final List<Result<TimeseriesResultValue>> results = IncrementalIndexTest.runTimeseriesQuery(
            factory, incrementalIndexSegment, queryInterval, queryAggregatorFactories);
        for (Result<TimeseriesResultValue> result : results) {
            Assert.assertEquals(elementsPerThread, result.getValue().getLongMetric("rows").intValue());
            for (int i = 0; i < dimensionCount; ++i) {
                Assert.assertEquals(
                    String.format("Failed long sum on dimension %d", i),
                    elementsPerThread * taskCount,
                    result.getValue().getLongMetric(String.format("sumResult%s", i)).intValue());
                Assert.assertEquals(
                    String.format("Failed double sum on dimension %d", i),
                    elementsPerThread * taskCount,
                    result.getValue().getDoubleMetric(String.format("doubleSumResult%s", i)).intValue());
            }
        }
    }

    /**
     * Runs a timeseries query with {@code QueryGranularity.ALL} over {@code segment} for
     * {@code interval} and collects the results into a list.
     *
     * @param factory     factory supplying the query runner and its tool chest
     * @param segment     segment to query
     * @param interval    interval the query covers
     * @param aggregators aggregators the query computes
     * @return the materialized query results
     */
    // The factory is intentionally raw: FinalizeResultsQueryRunner is constructed from the
    // factory's runner and tool chest exactly as the test did before, via unchecked conversion.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static List<Result<TimeseriesResultValue>> runTimeseriesQuery(
        QueryRunnerFactory factory,
        Segment segment,
        Interval interval,
        List<AggregatorFactory> aggregators
    ) {
        final FinalizeResultsQueryRunner<Result<TimeseriesResultValue>> runner =
            new FinalizeResultsQueryRunner<Result<TimeseriesResultValue>>(
                factory.createRunner(segment), factory.getToolchest());
        final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
            .dataSource("xxx")
            .granularity(QueryGranularity.ALL)
            .intervals(ImmutableList.of(interval))
            .aggregators(aggregators)
            .build();
        final Map<String, Object> context = new HashMap<String, Object>();
        return Sequences.toList(runner.run(query, context), new LinkedList<Result<TimeseriesResultValue>>());
    }

    /**
     * Adds the same set of rows from several threads at once and checks the resulting index.
     *
     * <p>Each of {@code threadCount} tasks adds {@code elementsPerThread} rows at timestamps
     * {@code timestamp .. timestamp + elementsPerThread - 1}. The test then expects
     * {@code dimensionCount} dimensions, {@code elementsPerThread} rows in ascending timestamp
     * order, and a {@code "count"} metric equal to {@code threadCount} on every row.
     *
     * @throws Exception if waiting for the writer tasks is interrupted
     */
    @Test
    public void testConcurrentAdd() throws Exception {
        final int threadCount = 10;
        final int elementsPerThread = 200;
        final int dimensionCount = 5;

        final IncrementalIndex index = this.indexCreator.createIndex(defaultAggregatorFactories);
        final long timestamp = System.currentTimeMillis();
        final CountDownLatch latch = new CountDownLatch(threadCount);
        final AtomicInteger failedTasks = new AtomicInteger(0);
        final ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        try {
            for (int j = 0; j < threadCount; ++j) {
                executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            for (int i = 0; i < elementsPerThread; ++i) {
                                index.add(IncrementalIndexTest.getRow(timestamp + i, i, dimensionCount));
                            }
                        }
                        catch (Exception e) {
                            // Record the failure so it fails the test instead of only being printed.
                            failedTasks.incrementAndGet();
                            e.printStackTrace();
                        }
                        finally {
                            // Always release the latch so the main thread never waits on a dead task.
                            latch.countDown();
                        }
                    }
                });
            }
            Assert.assertTrue(latch.await(60L, TimeUnit.SECONDS));
        }
        finally {
            // The pool's threads are non-daemon; stop them whether or not the test passed.
            executor.shutdownNow();
        }
        Assert.assertEquals("writer tasks that threw an exception", 0, failedTasks.get());

        Assert.assertEquals(dimensionCount, index.getDimensions().size());
        Assert.assertEquals(elementsPerThread, index.size());
        final Iterator<Row> iterator = index.iterator();
        int curr = 0;
        while (iterator.hasNext()) {
            final Row row = iterator.next();
            Assert.assertEquals(timestamp + curr, row.getTimestampFromEpoch());
            Assert.assertEquals(Float.valueOf(threadCount), Float.valueOf(row.getFloatMetric("count")));
            ++curr;
        }
        Assert.assertEquals(elementsPerThread, curr);
    }

    /**
     * Adds 100-dimension rows to a small off-heap index until {@code canAppendRow()} returns
     * {@code false} or 500 rows have been added, then checks that the value returned by the
     * last {@code add} lies strictly between 200 and 600.
     */
    @Test
    public void testOffheapIndexIsFull() throws IndexSizeExceededException {
        final AggregatorFactory[] metrics = {new CountAggregatorFactory("count")};
        // 12 * 1024 * 1024 == 0xC00000
        final OffheapIncrementalIndex index = new OffheapIncrementalIndex(
            0L, QueryGranularity.NONE, metrics, TestQueryRunners.pool, true, 12 * 1024 * 1024);

        int rowCount = 0;
        boolean full = false;
        for (int rowId = 0; rowId < 500 && !full; rowId++) {
            rowCount = index.add(IncrementalIndexTest.getRow(System.currentTimeMillis(), rowId, 100));
            full = !index.canAppendRow();
        }

        final boolean withinExpectedRange = rowCount > 200 && rowCount < 600;
        Assert.assertTrue("rowCount : " + rowCount, withinExpectedRange);
    }

    /**
     * Checks that an on-heap index built from a schema declaring {@code dim0} and {@code dim1}
     * reports exactly those dimensions, in that order, before any row is added.
     */
    @Test
    public void testgetDimensions() {
        final DimensionsSpec dimensionsSpec = new DimensionsSpec(Arrays.asList("dim0", "dim1"), null, null);
        final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
            .withQueryGranularity(QueryGranularity.NONE)
            .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
            .withDimensionsSpec(dimensionsSpec)
            .build();
        final OnheapIncrementalIndex incrementalIndex = new OnheapIncrementalIndex(schema, true, 1000000);

        Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensions());
    }

    /**
     * Factory for the index implementation exercised by one parameterized run of this test.
     */
    interface IndexCreator {
        /**
         * Creates a new index.
         *
         * @param factories aggregators the index should be configured with
         * @return the newly created index
         */
        IncrementalIndex createIndex(AggregatorFactory[] factories);
    }
}

