Skip to content

Commit

Permalink
sudhirtumati implementation (#598)
Browse files Browse the repository at this point in the history
  • Loading branch information
sudhirtumati authored Jan 31, 2024
1 parent 33143cd commit b91c95a
Show file tree
Hide file tree
Showing 3 changed files with 343 additions and 0 deletions.
19 changes: 19 additions & 0 deletions calculate_average_sudhirtumati.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/sh
#
# Copyright 2023 The original authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

JAVA_OPTS="--enable-preview -Xmx128m -XX:+UseSerialGC -XX:-TieredCompilation"
java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_sudhirtumati
20 changes: 20 additions & 0 deletions prepare_sudhirtumati.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash
#
# Copyright 2023 The original authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Uncomment below to use sdk
source "$HOME/.sdkman/bin/sdkman-init.sh"
sdk use java 21.0.2-open 1>&2
304 changes: 304 additions & 0 deletions src/main/java/dev/morling/onebrc/CalculateAverage_sudhirtumati.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
/*
* Copyright 2023 The original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.morling.onebrc;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

public class CalculateAverage_sudhirtumati {

private static final String FILE = "./measurements.txt";
private static final int bufferSize = 8192;
private static final byte SEMICOLON = (byte) ';';
private static final byte NEW_LINE = (byte) '\n';
private static final int THREAD_COUNT = Runtime.getRuntime().availableProcessors();
private static final Semaphore PERMITS = new Semaphore(THREAD_COUNT);
private static final MeasurementAggregator globalAggregator = new MeasurementAggregator();
private static final Semaphore AGGREGATOR_PERMITS = new Semaphore(1);
private static final Map<Integer, String> LOCATION_STORE = new ConcurrentHashMap<>();

public static void main(String[] args) throws IOException, InterruptedException {
CalculateAverage_sudhirtumati instance = new CalculateAverage_sudhirtumati();
instance.chunkProcess();
}

private void chunkProcess() throws IOException, InterruptedException {
try (FileInputStream is = new FileInputStream(FILE);
FileChannel fc = is.getChannel()) {
for (int i = 0; i < THREAD_COUNT; i++) {
PERMITS.acquire();
Thread t = new ChunkProcessingThread(i, fc);
t.setName(STR."T\{i}");
t.start();
}
do {
Thread.sleep(100);
} while (PERMITS.availablePermits() != THREAD_COUNT);
}
System.out.println(globalAggregator.getResult());
}

static class ChunkProcessingThread extends Thread {

private int index;
private final FileChannel fc;
private final MeasurementAggregator aggregator;

ChunkProcessingThread(int index, FileChannel fc) {
this.index = index;
this.fc = fc;
aggregator = new MeasurementAggregator();
}

@Override
public void run() {
ByteBuffer buffer = ByteBuffer.allocate(index == 0 ? bufferSize : bufferSize + 50);
long fcPosition = index == 0 ? 0 : (((long) index * bufferSize) - 50);
try {
while (fc.read(buffer, fcPosition) != -1) {
buffer.flip();
if (index != 0 /* && fc.position() != bufferSize */) {
seekStartPos(buffer);
}
processBuffer(buffer);
index += THREAD_COUNT;
fcPosition = ((long) index * bufferSize) - 50L;
if (buffer.capacity() == 8192) {
buffer = ByteBuffer.allocate(bufferSize + 50);
}
buffer.position(0);
}
AGGREGATOR_PERMITS.acquire();
globalAggregator.process(aggregator);
AGGREGATOR_PERMITS.release();
}
catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
PERMITS.release();
}

private void processBuffer(ByteBuffer buffer) throws IOException {
int mStartMark = buffer.position();
int tStartMark = -1;
int count = buffer.position();
do {
byte b = buffer.get(count);
if (b == SEMICOLON) {
tStartMark = count;
}
else if (b == NEW_LINE) {
byte[] locArr = new byte[tStartMark - mStartMark];
byte[] tempArr = new byte[count - tStartMark];
buffer.get(mStartMark, locArr);
buffer.get(mStartMark + locArr.length + 1, tempArr);
aggregator.process(locArr, tempArr);
mStartMark = count + 1;
}
count++;
} while (count < buffer.limit());
}

private void seekStartPos(ByteBuffer buffer) {
int i = buffer.limit() > 50 ? 49 : buffer.limit() - 2;
for (; i >= 0; i--) {
if (buffer.get(i) == NEW_LINE) {
buffer.position(i + 1);
break;
}
}
}
}

