/*
 * Decompiled with CFR 0.152.
 */
package org.apache.mahout.classifier.df.tools;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.classifier.df.DFUtils;
import org.apache.mahout.classifier.df.data.DataConverter;
import org.apache.mahout.classifier.df.data.Dataset;
import org.apache.mahout.classifier.df.data.Instance;
import org.apache.mahout.classifier.df.mapreduce.Builder;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MapReduce job that counts, per mapper partition, how many instances belong to each label.
 *
 * <p>Each mapper tags every instance it processes with the key of the first record it saw
 * (a {@link LongWritable}; with {@link TextInputFormat} this is a byte offset) and emits the
 * instance's label index. The reducer turns the labels of each tag into an array of per-label
 * counts. The driver reads those arrays back, orders them by the tag and returns them.
 */
public class FrequenciesJob {
    private static final Logger log = LoggerFactory.getLogger(FrequenciesJob.class);
    private final Path outputPath;
    private final Path datasetPath;
    private final Path dataPath;

    /**
     * @param base        directory under which the temporary {@code frequencies.output} directory is created
     * @param dataPath    input data, read line by line with {@link TextInputFormat}
     * @param datasetPath dataset descriptor, registered as a distributed-cache file for the tasks
     */
    public FrequenciesJob(Path base, Path dataPath, Path datasetPath) {
        this.outputPath = new Path(base, "frequencies.output");
        this.dataPath = dataPath;
        this.datasetPath = datasetPath;
    }

