Skip to content

Commit

Permalink
NUTCH-3079 Dumping a segment fails unless it has been fetched and parsed
Browse files Browse the repository at this point in the history
SegmentReader's dump and get tools now check whether a segment
subdirectory exists before adding it as input. A warning is shown
if a subdirectory does not exist and has not been excluded via one
of the general options (-nogenerate, etc.).
  • Loading branch information
sebastian-nagel committed Dec 4, 2024
1 parent b481f91 commit 86b893a
Showing 1 changed file with 69 additions and 27 deletions.
96 changes: 69 additions & 27 deletions src/java/org/apache/nutch/segment/SegmentReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public class SegmentReader extends Configured implements Tool {

public static class InputCompatMapper extends
Mapper<WritableComparable<?>, Writable, Text, NutchWritable> {

private Text newKey = new Text();

@Override
Expand Down Expand Up @@ -195,6 +195,28 @@ public void reduce(Text key, Iterable<NutchWritable> values,
}
}

/**
 * Tests whether the given subdirectory is present below a segment and logs
 * a warning if it is missing.
 *
 * @param conf
 *          configuration used to obtain the file system of the subdirectory
 *          path
 * @param segment
 *          path of the segment
 * @param subDir
 *          name of the subdirectory, resolved relative to the segment
 * @return true if the subdirectory exists, false otherwise
 * @throws IOException
 *           propagated from obtaining the file system or from the existence
 *           check
 */
private static boolean segmSubdirExists(Configuration conf, Path segment,
    String subDir) throws IOException {
  Path candidate = new Path(segment, subDir);
  if (candidate.getFileSystem(conf).exists(candidate)) {
    return true;
  }
  // the missing subdirectory is reported, the caller decides how to proceed
  LOG.warn("Segment subdirectory {} does not exist in {}!", subDir,
      segment);
  return false;
}

/**
 * Appends a segment subdirectory to the list of input directories if it
 * exists. If it does not exist, a warning is logged and the list is left
 * unchanged.
 *
 * @param inputDirs
 *          list of input directories to which the subdirectory path is
 *          added
 * @param conf
 *          configuration used to obtain the file system of the subdirectory
 *          path
 * @param segment
 *          path of the segment
 * @param subDir
 *          name of the subdirectory, resolved relative to the segment
 * @throws IOException
 *           propagated from obtaining the file system or from the existence
 *           check
 */
private static void addSegmSubDirIfExists(List<Path> inputDirs, Configuration conf,
    Path segment, String subDir) throws IOException {
  Path candidate = new Path(segment, subDir);
  boolean present = candidate.getFileSystem(conf).exists(candidate);
  if (!present) {
    LOG.warn("Segment subdirectory {} does not exist in {} - skipping!", subDir,
        segment);
    return;
  }
  inputDirs.add(candidate);
}

public void dump(Path segment, Path output) throws IOException,
InterruptedException, ClassNotFoundException {

Expand All @@ -203,21 +225,36 @@ public void dump(Path segment, Path output) throws IOException,
Job job = Job.getInstance(getConf(), "Nutch SegmentReader: " + segment);
Configuration conf = job.getConfiguration();

if (ge)
FileInputFormat.addInputPath(job, new Path(segment,
CrawlDatum.GENERATE_DIR_NAME));
if (fe)
FileInputFormat.addInputPath(job, new Path(segment,
CrawlDatum.FETCH_DIR_NAME));
if (pa)
FileInputFormat.addInputPath(job, new Path(segment,
CrawlDatum.PARSE_DIR_NAME));
if (co)
FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
if (pd)
FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));
if (pt)
FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));
List<Path> inputDirs = new ArrayList<>();
if (ge) {
addSegmSubDirIfExists(inputDirs, conf, segment,
CrawlDatum.GENERATE_DIR_NAME);
}
if (fe) {
addSegmSubDirIfExists(inputDirs, conf, segment,
CrawlDatum.FETCH_DIR_NAME);
}
if (pa) {
addSegmSubDirIfExists(inputDirs, conf, segment,
CrawlDatum.PARSE_DIR_NAME);
}
if (co) {
addSegmSubDirIfExists(inputDirs, conf, segment, Content.DIR_NAME);
}
if (pd) {
addSegmSubDirIfExists(inputDirs, conf, segment, ParseData.DIR_NAME);
}
if (pt) {
addSegmSubDirIfExists(inputDirs, conf, segment, ParseText.DIR_NAME);
}
if (inputDirs.isEmpty()) {
String msg = "No segment subdirectories defined as input";
LOG.error(msg);
throw new RuntimeException(msg);
}
for (Path p : inputDirs) {
FileInputFormat.addInputPath(job, p);
}