static final class MeasurementAggregator {
private static final long MAX_VALUE_DIVIDE_10 = Long.MAX_VALUE / 10;
private final Map<Integer, Measurement> store = new HashMap<>();

public void process(MeasurementAggregator other) {
other.store.forEach((k, v) -> {
Measurement m = store.get(k);
if (m == null) {
m = new Measurement();
store.put(k, m);
}
m.process(v);
});
}

public void process(byte[] location, byte[] temperature) throws IOException {
Integer hashCode = Arrays.hashCode(location);
LOCATION_STORE.computeIfAbsent(hashCode, _ -> new String(location));
// String loc = new String(location);
Measurement measurement = store.get(hashCode);
if (measurement == null) {
measurement = new Measurement();
store.put(hashCode, measurement);
}
double tempD = parseDouble(temperature);
measurement.process(tempD);
}

public double parseDouble(byte[] bytes) {
long value = 0;
int exp = 0;
boolean negative = false;
int decimalPlaces = Integer.MIN_VALUE;
int index = 0;
int ch = bytes[index];
if (ch == '-') {
negative = true;
ch = bytes[++index];
}
while (index < bytes.length) {
if (ch >= '0' && ch <= '9') {
while (value >= MAX_VALUE_DIVIDE_10) {
value >>>= 1;
exp++;
}
value = value * 10 + (ch - '0');
decimalPlaces++;

}
else if (ch == '.') {
decimalPlaces = 0;
}
if (index == bytes.length - 1) {
break;
}
else {
ch = bytes[++index];
}
}
return asDouble(value, exp, negative, decimalPlaces);
}

private static double asDouble(long value, int exp, boolean negative, int decimalPlaces) {
if (decimalPlaces > 0 && value < Long.MAX_VALUE / 2) {
if (value < Long.MAX_VALUE / (1L << 32)) {
exp -= 32;
value <<= 32;
}
if (value < Long.MAX_VALUE / (1L << 16)) {
exp -= 16;
value <<= 16;
}
if (value < Long.MAX_VALUE / (1L << 8)) {
exp -= 8;
value <<= 8;
}
if (value < Long.MAX_VALUE / (1L << 4)) {
exp -= 4;
value <<= 4;
}
if (value < Long.MAX_VALUE / (1L << 2)) {
exp -= 2;
value <<= 2;
}
if (value < Long.MAX_VALUE / (1L << 1)) {
exp -= 1;
value <<= 1;
}
}
for (; decimalPlaces > 0; decimalPlaces--) {
exp--;
long mod = value % 5;
value /= 5;
int modDiv = 1;
if (value < Long.MAX_VALUE / (1L << 4)) {
exp -= 4;
value <<= 4;
modDiv <<= 4;
}
if (value < Long.MAX_VALUE / (1L << 2)) {
exp -= 2;
value <<= 2;
modDiv <<= 2;
}
if (value < Long.MAX_VALUE / (1L << 1)) {
exp -= 1;
value <<= 1;
modDiv <<= 1;
}
if (decimalPlaces > 1)
value += modDiv * mod / 5;
else
value += (modDiv * mod + 4) / 5;
}
final double d = Math.scalb((double) value, exp);
return negative ? -d : d;
}

public String getResult() {
Map<String, Measurement> sortedMap = new TreeMap<>();
store.forEach((k, v) -> sortedMap.put(LOCATION_STORE.get(k), v));
return sortedMap.toString();
}
}

static final class Measurement {
private double min = Double.POSITIVE_INFINITY;
private double max = Double.NEGATIVE_INFINITY;
private double sum;
private long count;

public void process(double value) {
if (value < min) {
min = value;
}
if (value > max) {
max = value;
}
sum += value;
count++;
}

public void process(Measurement other) {
if (other.min < min) {
this.min = other.min;
}
if (other.max > max) {
this.max = other.max;
}
this.sum += other.sum;
this.count += other.count;
}

public String toString() {
ResultRow result = new ResultRow(min, sum, count, max);
return result.toString();
}
}

private record ResultRow(double min, double sum, double count, double max) {

public String toString() {
return STR."\{round(min)}/\{round((Math.round(sum * 10.0) / 10.0) / count)}/\{round(max)}";
}

private double round(double value) {
return Math.round(value * 10.0) / 10.0;
}
}

}

0 comments on commit b91c95a

Please sign in to comment.