Skip to content

Commit

Permalink
Merge branch 'gzip-position-fix'
Browse files Browse the repository at this point in the history
  • Loading branch information
ato committed Nov 14, 2024
2 parents e6d9e6c + f3abd7a commit 9652eaa
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 28 deletions.
55 changes: 33 additions & 22 deletions src/org/netpreserve/jwarc/GunzipChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,33 +48,44 @@ public int read(ByteBuffer dest) throws IOException {
seenHeader = true;
}

if (inflater.needsInput()) {
if (!readAtLeast(1)) {
throw new EOFException("unexpected end of gzip stream");
}
inflater.setInput(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
}
int totalRead = 0;

try {
int n = inflater.inflate(dest.array(), dest.arrayOffset() + dest.position(), dest.remaining());
if (crc != null) {
crc.update(dest.array(), dest.arrayOffset() + dest.position(), n);
}
dest.position(dest.position() + n);

int newBufferPosition = buffer.limit() - inflater.getRemaining();
inputPosition += newBufferPosition - buffer.position();
buffer.position(newBufferPosition);
do {
if (inflater.needsInput()) {
if (!readAtLeast(1)) {
throw new EOFException("unexpected end of gzip stream");
}
inflater.setInput(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
}

if (inflater.finished()) {
readTrailer();
inflater.reset();
int n = inflater.inflate(dest.array(), dest.arrayOffset() + dest.position(), dest.remaining());
if (crc != null) {
crc.reset();
crc.update(dest.array(), dest.arrayOffset() + dest.position(), n);
}
seenHeader = false;
}
return n;
dest.position(dest.position() + n);

int newBufferPosition = buffer.limit() - inflater.getRemaining();
inputPosition += newBufferPosition - buffer.position();
buffer.position(newBufferPosition);
totalRead += n;

if (inflater.finished()) {
readTrailer();
inflater.reset();
if (crc != null) {
crc.reset();
}
seenHeader = false;
// stop at the end of the gzip member even if there's remaining space in the destination buffer
// so that the caller can the offset of the next member
break;
}

// if we fill the dest buffer but the inflater still requests input then keep going to ensure we fully
// consume the gzip trailer at the end of a record before returning.
} while (dest.hasRemaining() || inflater.needsInput());
return totalRead;
} catch (DataFormatException e) {
throw new ZipException(e.getMessage());
}
Expand Down
32 changes: 26 additions & 6 deletions src/org/netpreserve/jwarc/tools/DedupeTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ public class DedupeTool {
public void deduplicateWarcFile(Path infile, Path outfile) throws IOException {
try (FileChannel input = FileChannel.open(infile);
WarcReader reader = new WarcReader(input);
FileChannel output = FileChannel.open(outfile, WRITE, CREATE, TRUNCATE_EXISTING);
WarcWriter writer = new WarcWriter(output, reader.compression())) {
FileChannel output = FileChannel.open(outfile, WRITE, CREATE, TRUNCATE_EXISTING)) {

// We create the WarcWriter on demand so that if no records are deduplicated we don't write an empty
// gzip member at the end of the file.
WarcWriter writer = null;

WarcRecord record = reader.next().orElse(null);
while (record != null) {
long position = reader.position();
Expand All @@ -37,16 +41,30 @@ record = reader.next().orElse(null);

if (revisit == null) {
if (verbose) System.out.println("Copying " + position + ":" + length);
long written = input.transferTo(position, length, output);
assert written == length;
transferExactly(input, position, length, output);
} else {
if (verbose) System.out.println("Writing revisit for " + position + ":" + length);
if (writer == null) writer = new WarcWriter(output, reader.compression());
writer.write(revisit);
}
}
}
}

private static void transferExactly(FileChannel input, long position, long length, FileChannel output) throws IOException {
long transferred = 0;
while (transferred < length) {
long n = input.transferTo(position + transferred, length - transferred, output);
if (n <= 0) {
throw new IOException("FileChannel.transferTo returned " + n);
}
transferred += n;
}
if (transferred != length) {
throw new IOException("Expected to transfer " + length + " but actually transferred " + transferred);
}
}

private WarcRevisit deduplicate(WarcRecord record) throws IOException {
if (!(record instanceof WarcResponse)) return null;
WarcResponse response = (WarcResponse) record;
Expand Down Expand Up @@ -83,13 +101,15 @@ public void setCdxServer(String cdxServer) {
private static Path determineOutputPath(Path infile) {
String[] suffixes = new String[]{".warc.gz", ".warc", ".arc.gz", ".arc"};
String filename = infile.getFileName().toString();
Path dir = infile.getParent();
if (dir == null) dir = Paths.get(".");
for (String suffix : suffixes) {
if (filename.endsWith(suffix)) {
String basename = filename.substring(0, filename.length() - suffix.length());
return infile.getParent().resolve(basename + "-dedup" + suffix);
return dir.resolve(basename + "-dedup" + suffix);
}
}
return infile.getParent().resolve(filename + ".dedup");
return dir.resolve(filename + ".dedup");
}

public static void main(String[] args) throws IOException {
Expand Down

0 comments on commit 9652eaa

Please sign in to comment.