Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates to CalculateAverage_spullara #56

Merged
merged 1 commit into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions calculate_average_spullara.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@


# Extra JVM flags for this entry; empty here, so none are passed.
JAVA_OPTS=""
# Select the 21.0.1-graal Java build for this shell.
# NOTE(review): presumably SDKMAN's `sdk` command — confirm it is available in this shell.
sdk use java 21.0.1-graal
# Run the entry class from the built jar under `time`; $JAVA_OPTS is unquoted,
# so it expands to zero or more separate arguments.
time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_spullara

322 changes: 151 additions & 171 deletions src/main/java/dev/morling/onebrc/CalculateAverage_spullara.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,10 @@
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class CalculateAverage_spullara {
private static final String FILE = "./measurements.txt";
private static final String FILE = "./measurements.txt";

/*
* My results on this computer:
Expand All @@ -44,189 +41,172 @@ public class CalculateAverage_spullara {
*
*/

public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
var filename = args.length == 0 ? FILE : args[0];
var file = new File(filename);
long start = System.currentTimeMillis();

var totalLines = new AtomicInteger();
var results = getFileSegments(file).stream().map(segment -> {
var resultMap = new ByteArrayToResultMap();
long segmentEnd = segment.end();
try (var fileChannel = (FileChannel) Files.newByteChannel(Path.of(filename), StandardOpenOption.READ)) {
var bb = fileChannel.map(FileChannel.MapMode.READ_ONLY, segment.start(), segmentEnd - segment.start());
var buffer = new byte[64];
int lines = 0;
int startLine;
int limit = bb.limit();
while ((startLine = bb.position()) < limit) {
int currentPosition = startLine;
byte b;
int offset = 0;
while (currentPosition != segmentEnd && (b = bb.get(currentPosition++)) != ';') {
buffer[offset++] = b;
}
int temp = 0;
int negative = 1;
outer:
while (currentPosition != segmentEnd && (b = bb.get(currentPosition++)) != '\n') {
switch (b) {
case '-':
negative = -1;
case '.':
break;
case '\r':
currentPosition++;
break outer;
default:
temp = 10 * temp + (b - '0');
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
var filename = args.length == 0 ? FILE : args[0];
var file = new File(filename);

var resultsMap = getFileSegments(file).stream().map(segment -> {
var resultMap = new ByteArrayToResultMap();
long segmentEnd = segment.end();
try (var fileChannel = (FileChannel) Files.newByteChannel(Path.of(filename), StandardOpenOption.READ)) {
var bb = fileChannel.map(FileChannel.MapMode.READ_ONLY, segment.start(), segmentEnd - segment.start());
// Up to 100 characters for a city name
var buffer = new byte[100];
int startLine;
int limit = bb.limit();
while ((startLine = bb.position()) < limit) {
int currentPosition = startLine;
byte b;
int offset = 0;
int hash = 0;
while (currentPosition != segmentEnd && (b = bb.get(currentPosition++)) != ';') {
buffer[offset++] = b;
hash = 31 * hash + b;
}
int temp;
int negative = 1;
// Inspired by @yemreinci to unroll this even further
if (bb.get(currentPosition) == '-') {
negative = -1;
currentPosition++;
}
if (bb.get(currentPosition + 1) == '.') {
temp = negative * ((bb.get(currentPosition) - '0') * 10 + (bb.get(currentPosition + 2) - '0'));
currentPosition += 3;
}
else {
temp = negative * ((bb.get(currentPosition) - '0') * 100 + ((bb.get(currentPosition + 1) - '0') * 10 + (bb.get(currentPosition + 3) - '0')));
currentPosition += 4;
}
if (bb.get(currentPosition) == '\r') {
currentPosition++;
}
currentPosition++;
resultMap.putOrMerge(buffer, 0, offset, temp / 10.0, hash);
bb.position(currentPosition);
}
return resultMap;
}
}
temp *= negative;
double finalTemp = temp / 10.0;
resultMap.putOrMerge(buffer, 0, offset,
() -> new Result(finalTemp),
measurement -> merge(measurement, finalTemp, finalTemp, finalTemp, 1));
lines++;
bb.position(currentPosition);
catch (IOException e) {
throw new RuntimeException(e);
}
}).parallel().flatMap(partition -> partition.getAll().stream())
.collect(Collectors.toMap(e -> new String(e.key()), Entry::value, CalculateAverage_spullara::merge, TreeMap::new));

System.out.println(resultsMap);
}

private static List<FileSegment> getFileSegments(File file) throws IOException {
int numberOfSegments = Runtime.getRuntime().availableProcessors();
long fileSize = file.length();
long segmentSize = fileSize / numberOfSegments;
List<FileSegment> segments = new ArrayList<>(numberOfSegments);
// Pointless to split small files
if (segmentSize < 1_000_000) {
segments.add(new FileSegment(0, fileSize));
return segments;
}
totalLines.addAndGet(lines);
return resultMap;
} catch (IOException e) {
throw new RuntimeException(e);
}
}).parallel().toList();

var resultMap = results.stream()
.flatMap(partition -> partition.getAll().stream())
.collect(Collectors.toMap(e -> new String(e.key()), Entry::value, CalculateAverage_spullara::merge, TreeMap::new));

System.out.println("Time: " + (System.currentTimeMillis() - start) + "ms");
System.out.println("Lines processed: " + totalLines);
System.out.println(resultMap);
}

private static List<FileSegment> getFileSegments(File file) throws IOException {
int numberOfSegments = Runtime.getRuntime().availableProcessors();
long fileSize = file.length();
long segmentSize = fileSize / numberOfSegments;
List<FileSegment> segments = new ArrayList<>();
try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
for (int i = 0; i < numberOfSegments; i++) {
long segStart = i * segmentSize;
long segEnd = (i == numberOfSegments - 1) ? fileSize : segStart + segmentSize;
segStart = findSegment(i, 0, randomAccessFile, segStart, segEnd);
segEnd = findSegment(i, numberOfSegments - 1, randomAccessFile, segEnd, fileSize);

segments.add(new FileSegment(segStart, segEnd));
}
try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
for (int i = 0; i < numberOfSegments; i++) {
long segStart = i * segmentSize;
long segEnd = (i == numberOfSegments - 1) ? fileSize : segStart + segmentSize;
segStart = findSegment(i, 0, randomAccessFile, segStart, segEnd);
segEnd = findSegment(i, numberOfSegments - 1, randomAccessFile, segEnd, fileSize);

segments.add(new FileSegment(segStart, segEnd));
}
}
return segments;
}
return segments;
}

