Я пытаюсь запустить KMeans на AWS и столкнулся со следующим исключением при попытке прочитать обновленные центроиды кластера из DistributedCache:
java.io.IOException: The distributed cache object s3://mybucket/centroids_6/part-r-00009 changed during the job from 4/8/13 2:20 PM to 4/8/13 2:20 PM
at org.apache.hadoop.filecache.TrackerDistributedCacheManager.downloadCacheObject(TrackerDistributedCacheManager.java:401)
at org.apache.hadoop.filecache.TrackerDistributedCacheManager.localizePublicCacheObject(TrackerDistributedCacheManager.java:475)
at org.apache.hadoop.filecache.TrackerDistributedCacheManager.getLocalCache(TrackerDistributedCacheManager.java:191)
at org.apache.hadoop.filecache.TaskDistributedCacheManager.setupCache(TaskDistributedCacheManager.java:182)
at org.apache.hadoop.mapred.TaskTracker$4.run(TaskTracker.java:1246)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
at org.apache.hadoop.mapred.TaskTracker.initializeJob(TaskTracker.java:1237)
at org.apache.hadoop.mapred.TaskTracker.localizeJob(TaskTracker.java:1152)
at org.apache.hadoop.mapred.TaskTracker$5.run(TaskTracker.java:2541)
at java.lang.Thread.run(Thread.java:662)
Что отличает этот вопрос от этого вопроса, так это тот факт, что эта ошибка появляется периодически. Я успешно запустил тот же код на меньшем наборе данных. Кроме того, когда я изменяю количество центроидов с 12 (показано выше в коде) на 8, происходит сбой на итерации 5 вместо 6 (что вы можете видеть в названии centroids_6
выше).
Вот соответствующий код DistributedCache в основном драйвере, который запускает цикл KMeans:
int iteration = 1;
long changes = 0;
do {
// First, write the previous iteration's centroids to the dist cache.
Configuration iterConf = new Configuration();
Path prevIter = new Path(centroidsPath.getParent(),
String.format("centroids_%s", iteration - 1));
FileSystem fs = prevIter.getFileSystem(iterConf);
Path pathPattern = new Path(prevIter, "part-*");
FileStatus [] list = fs.globStatus(pathPattern);
for (FileStatus status : list) {
DistributedCache.addCacheFile(status.getPath().toUri(), iterConf);
}
// Now, set up the job.
Job iterJob = new Job(iterConf);
iterJob.setJobName("KMeans " + iteration);
iterJob.setJarByClass(KMeansDriver.class);
Path nextIter = new Path(centroidsPath.getParent(),
String.format("centroids_%s", iteration));
KMeansDriver.delete(iterConf, nextIter);
// Set input/output formats.
iterJob.setInputFormatClass(SequenceFileInputFormat.class);
iterJob.setOutputFormatClass(SequenceFileOutputFormat.class);
// Set Mapper, Reducer, Combiner
iterJob.setMapperClass(KMeansMapper.class);
iterJob.setCombinerClass(KMeansCombiner.class);
iterJob.setReducerClass(KMeansReducer.class);
// Set MR formats.
iterJob.setMapOutputKeyClass(IntWritable.class);
iterJob.setMapOutputValueClass(VectorWritable.class);
iterJob.setOutputKeyClass(IntWritable.class);
iterJob.setOutputValueClass(VectorWritable.class);
// Set input/output paths.
FileInputFormat.addInputPath(iterJob, data);
FileOutputFormat.setOutputPath(iterJob, nextIter);
iterJob.setNumReduceTasks(nReducers);
if (!iterJob.waitForCompletion(true)) {
System.err.println("ERROR: Iteration " + iteration + " failed!");
System.exit(1);
}
iteration++;
changes = iterJob.getCounters().findCounter(KMeansDriver.Counter.CONVERGED).getValue();
iterJob.getCounters().findCounter(KMeansDriver.Counter.CONVERGED).setValue(0);
} while (changes > 0);
Как еще файлы будут изменены? Единственная возможность, о которой я могу думать, заключается в том, что по завершении одной итерации цикл начинается снова до того, как закончат запись центроиды из предыдущей задачи. Но в комментарии я вызываю задание с помощью waitForCompletion(true)
, поэтому не должно быть никаких остаточных частей задания, когда цикл начинается заново. Есть идеи?