diff --git a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchCacheFixed.java b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchCacheFixed.java index 41804227..f69b75bd 100644 --- a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchCacheFixed.java +++ b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchCacheFixed.java @@ -5,6 +5,10 @@ import java.util.Map.Entry; import java.util.NavigableMap; import java.util.TreeMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.aksw.commons.util.lock.LockUtils; class BinSearchCacheFixed implements BinSearchCache @@ -13,6 +17,9 @@ class BinSearchCacheFixed protected NavigableMap fixedDispositions; protected Map fixedHeaders; + protected ReadWriteLock dispositionLock = new ReentrantReadWriteLock(); + protected ReadWriteLock headerLock = new ReentrantReadWriteLock(); + public BinSearchCacheFixed() { super(); this.fixedDispositions = new TreeMap<>(); @@ -21,64 +28,72 @@ public BinSearchCacheFixed() { @Override public long getDisposition(long position) { - long result = -1; - if (fixedHeaders.containsKey(position)) { - result = position; - } else { - Entry e = fixedDispositions.floorEntry(position); - if (e != null) { - // long from = e.getKey(); - long to = e.getValue(); - if (position <= to) { - result = to; + return LockUtils.runWithLock(dispositionLock.readLock(), () -> { + long result = -1; + if (fixedHeaders.containsKey(position)) { + result = position; + } else { + Entry e = fixedDispositions.floorEntry(position); + if (e != null) { + // long from = e.getKey(); + long to = e.getValue(); + if (position <= to) { + result = to; + } } } - } - return result; + return result; + }); } @Override public void setDisposition(long from, long to) { - Entry e = fixedDispositions.floorEntry(from); - if (e != null) { - long cachedFrom = e.getKey(); - long cachedTo = e.getValue(); + LockUtils.runWithLock(dispositionLock.writeLock(), () -> { + Entry e = fixedDispositions.floorEntry(from); + if (e != null) { + long cachedFrom = e.getKey(); + long cachedTo = e.getValue(); - // issues to check for: - // TODO cached range overlaps with starting point (cachedTo > from) - if (cachedTo > to) { - // new: [ ] - // cached: [ ] - throw new IllegalStateException(String.format("The upper endoint overlaps with an existing entry: [%d, %d] -> [%d, %d]", from, to, cachedFrom, cachedTo)); - } else if (cachedTo == to) { - // Update an existing entry with a lower boundary - if (from < cachedFrom) { - fixedDispositions.remove(cachedFrom); + // issues to check for: + // TODO cached range overlaps with starting point (cachedTo > from) + if (cachedTo > to) { + // new: [ ] + // cached: [ ] + throw new IllegalStateException(String.format("The upper endoint overlaps with an existing entry: [%d, %d] -> [%d, %d]", from, to, cachedFrom, cachedTo)); + } else if (cachedTo == to) { + // Update an existing entry with a lower boundary + if (from < cachedFrom) { + fixedDispositions.remove(cachedFrom); + fixedDispositions.put(from, to); + } + } else { // to < cachedTo + // Sanity check: New's lower endpoint must not overlap + // new: [ ] + // existing: [ ] + if (from <= cachedTo) { + throw new IllegalStateException(String.format("Overlap with an existing entry: [%d, %d] -> [%d, %d]", from, to, cachedFrom, cachedTo)); + } fixedDispositions.put(from, to); } - } else { // to < cachedTo - // Sanity check: New's lower endpoint must not overlap - // new: [ ] - // existing: [ ] - if (from <= cachedTo) { - throw new IllegalStateException(String.format("Overlap with an existing entry: [%d, %d] -> [%d, %d]", from, to, cachedFrom, cachedTo)); - } + } else { fixedDispositions.put(from, to); } - } else { - fixedDispositions.put(from, to); - } + }); } @Override public HeaderRecord getHeader(long position) { - HeaderRecord result = fixedHeaders.get(position); - return result; + return LockUtils.runWithLock(headerLock.readLock(), () -> { + HeaderRecord result = fixedHeaders.get(position); + return result; + }); } @Override public void setHeader(HeaderRecord headerRecord) { - // HeaderRecord headerRecord = new HeaderRecord(position, disposition, header, isDataConsumed); - fixedHeaders.put(headerRecord.position(), headerRecord); + LockUtils.runWithLock(headerLock.writeLock(), () -> { + // HeaderRecord headerRecord = new HeaderRecord(position, disposition, header, isDataConsumed); + fixedHeaders.put(headerRecord.position(), headerRecord); + }); } } diff --git a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchUtils.java b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchUtils.java index 4c9068ae..d8d60557 100644 --- a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchUtils.java +++ b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchUtils.java @@ -73,7 +73,7 @@ public static InputStream configureStream( new ReadableByteChannelForLinesMatchingPrefix( SeekableInputStreams.wrap(in), scanState)); } else { - System.err.println("NO MATCH for " + new String(prefix)); + // System.err.println("NO MATCH for " + new String(prefix)); in.close(); result = InputStream.nullInputStream(); // ReadableChannels.newInputStream(ReadableChannels.limit(in, 0)); }