/**
 * Folds the statistics of {@code value} into {@code v}.
 *
 * @param v     accumulator; mutated in place
 * @param value statistics to add; only read
 * @return {@code v}, after the update
 */
private static Result merge(Result v, Result value) {
    final double otherMin = value.min;
    final double otherMax = value.max;
    final double otherSum = value.sum;
    final long otherCount = value.count;
    return merge(v, otherMin, otherMax, otherSum, otherCount);
}

/**
 * Updates {@code v} with a minimum candidate, a maximum candidate, an amount
 * to add to the sum and an amount to add to the count.
 *
 * @param v      accumulator; mutated in place
 * @param value  candidate for the new minimum
 * @param value1 candidate for the new maximum
 * @param value2 amount added to the running sum
 * @param value3 amount added to the running count
 * @return {@code v}, after the update
 */
private static Result merge(Result v, double value, double value1, double value2, long value3) {
    // The four fields are independent of each other, so the update order is irrelevant.
    v.count = v.count + value3;
    v.sum = v.sum + value2;
    v.max = Math.max(v.max, value1);
    v.min = Math.min(v.min, value);
    return v;
}

private static long findSegment(int i, int skipSegment, RandomAccessFile raf, long location, long fileSize) throws IOException {
if (i != skipSegment) {
raf.seek(location);
while (location < fileSize) {
location++;
if (raf.read() == '\n')
break;
}

/**
 * Folds the statistics of {@code value} into {@code v}.
 *
 * @param v     accumulator; mutated in place
 * @param value statistics to add; only read
 * @return {@code v}, after the update
 */
private static Result merge(Result v, Result value) {
    final double otherMin = value.min;
    final double otherMax = value.max;
    final double otherSum = value.sum;
    final long otherCount = value.count;
    return merge(v, otherMin, otherMax, otherSum, otherCount);
}

/**
 * Updates {@code v} with a minimum candidate, a maximum candidate, an amount
 * to add to the sum and an amount to add to the count.
 *
 * @param v      accumulator; mutated in place
 * @param value  candidate for the new minimum
 * @param value1 candidate for the new maximum
 * @param value2 amount added to the running sum
 * @param value3 amount added to the running count
 * @return {@code v}, after the update
 */
private static Result merge(Result v, double value, double value1, double value2, long value3) {
    // The four fields are independent of each other, so the update order is irrelevant.
    v.count = v.count + value3;
    v.sum = v.sum + value2;
    v.max = Math.max(v.max, value1);
    v.min = Math.min(v.min, value);
    return v;
}

/**
 * Moves a segment boundary forward to just past the next line feed.
 *
 * @param i           index of the segment being adjusted
 * @param skipSegment segment index for which {@code location} is returned untouched
 * @param raf         file to scan; its file pointer is moved by this call
 * @param location    byte offset at which scanning starts
 * @param fileSize    exclusive upper bound for the scan
 * @return the offset one past the first {@code '\n'} at or after {@code location},
 *         or the offset where scanning stopped at {@code fileSize}; {@code location}
 *         itself when {@code i == skipSegment}
 * @throws IOException if seeking or reading fails
 */
private static long findSegment(int i, int skipSegment, RandomAccessFile raf, long location, long fileSize) throws IOException {
    if (i == skipSegment) {
        // This boundary is already exact; do not touch the file.
        return location;
    }
    raf.seek(location);
    long cursor = location;
    while (cursor < fileSize) {
        // Advance first so the returned offset points past the byte just read.
        cursor++;
        if (raf.read() == '\n') {
            break;
        }
    }
    return cursor;
}
return location;
}
}

