Skip to content

Commit

Permalink
Unlock database when Injector finishes - regardless of result
Browse files Browse the repository at this point in the history
  • Loading branch information
cube authored and sebastian-nagel committed Oct 23, 2024
1 parent 4a61208 commit 3495472
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 32 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ lib/spotbugs-*
ivy/dependency-check-ant/*
.gradle*
ivy/apache-rat-*
.vscode
70 changes: 38 additions & 32 deletions src/java/org/apache/nutch/crawl/Injector.java
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,39 @@ public Injector(Configuration conf) {
setConf(conf);
}

/**
 * Builds the Hadoop job used by an injection run and wires up its inputs
 * and output.
 *
 * <p>The job uses {@link InjectMapper} and {@link InjectReducer}, writes
 * {@code Text}/{@code CrawlDatum} pairs through {@link MapFileOutputFormat}
 * and has speculative execution switched off. {@code current} is added as a
 * {@link SequenceFileInputFormat} input. Every direct child of
 * {@code urlDir} that is a regular file is added as a
 * {@link KeyValueTextInputFormat} input; any other entry is logged and
 * skipped.
 *
 * @param conf        configuration the job is created from and that is used
 *                    to resolve the file system of {@code urlDir}
 * @param urlDir      directory whose files are the seed URL lists
 * @param current     path added as the sequence-file input of the job
 * @param tempCrawlDb path set as the output path of the job
 * @return the configured job; it has not been submitted
 * @throws IOException           propagated from job creation or from listing
 *                               {@code urlDir}
 * @throws IllegalStateException if {@code urlDir} contains no regular file
 */
private Job prepareJob(Configuration conf, Path urlDir, Path current, Path tempCrawlDb) throws IOException {
  final Job injectJob = Job.getInstance(conf, "Nutch Injector: " + urlDir);

  // mapper / reducer / output types
  injectJob.setJarByClass(Injector.class);
  injectJob.setMapperClass(InjectMapper.class);
  injectJob.setReducerClass(InjectReducer.class);
  injectJob.setOutputFormatClass(MapFileOutputFormat.class);
  injectJob.setOutputKeyClass(Text.class);
  injectJob.setOutputValueClass(CrawlDatum.class);
  injectJob.setSpeculativeExecution(false);

  // first input: the path passed in as "current"
  MultipleInputs.addInputPath(injectJob, current, SequenceFileInputFormat.class);

  // remaining inputs: one per regular file found directly in urlDir
  int acceptedSeeds = 0;
  for (FileStatus entry : urlDir.getFileSystem(conf).listStatus(urlDir)) {
    if (!entry.isFile()) {
      LOG.warn("Skipped non-file input in {}: {}", urlDir,
          entry.getPath());
      continue;
    }
    MultipleInputs.addInputPath(injectJob, entry.getPath(),
        KeyValueTextInputFormat.class);
    acceptedSeeds++;
    LOG.info("Injecting seed URL file {}", entry.getPath());
  }

  // without at least one seed file there is nothing to inject
  if (acceptedSeeds == 0) {
    LOG.error("No seed files to inject found in {}", urlDir);
    throw new IllegalStateException("No seed files found");
  }

  FileOutputFormat.setOutputPath(injectJob, tempCrawlDb);
  return injectJob;
}

public void inject(Path crawlDb, Path urlDir)
throws IOException, ClassNotFoundException, InterruptedException {
inject(crawlDb, urlDir, false, false);
Expand Down Expand Up @@ -400,40 +433,11 @@ public void inject(Path crawlDb, Path urlDir, boolean overwrite,
Path tempCrawlDb = new Path(crawlDb,
"crawldb-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

// lock an existing crawldb to prevent multiple simultaneous updates
Path lock = CrawlDb.lock(conf, crawlDb, false);

// configure job
Job job = Job.getInstance(conf, "Nutch Injector: " + urlDir);
job.setJarByClass(Injector.class);
job.setMapperClass(InjectMapper.class);
job.setReducerClass(InjectReducer.class);
job.setOutputFormatClass(MapFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CrawlDatum.class);
job.setSpeculativeExecution(false);
Job job = prepareJob(conf, urlDir, current, tempCrawlDb);

// set input and output paths of the job
MultipleInputs.addInputPath(job, current, SequenceFileInputFormat.class);
FileStatus[] seedFiles = urlDir.getFileSystem(getConf()).listStatus(urlDir);
int numSeedFiles = 0;
for (FileStatus seedFile : seedFiles) {
if (seedFile.isFile()) {
MultipleInputs.addInputPath(job, seedFile.getPath(),
KeyValueTextInputFormat.class);
numSeedFiles++;
LOG.info("Injecting seed URL file {}", seedFile.getPath());
} else {
LOG.warn("Skipped non-file input in {}: {}", urlDir,
seedFile.getPath());
}
}
if (numSeedFiles == 0) {
LOG.error("No seed files to inject found in {}", urlDir);
LockUtil.removeLockFile(fs, lock);
return;
}
FileOutputFormat.setOutputPath(job, tempCrawlDb);
// lock an existing crawldb to prevent multiple simultaneous updates
Path lock = CrawlDb.lock(conf, crawlDb, false);

try {
// run the job
Expand Down Expand Up @@ -487,6 +491,8 @@ public void inject(Path crawlDb, Path urlDir, boolean overwrite,
LOG.error("Injector job failed: {}", e.getMessage());
NutchJob.cleanupAfterFailure(tempCrawlDb, lock, fs);
throw e;
} finally {
LockUtil.removeLockFile(fs, lock);
}
}

Expand Down

0 comments on commit 3495472

Please sign in to comment.