Объект Hadoop DistributedCache изменен во время задания

Я пытаюсь запустить KMeans на AWS и столкнулся со следующим исключением при попытке прочитать обновленные центроиды кластера из DistributedCache:

java.io.IOException: The distributed cache object s3://mybucket/centroids_6/part-r-00009 changed during the job from 4/8/13 2:20 PM to 4/8/13 2:20 PM
at org.apache.hadoop.filecache.TrackerDistributedCacheManager.downloadCacheObject(TrackerDistributedCacheManager.java:401)
at org.apache.hadoop.filecache.TrackerDistributedCacheManager.localizePublicCacheObject(TrackerDistributedCacheManager.java:475)
at org.apache.hadoop.filecache.TrackerDistributedCacheManager.getLocalCache(TrackerDistributedCacheManager.java:191)
at org.apache.hadoop.filecache.TaskDistributedCacheManager.setupCache(TaskDistributedCacheManager.java:182)
at org.apache.hadoop.mapred.TaskTracker$4.run(TaskTracker.java:1246)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
at org.apache.hadoop.mapred.TaskTracker.initializeJob(TaskTracker.java:1237)
at org.apache.hadoop.mapred.TaskTracker.localizeJob(TaskTracker.java:1152)
at org.apache.hadoop.mapred.TaskTracker$5.run(TaskTracker.java:2541)
at java.lang.Thread.run(Thread.java:662)

Что отличает этот вопрос от этого вопроса, так это тот факт, что эта ошибка появляется периодически. Я успешно запустил тот же код на меньшем наборе данных. Кроме того, когда я изменяю количество центроидов с 12 (показано выше в коде) на 8, происходит сбой на итерации 5 вместо 6 (что вы можете видеть в названии centroids_6 выше).

Вот соответствующий код DistributedCache в основном драйвере, который запускает цикл KMeans:

    int iteration = 1;
    long changes = 0; 
    do {
        // First, write the previous iteration's centroids to the dist cache.
        Configuration iterConf = new Configuration();
        Path prevIter = new Path(centroidsPath.getParent(),
                String.format("centroids_%s", iteration - 1));
        FileSystem fs = prevIter.getFileSystem(iterConf);
        Path pathPattern = new Path(prevIter, "part-*");
        FileStatus [] list = fs.globStatus(pathPattern);
        for (FileStatus status : list) {
            DistributedCache.addCacheFile(status.getPath().toUri(), iterConf);
        }

        // Now, set up the job.
        Job iterJob = new Job(iterConf);
        iterJob.setJobName("KMeans " + iteration);
        iterJob.setJarByClass(KMeansDriver.class);
        Path nextIter = new Path(centroidsPath.getParent(), 
                String.format("centroids_%s", iteration));
        KMeansDriver.delete(iterConf, nextIter);

        // Set input/output formats.
        iterJob.setInputFormatClass(SequenceFileInputFormat.class);
        iterJob.setOutputFormatClass(SequenceFileOutputFormat.class);

        // Set Mapper, Reducer, Combiner
        iterJob.setMapperClass(KMeansMapper.class);
        iterJob.setCombinerClass(KMeansCombiner.class);
        iterJob.setReducerClass(KMeansReducer.class);

        // Set MR formats.
        iterJob.setMapOutputKeyClass(IntWritable.class);
        iterJob.setMapOutputValueClass(VectorWritable.class);
        iterJob.setOutputKeyClass(IntWritable.class);
        iterJob.setOutputValueClass(VectorWritable.class);

        // Set input/output paths.
        FileInputFormat.addInputPath(iterJob, data);
        FileOutputFormat.setOutputPath(iterJob, nextIter);

        iterJob.setNumReduceTasks(nReducers);

        if (!iterJob.waitForCompletion(true)) {
            System.err.println("ERROR: Iteration " + iteration + " failed!");
            System.exit(1);
        }
        iteration++;
        changes = iterJob.getCounters().findCounter(KMeansDriver.Counter.CONVERGED).getValue();
        iterJob.getCounters().findCounter(KMeansDriver.Counter.CONVERGED).setValue(0);
    } while (changes > 0);

Как еще файлы будут изменены? Единственная возможность, о которой я могу думать, заключается в том, что по завершении одной итерации цикл начинается снова до того, как закончат запись центроиды из предыдущей задачи. Но в комментарии я вызываю задание с помощью waitForCompletion(true), поэтому не должно быть никаких остаточных частей задания, когда цикл начинается заново. Есть идеи?


person Magsol    schedule 08.04.2013    source источник
comment
Работают ли другие потоки?   -  person Aubin    schedule 08.04.2013
comment
Это все на AWS (Elastic MapReduce), так что я понятия не имею. Я сам не писал никаких явных потоков. Кроме того, в самом задании DistributedCache доступен только для чтения; Я думаю, что это требование встроено в Hadoop. (поправьте меня если я ошибаюсь???)   -  person Magsol    schedule 08.04.2013


Ответы (1)


На самом деле это не ответ, но я понял, что глупо использовать DistributedCache так, как я, в отличие от чтения результатов предыдущей итерации непосредственно из HDFS. Вместо этого я написал этот метод в основном драйвере:

public static HashMap<Integer, VectorWritable> readCentroids(Configuration conf, Path path)
        throws IOException {
    HashMap<Integer, VectorWritable> centroids = new HashMap<Integer, VectorWritable>();
    FileSystem fs = FileSystem.get(path.toUri(), conf);
    FileStatus [] list = fs.globStatus(new Path(path, "part-*"));
    for (FileStatus status : list) {
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, status.getPath(), conf);
        IntWritable key = null;
        VectorWritable value = null;
        try {
            key = (IntWritable)reader.getKeyClass().newInstance();
            value = (VectorWritable)reader.getValueClass().newInstance();
        } catch (InstantiationException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
        while (reader.next(key, value)) {
            centroids.put(new Integer(key.get()),
                    new VectorWritable(value.get(), value.getClusterId(), value.getNumInstances()));
        }
        reader.close();
    }
    return centroids;
}

Это вызывается в методе setup() Mapper и Reducer во время каждой итерации, чтобы прочитать центроиды предыдущей итерации.

protected void setup(Context context) throws IOException {
    Configuration conf = context.getConfiguration();
    Path centroidsPath = new Path(conf.get(KMeansDriver.CENTROIDS));
    centroids = KMeansDriver.readCentroids(conf, centroidsPath);
}

Это позволило мне удалить блок кода в цикле в моем исходном вопросе, который записывает центроиды в DistributedCache. Я протестировал его, и теперь он работает как с большими, так и с маленькими наборами данных.

Я до сих пор не знаю, почему я получаю сообщение об ошибке, о котором я писал (как что-то изменить в DistributedCache, доступном только для чтения? особенно когда я менял пути HDFS на каждой итерации?), но это, похоже, работает и является гораздо менее хакерский способ чтения центроидов.

person Magsol    schedule 10.04.2013