class Result {
double min, max, sum;
long count;

/**
 * Creates statistics for a single observation: min, max and sum all start
 * at {@code value} and the count starts at one.
 */
Result(double value) {
    this.min = value;
    this.max = value;
    this.sum = value;
    this.count = 1L;
}
double min, max, sum;
long count;

/**
 * Formats the statistics as {@code min/mean/max}, each passed through
 * {@link #round(double)}; the mean is {@code sum / count}.
 */
@Override
public String toString() {
    final double mean = sum / count;
    return round(min) + "/" + round(mean) + "/" + round(max);
}
/**
 * Creates statistics for a single observation: min, max and sum all start
 * at {@code value} and the count starts at one.
 */
Result(double value) {
    this.min = value;
    this.max = value;
    this.sum = value;
    this.count = 1L;
}

/**
 * Rounds {@code v} to one decimal place using {@link Math#round(double)}
 * on the value scaled by ten.
 */
double round(double v) {
    final long tenths = Math.round(v * 10.0);
    return tenths / 10.0;
}
/**
 * Formats the statistics as {@code min/mean/max}, each passed through
 * {@link #round(double)}; the mean is {@code sum / count}.
 */
@Override
public String toString() {
    final double mean = sum / count;
    return round(min) + "/" + round(mean) + "/" + round(max);
}

}
/**
 * Rounds {@code v} to one decimal place using {@link Math#round(double)}
 * on the value scaled by ten.
 */
double round(double v) {
    final long tenths = Math.round(v * 10.0);
    return tenths / 10.0;
}

/**
 * A slot index together with the {@link Result} read from that slot;
 * {@code slotValue} is {@code null} when the slot is empty.
 */
record Pair(int slot, Result slotValue) {
}

/** A key, held as raw bytes, together with the {@link Result} accumulated for it. */
record Entry(byte[] key, Result value) {
}
/** A key, held as raw bytes, together with the {@link Result} accumulated for it. */
record Entry(byte[] key, Result value) {
}

/**
 * A byte range of the input file: {@code start} is the first offset of the
 * range and {@code end - start} is its length, so {@code end} is exclusive.
 */
record FileSegment(long start, long end) {
}
/**
 * A byte range of the input file: {@code start} is the first offset of the
 * range and {@code end - start} is its length, so {@code end} is exclusive.
 */
record FileSegment(long start, long end) {
}