    /**
     * Runs the job, parses its output, then deletes the temporary output directory.
     *
     * @param conf job configuration; the dataset path is added to it as a cache file
     * @return one label-count array per partition, ordered by the partition's first record key
     * @throws IOException if the output directory already exists or file-system access fails
     * @throws IllegalStateException if the job fails or yields fewer partitions than
     *         {@code mapred.map.tasks}
     */
    public int[][] run(Configuration conf) throws IOException, ClassNotFoundException, InterruptedException {
        FileSystem fs = outputPath.getFileSystem(conf);
        if (fs.exists(outputPath)) {
            throw new IOException("Output path already exists : " + outputPath);
        }

        // Registered on conf before the Job is created so the job's configuration carries it;
        // the tasks load the dataset through Builder.loadDataset(conf) in their setup().
        DistributedCache.setCacheFiles(new URI[] {datasetPath.toUri()}, conf);

        Job job = new Job(conf);
        job.setJarByClass(FrequenciesJob.class);

        FileInputFormat.setInputPaths(job, dataPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Frequencies.class);

        job.setMapperClass(FrequenciesMapper.class);
        job.setReducerClass(FrequenciesReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        if (!job.waitForCompletion(true)) {
            throw new IllegalStateException("Job failed!");
        }

        int[][] counts = parseOutput(job);
        HadoopUtil.delete(conf, outputPath);
        return counts;
    }

    /**
     * Reads every {@link Frequencies} record written by the job and returns their count arrays,
     * sorted by first record key.
     *
     * @param job the completed job, used only for its configuration
     * @return one label-count array per output record
     * @throws IOException on file-system errors
     * @throws IllegalStateException if fewer records than {@code mapred.map.tasks} were found
     */
    int[][] parseOutput(JobContext job) throws IOException {
        Configuration conf = job.getConfiguration();

        // Expected number of partitions; -1 when unset, which effectively disables the check below.
        int numMaps = conf.getInt("mapred.map.tasks", -1);
        log.info("mapred.map.tasks = {}", numMaps);

        FileSystem fs = outputPath.getFileSystem(conf);
        Path[] outfiles = DFUtils.listOutputFiles(fs, outputPath);

        // Growable list rather than an array sized by numMaps. Such an array throws
        // NegativeArraySizeException when the property is unset, and
        // ArrayIndexOutOfBoundsException when the job emits more records than numMaps.
        List<Frequencies> values = new ArrayList<Frequencies>();
        for (Path path : outfiles) {
            for (Frequencies value : new SequenceFileValueIterable<Frequencies>(path, conf)) {
                values.add(value);
            }
        }

        if (values.size() < numMaps) {
            throw new IllegalStateException("number of output Frequencies (" + values.size()
                    + ") is lesser than the number of mappers!");
        }

        // Order partitions by the first record key their mapper saw (see Frequencies.compareTo).
        Frequencies[] partitions = values.toArray(new Frequencies[values.size()]);
        Arrays.sort(partitions);
        return Frequencies.extractCounts(partitions);
    }

    /**
     * Label counts of one partition, keyed by the first record key its mapper saw.
     * Equality, hashing and ordering consider {@code firstId} only.
     */
    private static class Frequencies implements Writable, Comparable<Frequencies>, Cloneable {
        private long firstId;
        private int[] counts;

        /** No-arg constructor for deserialization via {@link #readFields}. */
        Frequencies() {
        }

        /** Creates an instance holding a private copy of {@code counts}. */
        Frequencies(long firstId, int[] counts) {
            this.firstId = firstId;
            this.counts = Arrays.copyOf(counts, counts.length);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            firstId = in.readLong();
            counts = DFUtils.readIntArray(in);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(firstId);
            DFUtils.writeArray(out, counts);
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof Frequencies && firstId == ((Frequencies) other).firstId;
        }

        @Override
        public int hashCode() {
            // Fold both halves of the long, as Long.hashCode does; consistent with equals().
            return (int) (firstId ^ (firstId >>> 32));
        }

        @Override
        protected Frequencies clone() {
            return new Frequencies(firstId, counts);
        }

        @Override
        public int compareTo(Frequencies obj) {
            if (firstId < obj.firstId) {
                return -1;
            }
            if (firstId > obj.firstId) {
                return 1;
            }
            return 0;
        }

        /** Returns the count arrays of {@code partitions}, in the same order (not copied). */
        public static int[][] extractCounts(Frequencies[] partitions) {
            int[][] counts = new int[partitions.length][];
            for (int p = 0; p < partitions.length; ++p) {
                counts[p] = partitions[p].counts;
            }
            return counts;
        }
    }

    /**
     * Turns the label indices emitted under one first-record key into a per-label count array.
     */
    private static class FrequenciesReducer
            extends Reducer<LongWritable, IntWritable, LongWritable, Frequencies> {
        private int nblabels;

        private FrequenciesReducer() {
        }

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Dataset dataset = Builder.loadDataset(context.getConfiguration());
            setup(dataset.nblabels());
        }

        /** Configures the number of labels directly, without loading a dataset. */
        void setup(int nblabels) {
            this.nblabels = nblabels;
        }

        // Uses the inherited Context type. A raw Reducer.Context combined with Iterable<IntWritable>
        // would make this an overload rather than an override, so Hadoop would never call it.
        @Override
        protected void reduce(LongWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int[] counts = new int[nblabels];
            for (IntWritable value : values) {
                counts[value.get()]++;
            }
            context.write(key, new Frequencies(key.get(), counts));
        }
    }

    /**
     * Emits (first record key of this mapper, label index) for every input line.
     */
    private static class FrequenciesMapper
            extends Mapper<LongWritable, Text, LongWritable, IntWritable> {
        private LongWritable firstId;
        private DataConverter converter;
        private Dataset dataset;

        private FrequenciesMapper() {
        }

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            setup(Builder.loadDataset(context.getConfiguration()));
        }

        /**
         * Initializes the mapper from an already-loaded dataset. Both the converter and the
         * dataset field are set, because map() reads both.
         */
        void setup(Dataset dataset) {
            this.dataset = dataset;
            this.converter = new DataConverter(dataset);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Every record of this mapper is tagged with the key of the first record it received.
            if (firstId == null) {
                firstId = new LongWritable(key.get());
            }
            Instance instance = converter.convert(value.toString());
            context.write(firstId, new IntWritable((int) dataset.getLabel(instance)));
        }
    }
}

