Skip to content

Commit

Permalink
dpsoft: first submission (#572)
Browse files Browse the repository at this point in the history
* dpsoft: first submission

* minor clean up

* map with linear probing

* clean up

* update prepare

* clean up

* remove string format

* add credits

* fix format

* use prepare.sh

* graal 21.0.2

* fix differences

* clean up

* underflow protection

* improve segments generation logic

* clean up

* remove unnecessary alignment in findsegment

* new try

* fix number of segments
  • Loading branch information
dpsoft authored Feb 1, 2024
1 parent 2aed039 commit bec0cef
Show file tree
Hide file tree
Showing 3 changed files with 364 additions and 0 deletions.
20 changes: 20 additions & 0 deletions calculate_average_dpsoft.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/sh
#
# Copyright 2023 The original authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

JAVA_OPTS="--enable-preview -XX:+UnlockExperimentalVMOptions -XX:-EnableJVMCI -XX:+UseEpsilonGC -Xms128m -Xmx128m -XX:+AlwaysPreTouch -XX:+UseTransparentHugePages -XX:-TieredCompilation -XX:+TrustFinalNonStaticFields"

java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_dpsoft
20 changes: 20 additions & 0 deletions prepare_dpsoft.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash
#
# Copyright 2023 The original authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Uncomment below to use sdk
source "$HOME/.sdkman/bin/sdkman-init.sh"
sdk use java 21.0.2-graal 1>&2
324 changes: 324 additions & 0 deletions src/main/java/dev/morling/onebrc/CalculateAverage_dpsoft.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,324 @@
/*
* Copyright 2023 The original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.morling.onebrc;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.*;
import java.util.concurrent.Phaser;

public class CalculateAverage_dpsoft {
private static final String FILE = "./measurements.txt";
private static final int MAX_ROWS = 1 << 15;
private static final int ROWS_MASK = MAX_ROWS - 1;

public static void main(String[] args) throws IOException {
final var cpus = Runtime.getRuntime().availableProcessors();
final var segments = getMemorySegments(cpus);
final var tasks = new MeasurementExtractor[segments.size()];
final var phaser = new Phaser(segments.size());

for (int i = 0; i < segments.size(); i++) {
tasks[i] = new MeasurementExtractor(segments.get(i), phaser);
}

phaser.awaitAdvance(phaser.getPhase());

final var allMeasurements = Arrays.stream(tasks)
.parallel()
.map(MeasurementExtractor::getMeasurements)
.reduce(MeasurementMap::merge)
.orElseThrow();

System.out.println(sortSequentially(allMeasurements));

System.exit(0);
}

private static Map<String, Measurement> sortSequentially(MeasurementMap allMeasurements) {
final Map<String, Measurement> sorted = new TreeMap<>();
for (Measurement m : allMeasurements.measurements) {
if (m != null) {
sorted.put(new String(m.name, StandardCharsets.UTF_8), m);
}
}
return sorted;
}

// Inspired by @spullara
private static List<FileSegment> getMemorySegments(int numberOfSegments) throws IOException {
var file = new File(FILE);
long fileSize = file.length();
long segmentSize = fileSize / numberOfSegments;
List<FileSegment> segments = new ArrayList<>(numberOfSegments);

if (fileSize < 1_000_000) {
segments.add(new FileSegment(0, fileSize));
return segments;
}

while (segmentSize >= Integer.MAX_VALUE) {
numberOfSegments += 1;
segmentSize = fileSize / numberOfSegments;
}

try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
for (int i = 0; i < numberOfSegments; i++) {
long segStart = i * segmentSize;
long segEnd = (i == numberOfSegments - 1) ? fileSize : segStart + segmentSize;
segStart = findSegment(i, 0, randomAccessFile, segStart, segEnd);
segEnd = findSegment(i, numberOfSegments - 1, randomAccessFile, segEnd, fileSize);

segments.add(new FileSegment(segStart, segEnd));
}
}
return segments;
}

private static long findSegment(int i, int skipSegment, RandomAccessFile raf, long location, long fileSize) throws IOException {
if (i != skipSegment) {
raf.seek(location);
while (location < fileSize) {
location++;
if (raf.read() == '\n')
break;
}
}
return location;
}

record FileSegment(long start, long end) {
}

static final class MeasurementExtractor implements Runnable {
private final FileSegment segment;
private final Phaser phaser;
private final MeasurementMap measurements = new MeasurementMap();

MeasurementExtractor(FileSegment memorySegment, Phaser phaser) {
this.segment = memorySegment;
this.phaser = phaser;
(new Thread(this)).start();
}

@Override
public void run() {
long segmentEnd = segment.end();
try (var fileChannel = FileChannel.open(Path.of(FILE), StandardOpenOption.READ)) {
var mbb = fileChannel.map(FileChannel.MapMode.READ_ONLY, segment.start(), segmentEnd - segment.start());
mbb.order(ByteOrder.nativeOrder());

if (segment.start() > 0) {
skipToFirstLine(mbb);
}

while (mbb.remaining() > 0 && mbb.position() <= segmentEnd) {
int pos = mbb.position();
int nameHash = hashAndRewind(mbb);
var m = measurements.getOrCompute(nameHash, mbb, pos);
int temp = readTemperatureFromBuffer(mbb);

m.sample(temp);
}
}
catch (IOException e) {
throw new RuntimeException("Error reading file", e);
}
finally {
phaser.arriveAndAwaitAdvance();
}
}

// inspired by @lawrey
private static int hashAndRewind(MappedByteBuffer mbb) {
int hash = 0;
int idx = mbb.position();
outer: while (true) {
int name = mbb.getInt();
for (int c = 0; c < 4; c++) {
int b = (name >> (c << 3)) & 0xFF;
if (b == ';') {
idx += c + 1;
break outer;
}
hash ^= b * 82805;
}
idx += 4;
}

var rewind = mbb.position() - idx;
mbb.position(mbb.position() - rewind);
return hash;
}

private static int readTemperatureFromBuffer(MappedByteBuffer mbb) {
int temp = 0;
boolean negative = false;

outer: while (mbb.remaining() > 0) {
int b = mbb.get();
switch (b) {
case '-':
negative = true;
break;
default:
temp = 10 * temp + (b - '0');
break;
case '.':
b = mbb.get();
temp = 10 * temp + (b - '0');
case '\r':
mbb.get();
case '\n':
break outer;
}
}
if (negative)
temp = -temp;
return temp;
}

public MeasurementMap getMeasurements() {
return measurements;
}

// Skips to the first line in the buffer, used for chunk processing.
private static void skipToFirstLine(MappedByteBuffer mbb) {
while ((mbb.get() & 0xFF) >= ' ') {
// Skip bytes until reaching the start of a line.
}
}
}

// credits to @shipilev
static class MeasurementMap {
private final Measurement[] measurements = new Measurement[MAX_ROWS];

public Measurement getOrCompute(int hash, MappedByteBuffer mbb, int position) {
int index = hash & ROWS_MASK;
var measurement = measurements[index];
if (measurement != null && hash == measurement.nameHash && Measurement.equalsTo(measurement.name, mbb, position)) {
return measurement;
}
else {
return compute(hash, mbb, position);
}
}

private Measurement compute(int hash, MappedByteBuffer mbb, int position) {
var index = hash & ROWS_MASK;
Measurement m;

while (true) {
m = measurements[index];
if (m == null || (hash == m.nameHash && Measurement.equalsTo(m.name, mbb, position))) {
break;
}
index = (index + 1) & ROWS_MASK;
}

if (m == null) {
int len = mbb.position() - position - 1;
byte[] bytes = new byte[len];
mbb.position(position);
mbb.get(bytes, 0, len);
mbb.get();
measurements[index] = m = new Measurement(bytes, hash);
}

return m;
}

public MeasurementMap merge(MeasurementMap otherMap) {
for (Measurement other : otherMap.measurements) {
if (other == null)
continue;
int index = other.nameHash & ROWS_MASK;
while (true) {
Measurement m = measurements[index];
if (m == null) {
measurements[index] = other;
break;
}
else if (Arrays.equals(m.name, other.name)) {
m.merge(other);
break;
}
else {
index = (index + 1) & ROWS_MASK;
}
}
}
return this;
}
}

static final class Measurement {
public final int nameHash;
public final byte[] name;

public long sum;
public int count = 0;
public int min = Integer.MAX_VALUE;
public int max = Integer.MIN_VALUE;

public Measurement(byte[] name, int nameHash) {
this.name = name;
this.nameHash = nameHash;
}

public static boolean equalsTo(byte[] name, MappedByteBuffer mbb, int position) {
int len = mbb.position() - position - 1;
if (len != name.length)
return false;
for (int i = 0; i < len; i++) {
if (name[i] != mbb.get(position + i))
return false;
}
return true;
}

public void sample(int temp) {
min = Math.min(min, temp);
max = Math.max(max, temp);
sum += temp;
count++;
}

public Measurement merge(Measurement m2) {
min = Math.min(min, m2.min);
max = Math.max(max, m2.max);
sum += m2.sum;
count += m2.count;
return this;
}

public String toString() {
return round(((double) min) / 10.0) + "/" + round((((double) sum) / 10.0) / count) + "/" + round(((double) max) / 10.0);
}

private static double round(double value) {
return Math.round(value * 10.0) / 10.0;
}
}
}

0 comments on commit bec0cef

Please sign in to comment.