class ByteArrayToResultMap {
public static final int MAPSIZE = 1024*128;
Result[] slots = new Result[MAPSIZE];
byte[][] keys = new byte[MAPSIZE][];

/**
 * Computes a polynomial hash (multiplier 31, initial value 0) over
 * {@code length} bytes of {@code a} starting at {@code fromIndex}.
 *
 * @param a         source bytes
 * @param fromIndex index of the first byte to hash
 * @param length    number of bytes to hash
 * @return the hash; {@code 0} when {@code length} is zero
 */
private int hashCode(byte[] a, int fromIndex, int length) {
    final int stop = fromIndex + length;
    int h = 0;
    int idx = fromIndex;
    while (idx < stop) {
        h = 31 * h + a[idx];
        idx++;
    }
    return h;
}

/**
 * Finds the slot belonging to the key stored in {@code key[offset, offset + size)}.
 *
 * <p>Starts at the slot selected by the key's hash and probes linearly until
 * it reaches either an empty slot or a slot whose stored key has the same
 * length and the same bytes.
 *
 * @param key    buffer holding the key bytes
 * @param offset index of the first key byte in {@code key}
 * @param size   number of key bytes
 * @return the slot index and the value found there ({@code null} for an empty slot)
 */
private Pair getPair(byte[] key, int offset, int size) {
    // slots.length is used as a bit mask, which relies on it being a power of two.
    final int mask = slots.length - 1;
    int slot = hashCode(key, offset, size) & mask;
    var slotValue = slots[slot];
    // Linear probe for an open slot or a matching key.
    // Arrays.equals takes an exclusive END index for each range, so the
    // second range must end at offset + size, not at size.
    while (slotValue != null
            && (keys[slot].length != size
                    || !Arrays.equals(keys[slot], 0, size, key, offset, offset + size))) {
        slot = (slot + 1) & mask;
        slotValue = slots[slot];
    }
    return new Pair(slot, slotValue);
}

public void putOrMerge(byte[] key, int offset, int size, Supplier<Result> supplier, Consumer<Result> merge) {
Pair result = getPair(key, offset, size);
Result value = result.slotValue();
if (value == null) {
int slot = result.slot();
slots[slot] = supplier.get();
byte[] bytes = new byte[size];
System.arraycopy(key, offset, bytes, 0, size);
keys[slot] = bytes;
} else {
merge.accept(value);
public static final int MAPSIZE = 1024 * 128;
Result[] slots = new Result[MAPSIZE];
byte[][] keys = new byte[MAPSIZE][];

/**
 * Records one measurement for the key stored in {@code key[offset, offset + size)}.
 *
 * <p>If the key is not present yet, a copy of its bytes is stored together
 * with a new {@link Result} seeded with {@code temp}; otherwise {@code temp}
 * is folded into the existing min, max, sum and count.
 *
 * @param key    buffer holding the key bytes; copied on first insert, so the
 *               caller may reuse the buffer
 * @param offset index of the first key byte in {@code key}
 * @param size   number of key bytes
 * @param temp   measurement to record
 * @param hash   caller-supplied hash of the key bytes; only selects the first
 *               slot to probe
 */
public void putOrMerge(byte[] key, int offset, int size, double temp, int hash) {
    // slots.length is used as a bit mask, which relies on it being a power of two.
    final int mask = slots.length - 1;
    int slot = hash & mask;
    Result value = slots[slot];
    // Linear probe for an open slot or a matching key.
    // Arrays.equals takes an exclusive END index for each range, so the
    // second range must end at offset + size, not at size.
    while (value != null
            && (keys[slot].length != size
                    || !Arrays.equals(keys[slot], 0, size, key, offset, offset + size))) {
        slot = (slot + 1) & mask;
        value = slots[slot];
    }
    if (value == null) {
        slots[slot] = new Result(temp);
        // Store a private copy of the key bytes for this slot.
        byte[] bytes = new byte[size];
        System.arraycopy(key, offset, bytes, 0, size);
        keys[slot] = bytes;
    } else {
        value.min = Math.min(value.min, temp);
        value.max = Math.max(value.max, temp);
        value.sum += temp;
        value.count += 1;
    }
}
}

// Get all pairs
public List<Entry> getAll() {
List<Entry> result = new ArrayList<>();
for (int i = 0; i < slots.length; i++) {
Result slotValue = slots[i];
if (slotValue != null) {
result.add(new Entry(keys[i], slotValue));
}

// Get all pairs
/**
 * Collects every occupied slot as an {@link Entry}, in slot order.
 *
 * @return a new list with one entry per non-null slot; each entry holds the
 *         stored key array itself, not a copy
 */
public List<Entry> getAll() {
    final int capacity = slots.length;
    List<Entry> occupied = new ArrayList<>(capacity);
    for (int idx = 0; idx < capacity; idx++) {
        Result stored = slots[idx];
        if (stored == null) {
            continue;
        }
        occupied.add(new Entry(keys[idx], stored));
    }
    return occupied;
}
return result;
}
}
Loading