job.setInputFormatClass(SequenceFileInputFormat.class);
job.setMapperClass(InputCompatMapper.class);
Expand All @@ -243,7 +280,7 @@ public void dump(Path segment, Path output) throws IOException,
}
} catch (IOException | InterruptedException | ClassNotFoundException e ){
LOG.error(StringUtils.stringifyException(e));
throw e;
throw e;
}

// concatenate the output
Expand Down Expand Up @@ -307,7 +344,7 @@ public void get(final Path segment, final Text key, Writer writer,
final Map<String, List<Writable>> results) throws Exception {
LOG.info("SegmentReader: get '{}'", key);
ArrayList<Thread> threads = new ArrayList<>();
if (co)
if (co && segmSubdirExists(getConf(), segment, Content.DIR_NAME))
threads.add(new Thread() {
@Override
public void run() {
Expand All @@ -320,7 +357,7 @@ public void run() {
}
}
});
if (fe)
if (fe && segmSubdirExists(getConf(), segment, CrawlDatum.FETCH_DIR_NAME))
threads.add(new Thread() {
@Override
public void run() {
Expand All @@ -333,7 +370,8 @@ public void run() {
}
}
});
if (ge)
if (ge
&& segmSubdirExists(getConf(), segment, CrawlDatum.GENERATE_DIR_NAME))
threads.add(new Thread() {
@Override
public void run() {
Expand All @@ -346,7 +384,7 @@ public void run() {
}
}
});
if (pa)
if (pa && segmSubdirExists(getConf(), segment, CrawlDatum.PARSE_DIR_NAME))
threads.add(new Thread() {
@Override
public void run() {
Expand All @@ -359,7 +397,7 @@ public void run() {
}
}
});
if (pd)
if (pd && segmSubdirExists(getConf(), segment, ParseData.DIR_NAME))
threads.add(new Thread() {
@Override
public void run() {
Expand All @@ -372,7 +410,7 @@ public void run() {
}
}
});
if (pt)
if (pt && segmSubdirExists(getConf(), segment, ParseText.DIR_NAME))
threads.add(new Thread() {
@Override
public void run() {
Expand All @@ -386,6 +424,10 @@ public void run() {
}
});
Iterator<Thread> it = threads.iterator();
if (!it.hasNext()) {
LOG.error("No segment subdirectories specified as input!");
return;
}
while (it.hasNext())
it.next().start();
int cnt;
Expand Down Expand Up @@ -476,7 +518,7 @@ private List<Writable> getSeqRecords(Path dir, Text key) throws Exception {
* {@link Metadata#CONTENT_ENCODING} then fallback
* {@link java.nio.charset.StandardCharsets#UTF_8}
* @param parseMeta a populated {@link Metadata}
* @return {@link Charset}
* @return {@link Charset}
*/
public static Charset getCharset(Metadata parseMeta) {
Charset cs = StandardCharsets.UTF_8;
Expand Down Expand Up @@ -548,7 +590,7 @@ public void getStats(Path segment, final SegmentReaderStats stats)
Text key = new Text();
CrawlDatum val = new CrawlDatum();
FileSystem fs = segment.getFileSystem(getConf());

if (ge) {
SequenceFile.Reader[] readers = SegmentReaderUtil.getReaders(
new Path(segment, CrawlDatum.GENERATE_DIR_NAME), getConf());
Expand All @@ -559,7 +601,7 @@ public void getStats(Path segment, final SegmentReaderStats stats)
}
stats.generated = cnt;
}

if (fe) {
Path fetchDir = new Path(segment, CrawlDatum.FETCH_DIR_NAME);
if (fs.exists(fetchDir) && fs.getFileStatus(fetchDir).isDirectory()) {
Expand All @@ -584,7 +626,7 @@ public void getStats(Path segment, final SegmentReaderStats stats)
stats.fetched = cnt;
}
}

if (pd) {
Path parseDir = new Path(segment, ParseData.DIR_NAME);
if (fs.exists(parseDir) && fs.getFileStatus(parseDir).isDirectory()) {
Expand Down

0 comments on commit 86b893a

Please sign in to comment.