/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.rollup.v2;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.apache.lucene.util.FutureArrays;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fielddata.FormattedDocValues;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.xpack.core.rollup.RollupActionConfig;
import org.elasticsearch.xpack.core.rollup.RollupActionDateHistogramGroupConfig;
import org.elasticsearch.xpack.core.rollup.job.HistogramGroupConfig;
import org.elasticsearch.xpack.core.rollup.job.MetricConfig;
import org.elasticsearch.xpack.core.rollup.job.TermsGroupConfig;
import org.elasticsearch.xpack.rollup.v2.CompressingOfflineSorter;
import org.elasticsearch.xpack.rollup.v2.FieldMetricsProducer;
import org.elasticsearch.xpack.rollup.v2.FieldValueFetcher;
import org.elasticsearch.xpack.rollup.v2.XExternalRefSorter;

class RollupShardIndexer {
    private static final Logger logger = LogManager.getLogger(RollupShardIndexer.class);
    private final IndexShard indexShard;
    private final Client client;
    private final RollupActionConfig config;
    private final String tmpIndex;
    private final Directory dir;
    private final Engine.Searcher searcher;
    private final SearchExecutionContext searchExecutionContext;
    private final MappedFieldType timestampField;
    private final DocValueFormat timestampFormat;
    private final Rounding.Prepared rounding;
    private final List<FieldValueFetcher> groupFieldFetchers;
    private final List<FieldValueFetcher> metricsFieldFetchers;
    private final CompressingOfflineSorter sorter;
    private final BulkProcessor bulkProcessor;
    private final AtomicLong numSent = new AtomicLong();
    private final AtomicLong numIndexed = new AtomicLong();
    final Set<String> tmpFiles = new HashSet<String>();
    final Set<String> tmpFilesDeleted = new HashSet<String>();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    RollupShardIndexer(Client client, IndexService indexService, ShardId shardId, RollupActionConfig config, String tmpIndex, int ramBufferSizeMB) {
        this.client = client;
        this.indexShard = indexService.getShard(shardId.id());
        this.config = config;
        this.tmpIndex = tmpIndex;
        Engine.Searcher toClose = this.searcher = this.indexShard.acquireSearcher("rollup");
        try {
            this.dir = new FilterDirectory(this.searcher.getDirectoryReader().directory()){

                public IndexOutput createOutput(String name, IOContext context) throws IOException {
                    RollupShardIndexer.this.tmpFiles.add(name);
                    return super.createOutput(name, context);
                }

                public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
                    IndexOutput output = super.createTempOutput(prefix, suffix, context);
                    RollupShardIndexer.this.tmpFiles.add(output.getName());
                    return output;
                }

                public void deleteFile(String name) throws IOException {
                    RollupShardIndexer.this.tmpFilesDeleted.add(name);
                    super.deleteFile(name);
                }
            };
            this.searchExecutionContext = indexService.newSearchExecutionContext(this.indexShard.shardId().id(), 0, (IndexSearcher)this.searcher, () -> 0L, null, Collections.emptyMap());
            this.timestampField = this.searchExecutionContext.getFieldType(config.getGroupConfig().getDateHistogram().getField());
            this.verifyTimestampField(this.timestampField);
            this.timestampFormat = this.timestampField.docValueFormat(null, null);
            this.rounding = this.createRounding(config.getGroupConfig().getDateHistogram()).prepareForUnknown();
            this.groupFieldFetchers = new ArrayList<FieldValueFetcher>();
            if (config.getGroupConfig().getTerms() != null) {
                TermsGroupConfig termsConfig = config.getGroupConfig().getTerms();
                this.groupFieldFetchers.addAll(FieldValueFetcher.build(this.searchExecutionContext, termsConfig.getFields()));
            }
            if (config.getGroupConfig().getHistogram() != null) {
                HistogramGroupConfig histoConfig = config.getGroupConfig().getHistogram();
                this.groupFieldFetchers.addAll(FieldValueFetcher.buildHistograms(this.searchExecutionContext, histoConfig.getFields(), histoConfig.getInterval()));
            }
            if (config.getMetricsConfig().size() > 0) {
                String[] metricFields = (String[])config.getMetricsConfig().stream().map(MetricConfig::getField).toArray(String[]::new);
                this.metricsFieldFetchers = FieldValueFetcher.build(this.searchExecutionContext, metricFields);
            } else {
                this.metricsFieldFetchers = Collections.emptyList();
            }
            this.sorter = new CompressingOfflineSorter(this.dir, "rollup-", RollupShardIndexer.keyComparator(), ramBufferSizeMB);
            toClose = null;
        }
        finally {
            IOUtils.closeWhileHandlingException((Closeable)toClose);
        }
        this.bulkProcessor = this.createBulkProcessor();
    }

    private void verifyTimestampField(MappedFieldType fieldType) {
        if (fieldType == null) {
            throw new IllegalArgumentException("fieldType is null");
        }
        if (!(fieldType instanceof DateFieldMapper.DateFieldType)) {
            throw new IllegalArgumentException("Wrong type for the timestamp field, expected [date], got [" + fieldType.name() + "]");
        }
        if (!fieldType.isSearchable()) {
            throw new IllegalArgumentException("The timestamp field [" + fieldType.name() + "]  is not searchable");
        }
    }

    public long execute() throws IOException {
        Long bucket = Long.MIN_VALUE;
        try {
            while ((bucket = this.computeBucket(bucket)) != null) {
            }
        }
        finally {
            this.searcher.close();
            this.bulkProcessor.close();
        }
        logger.info("Successfully sent [" + this.numIndexed.get() + "], indexed [" + this.numIndexed.get() + "]");
        return this.numIndexed.get();
    }

    private BulkProcessor createBulkProcessor() {
        BulkProcessor.Listener listener = new BulkProcessor.Listener(){

            public void beforeBulk(long executionId, BulkRequest request) {
                RollupShardIndexer.this.numSent.addAndGet(request.numberOfActions());
            }

            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                RollupShardIndexer.this.numIndexed.addAndGet(request.numberOfActions());
                if (response.hasFailures()) {
                    Map<String, String> failures = Arrays.stream(response.getItems()).filter(BulkItemResponse::isFailed).collect(Collectors.toMap(BulkItemResponse::getId, BulkItemResponse::getFailureMessage, (msg1, msg2) -> Objects.equals(msg1, msg2) ? msg1 : msg1 + "," + msg2));
                    logger.error("failures: [{}]", failures);
                }
            }

            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                long items = request.numberOfActions();
                RollupShardIndexer.this.numSent.addAndGet(-items);
            }
        };
        return BulkProcessor.builder((arg_0, arg_1) -> ((Client)this.client).bulk(arg_0, arg_1), (BulkProcessor.Listener)listener, (String)"rollup-shard-indexer").setBulkActions(10000).setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)).setConcurrentRequests(0).setBackoffPolicy(BackoffPolicy.exponentialBackoff((TimeValue)TimeValue.timeValueMillis((long)1000L), (int)3)).build();
    }

    private Rounding createRounding(RollupActionDateHistogramGroupConfig groupConfig) {
        Rounding.Builder tzRoundingBuilder;
        ZoneId zoneId;
        DateHistogramInterval interval = groupConfig.getInterval();
        ZoneId zoneId2 = zoneId = groupConfig.getTimeZone() != null ? ZoneId.of(groupConfig.getTimeZone()) : null;
        if (groupConfig instanceof RollupActionDateHistogramGroupConfig.FixedInterval) {
            TimeValue timeValue = TimeValue.parseTimeValue((String)interval.toString(), null, (String)(this.getClass().getSimpleName() + ".interval"));
            tzRoundingBuilder = Rounding.builder((TimeValue)timeValue);
        } else if (groupConfig instanceof RollupActionDateHistogramGroupConfig.CalendarInterval) {
            Rounding.DateTimeUnit dateTimeUnit = (Rounding.DateTimeUnit)DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(interval.toString());
            tzRoundingBuilder = Rounding.builder((Rounding.DateTimeUnit)dateTimeUnit);
        } else {
            throw new IllegalStateException("unsupported interval type");
        }
        return tzRoundingBuilder.timeZone(zoneId).build();
    }

    private void indexBucket(BucketKey key, List<FieldMetricsProducer> fieldsMetrics, int docCount) {
        IndexRequestBuilder request = this.client.prepareIndex(this.tmpIndex, "_doc");
        HashMap<String, Object> doc = new HashMap<String, Object>(2 + key.groupFields.size() + fieldsMetrics.size());
        doc.put("_doc_count", docCount);
        doc.put(this.timestampField.name(), this.timestampFormat.format(key.timestamp));
        for (int i = 0; i < key.groupFields.size(); ++i) {
            FieldValueFetcher fetcher = this.groupFieldFetchers.get(i);
            if (key.groupFields.get(i) == null) continue;
            doc.put(fetcher.name, fetcher.format(key.groupFields.get(i)));
        }
        for (FieldMetricsProducer field : fieldsMetrics) {
            HashMap<String, Number> map = new HashMap<String, Number>();
            for (FieldMetricsProducer.Metric metric : field.metrics) {
                map.put(metric.name, metric.get());
            }
            doc.put(field.fieldName, map);
        }
        request.setSource(doc);
        this.bulkProcessor.add((IndexRequest)request.request());
    }

    private Long computeBucket(long lastRounding) throws IOException {
        Long nextRounding = this.findNextRounding(lastRounding);
        if (nextRounding == null) {
            return null;
        }
        long nextRoundingLastValue = this.rounding.nextRoundingValue(nextRounding.longValue()) - 1L;
        try (XExternalRefSorter externalSorter = new XExternalRefSorter(this.sorter);){
            Query rangeQuery = LongPoint.newRangeQuery((String)this.timestampField.name(), (long)nextRounding, (long)nextRoundingLastValue);
            this.searcher.search(rangeQuery, (Collector)new BucketCollector(nextRounding, externalSorter));
            BytesRefIterator it = externalSorter.iterator();
            BytesRef next = it.next();
            List<FieldMetricsProducer> fieldsMetrics = FieldMetricsProducer.buildMetrics(this.config.getMetricsConfig());
            BucketKey lastKey = null;
            int docCount = 0;
            while (next != null) {
                try (ByteBufferStreamInput in = new ByteBufferStreamInput(ByteBuffer.wrap(next.bytes, next.offset, next.length));){
                    in.readInt();
                    BucketKey key = RollupShardIndexer.decodeKey((StreamInput)in, this.groupFieldFetchers.size());
                    if (lastKey != null && !lastKey.equals(key)) {
                        this.indexBucket(lastKey, fieldsMetrics, docCount);
                        docCount = 0;
                        for (FieldMetricsProducer producer : fieldsMetrics) {
                            producer.reset();
                        }
                    }
                    for (FieldMetricsProducer field : fieldsMetrics) {
                        int size = in.readVInt();
                        for (int i = 0; i < size; ++i) {
                            double value = in.readDouble();
                            for (FieldMetricsProducer.Metric metric : field.metrics) {
                                metric.collect(value);
                            }
                        }
                    }
                    ++docCount;
                    lastKey = key;
                }
                next = it.next();
            }
            if (lastKey != null) {
                this.indexBucket(lastKey, fieldsMetrics, docCount);
            }
        }
        return nextRoundingLastValue;
    }

    private Long findNextRounding(long lastRounding) throws IOException {
        Long nextRounding = null;
        for (LeafReaderContext leafReaderContext : this.searcher.getIndexReader().leaves()) {
            PointValues pointValues = leafReaderContext.reader().getPointValues(this.timestampField.name());
            NextRoundingVisitor visitor = new NextRoundingVisitor(this.rounding, lastRounding);
            try {
                pointValues.intersect((PointValues.IntersectVisitor)visitor);
            }
            catch (CollectionTerminatedException collectionTerminatedException) {
                // empty catch block
            }
            if (visitor.nextRounding == null) continue;
            nextRounding = nextRounding == null ? visitor.nextRounding : Math.min(nextRounding, visitor.nextRounding);
        }
        return nextRounding;
    }

    private static BytesRef encodeKey(long timestamp, List<Object> groupFields) throws IOException {
        try (BytesStreamOutput out = new BytesStreamOutput();){
            out.writeLong(timestamp);
            for (Object obj : groupFields) {
                out.writeGenericValue(obj);
            }
            BytesRef bytesRef = out.bytes().toBytesRef();
            return bytesRef;
        }
    }

    private static BucketKey decodeKey(StreamInput in, int numGroupFields) throws IOException {
        long timestamp = in.readLong();
        ArrayList<Object> values = new ArrayList<Object>();
        for (int i = 0; i < numGroupFields; ++i) {
            values.add(in.readGenericValue());
        }
        return new BucketKey(timestamp, values);
    }

    private static Comparator<BytesRef> keyComparator() {
        return (o1, o2) -> {
            int keySize1 = RollupShardIndexer.readInt(o1.bytes, o1.offset);
            int keySize2 = RollupShardIndexer.readInt(o2.bytes, o2.offset);
            return FutureArrays.compareUnsigned((byte[])o1.bytes, (int)(o1.offset + 4), (int)(keySize1 + o1.offset + 4), (byte[])o2.bytes, (int)(o2.offset + 4), (int)(keySize2 + o2.offset + 4));
        };
    }

    private static int readInt(byte[] bytes, int offset) {
        return (bytes[offset] & 0xFF) << 24 | (bytes[offset + 1] & 0xFF) << 16 | (bytes[offset + 2] & 0xFF) << 8 | bytes[offset + 3] & 0xFF;
    }

    private static List<List<Object>> cartesianProduct(List<List<Object>> lists) {
        List<List<Object>> combinations = Arrays.asList(Arrays.asList(new Object[0]));
        for (List<Object> list : lists) {
            ArrayList<List<Object>> extraColumnCombinations = new ArrayList<List<Object>>();
            for (List<Object> combination : combinations) {
                for (Object element : list) {
                    ArrayList<Object> newCombination = new ArrayList<Object>(combination);
                    newCombination.add(element);
                    extraColumnCombinations.add(newCombination);
                }
            }
            combinations = extraColumnCombinations;
        }
        return combinations;
    }

    private static class BucketKey {
        private final long timestamp;
        private final List<Object> groupFields;

        BucketKey(long timestamp, List<Object> groupFields) {
            this.timestamp = timestamp;
            this.groupFields = groupFields;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            BucketKey other = (BucketKey)o;
            return this.timestamp == other.timestamp && Objects.equals(this.groupFields, other.groupFields);
        }

        public int hashCode() {
            return Objects.hash(this.timestamp, this.groupFields);
        }

        public String toString() {
            return "BucketKey{timestamp=" + this.timestamp + ", groupFields=" + this.groupFields + '}';
        }
    }

    private class BucketCollector
    implements Collector {
        private final long timestamp;
        private final XExternalRefSorter externalSorter;

        private BucketCollector(long timestamp, XExternalRefSorter externalSorter) {
            this.externalSorter = externalSorter;
            this.timestamp = timestamp;
        }

        public LeafCollector getLeafCollector(LeafReaderContext context) {
            final List<FormattedDocValues> groupFieldLeaves = this.leafFetchers(context, RollupShardIndexer.this.groupFieldFetchers);
            final List<FormattedDocValues> metricsFieldLeaves = this.leafFetchers(context, RollupShardIndexer.this.metricsFieldFetchers);
            return new LeafCollector(){

                public void setScorer(Scorable scorer) {
                }

                public void collect(int docID) throws IOException {
                    BytesRef valueBytes;
                    ArrayList<Object> combinationKeys = new ArrayList<Object>();
                    for (FormattedDocValues leafField : groupFieldLeaves) {
                        if (leafField.advanceExact(docID)) {
                            ArrayList lst = new ArrayList();
                            for (int i = 0; i < leafField.docValueCount(); ++i) {
                                lst.add(leafField.nextValue());
                            }
                            combinationKeys.add(lst);
                            continue;
                        }
                        combinationKeys.add(null);
                    }
                    try (BytesStreamOutput out = new BytesStreamOutput();){
                        for (FormattedDocValues formattedDocValues : metricsFieldLeaves) {
                            if (formattedDocValues.advanceExact(docID)) {
                                out.writeVInt(formattedDocValues.docValueCount());
                                for (int i = 0; i < formattedDocValues.docValueCount(); ++i) {
                                    Object obj = formattedDocValues.nextValue();
                                    if (!(obj instanceof Number)) {
                                        throw new IllegalArgumentException("Expected [Number], got [" + obj.getClass() + "]");
                                    }
                                    out.writeDouble(((Number)obj).doubleValue());
                                }
                                continue;
                            }
                            out.writeVInt(0);
                        }
                        valueBytes = out.bytes().toBytesRef();
                    }
                    for (List groupFields : RollupShardIndexer.cartesianProduct(combinationKeys)) {
                        try (BytesStreamOutput out = new BytesStreamOutput();){
                            BytesRef keyBytes = RollupShardIndexer.encodeKey(BucketCollector.this.timestamp, groupFields);
                            out.writeInt(keyBytes.length);
                            out.writeBytes(keyBytes.bytes, keyBytes.offset, keyBytes.length);
                            out.writeBytes(valueBytes.bytes, valueBytes.offset, valueBytes.length);
                            BucketCollector.this.externalSorter.add(out.bytes().toBytesRef());
                        }
                    }
                }
            };
        }

        private List<FormattedDocValues> leafFetchers(LeafReaderContext context, List<FieldValueFetcher> fetchers) {
            ArrayList<FormattedDocValues> leaves = new ArrayList<FormattedDocValues>();
            for (FieldValueFetcher fetcher : fetchers) {
                leaves.add(fetcher.getLeaf(context));
            }
            return leaves;
        }

        public ScoreMode scoreMode() {
            return ScoreMode.COMPLETE_NO_SCORES;
        }
    }

    private class NextRoundingVisitor
    implements PointValues.IntersectVisitor {
        final Rounding.Prepared rounding;
        final long lastRounding;
        Long nextRounding = null;

        NextRoundingVisitor(Rounding.Prepared rounding, long lastRounding) {
            this.rounding = rounding;
            this.lastRounding = lastRounding;
        }

        public void visit(int docID) {
            throw new IllegalStateException("should never be called");
        }

        public void visit(DocIdSetIterator iterator, byte[] packedValue) {
            long bucket = this.rounding.round(LongPoint.decodeDimension((byte[])packedValue, (int)0));
            this.checkMinRounding(bucket);
        }

        public void visit(int docID, byte[] packedValue) {
            long bucket = this.rounding.round(LongPoint.decodeDimension((byte[])packedValue, (int)0));
            this.checkMinRounding(bucket);
        }

        public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
            long maxRounding = this.rounding.round(LongPoint.decodeDimension((byte[])maxPackedValue, (int)0));
            if (maxRounding <= this.lastRounding) {
                return PointValues.Relation.CELL_OUTSIDE_QUERY;
            }
            long minRounding = this.rounding.round(LongPoint.decodeDimension((byte[])minPackedValue, (int)0));
            this.checkMinRounding(minRounding);
            return PointValues.Relation.CELL_CROSSES_QUERY;
        }

        private void checkMinRounding(long roundingValue) {
            if (roundingValue > this.lastRounding) {
                this.nextRounding = roundingValue;
                throw new CollectionTerminatedException();
            }
        }
    }
}

