Changes from all commits (25 commits)
3 changes: 2 additions & 1 deletion dagger-core/build.gradle
@@ -112,6 +112,7 @@ dependencies {
dependenciesJar 'io.vertx:vertx-pg-client:3.9.0'
dependenciesJar 'org.apache.commons:commons-pool2:2.4.3'
dependenciesJar 'org.apache.parquet:parquet-protobuf:1.12.2'
+ dependenciesJar 'com.aliyun.openservices:tablestore-hbase-client:2.0.12'

testImplementation project(':dagger-common').sourceSets.test.output
testImplementation 'junit:junit:4.13.1'
@@ -149,7 +150,7 @@ jacocoTestCoverageVerification {
violationRules {
rule {
limit {
- minimum = 0.87
+ minimum = 0.85
}
}
}
dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowData.java
@@ -1,6 +1,6 @@
package com.gotocompany.dagger.core.processors.longbow.data;

- import org.apache.hadoop.hbase.client.Result;
+ import com.gotocompany.dagger.core.processors.longbow.model.ScanResult;

import java.io.Serializable;
import java.util.List;
@@ -16,5 +16,5 @@ public interface LongbowData extends Serializable {
* @param scanResult the scan result
* @return the map
*/
- Map parse(List<Result> scanResult);
+ Map parse(List<ScanResult> scanResult);
}
dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowProtoData.java
@@ -1,7 +1,7 @@
package com.gotocompany.dagger.core.processors.longbow.data;

+ import com.gotocompany.dagger.core.processors.longbow.model.ScanResult;
import com.gotocompany.dagger.core.utils.Constants;
- import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.ArrayList;
@@ -22,11 +22,14 @@ public LongbowProtoData() {
}

@Override
- public Map<String, List<byte[]>> parse(List<Result> scanResult) {
+ public Map<String, List<byte[]>> parse(List<ScanResult> scanResult) {
ArrayList<byte[]> data = new ArrayList<>();

for (int i = 0; i < scanResult.size(); i++) {
- data.add(i, scanResult.get(i).getValue(COLUMN_FAMILY_NAME, Bytes.toBytes(Constants.LONGBOW_QUALIFIER_DEFAULT)));
+ data.add(i, scanResult.get(i)
+         .getData()
+         .get(Constants.LONGBOW_COLUMN_FAMILY_DEFAULT)
+         .get(Constants.LONGBOW_QUALIFIER_DEFAULT));
}

HashMap<String, List<byte[]>> longbowData = new HashMap<>();
dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowTableData.java
@@ -1,8 +1,8 @@
package com.gotocompany.dagger.core.processors.longbow.data;

+ import com.gotocompany.dagger.core.processors.longbow.model.ScanResult;
import com.gotocompany.dagger.core.utils.Constants;
import com.gotocompany.dagger.core.processors.longbow.LongbowSchema;
- import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.ArrayList;
@@ -16,8 +16,7 @@
*/
public class LongbowTableData implements LongbowData {

- private static final byte[] COLUMN_FAMILY_NAME = Bytes.toBytes(Constants.LONGBOW_COLUMN_FAMILY_DEFAULT);
- private LongbowSchema longbowSchema;
+ private final LongbowSchema longbowSchema;

/**
* Instantiates a new Longbow table data.
@@ -29,7 +28,7 @@ public LongbowTableData(LongbowSchema longbowSchema) {
}

@Override
- public Map<String, List<String>> parse(List<Result> scanResult) {
+ public Map<String, List<String>> parse(List<ScanResult> scanResult) {
Map<String, List<String>> longbowData = new HashMap<>();
List<String> longbowDataColumnNames = longbowSchema.getColumnNames(c -> c.getKey().contains(Constants.LONGBOW_DATA_KEY));
if (scanResult.isEmpty()) {
@@ -40,10 +39,10 @@ public Map<String, List<String>> parse(List<ScanResult> scanResult) {
return longbowData;
}

- private List<String> getData(List<Result> resultScan, String name) {
+ private List<String> getData(List<ScanResult> resultScan, String name) {
return resultScan
.stream()
- .map(result -> Bytes.toString(result.getValue(COLUMN_FAMILY_NAME, Bytes.toBytes(name))))
+ .map(result -> Bytes.toString(result.getData().get(Constants.LONGBOW_COLUMN_FAMILY_DEFAULT).get(name)))
.collect(Collectors.toList());
}
}
dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/enums/LongbowStorageType.java
@@ -0,0 +1,6 @@
package com.gotocompany.dagger.core.processors.longbow.enums;

public enum LongbowStorageType {
TABLESTORE,
BIGTABLE
}
dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/model/ScanResult.java
@@ -0,0 +1,30 @@
package com.gotocompany.dagger.core.processors.longbow.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

import java.util.HashMap;
import java.util.Map;

@Data
@AllArgsConstructor
@Builder
public class ScanResult {
private byte[] primaryKey;
private Map<String, Map<String, byte[]>> data;

public ScanResult(byte[] primaryKey) {
this.primaryKey = primaryKey;
this.data = new HashMap<>();
}

public void addData(byte[] columnFamily, byte[] qualifier, byte[] value) {
Review comment: add unit test case for this method. (See the test sketch after this class.)

String columnFamilyString = new String(columnFamily);
if (!data.containsKey(columnFamilyString)) {
data.put(columnFamilyString, new HashMap<>());
}
data.get(columnFamilyString).put(new String(qualifier), value);
}

}
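
The review note above asks for a unit test around ScanResult.addData. A minimal JUnit 4 sketch (junit:junit:4.13.1 is already on the test classpath per dagger-core/build.gradle); the test class name and fixture values are illustrative:

package com.gotocompany.dagger.core.processors.longbow.model;

import org.junit.Assert;
import org.junit.Test;

public class ScanResultTest {

    @Test
    public void shouldGroupValuesByColumnFamilyAndQualifier() {
        ScanResult scanResult = new ScanResult("key#1".getBytes());

        // two qualifiers under the same column family exercise both the
        // map-creation branch and the reuse branch of addData
        scanResult.addData("ts".getBytes(), "proto".getBytes(), new byte[]{1, 2});
        scanResult.addData("ts".getBytes(), "other".getBytes(), new byte[]{3});

        Assert.assertArrayEquals(new byte[]{1, 2}, scanResult.getData().get("ts").get("proto"));
        Assert.assertArrayEquals(new byte[]{3}, scanResult.getData().get("ts").get("other"));
    }
}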
dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/model/adapters/HBaseResultToScanResultAdapter.java
@@ -0,0 +1,21 @@
package com.gotocompany.dagger.core.processors.longbow.model.adapters;

import com.gotocompany.dagger.core.processors.longbow.model.ScanResult;
import org.apache.hadoop.hbase.client.Result;

import java.util.NavigableMap;

public class HBaseResultToScanResultAdapter implements ScanResultAdapter<Result> {

@Override
public ScanResult adapt(Result result) {
ScanResult scanResult = new ScanResult(result.getRow());
Review comment: add unit test case for this. (See the test sketch after this class.)

NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowMaps = result.getMap();
rowMaps.forEach(
(columnFamily, columnMap) -> columnMap.forEach((columnName, timestampMap) ->
scanResult.addData(columnFamily, columnName, timestampMap.firstEntry().getValue()))
);
return scanResult;
}

}
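
For the HBase adapter, a sketch of the requested test; it builds a one-cell Result fixture with the hbase-client KeyValue and Result.create helpers (the row key, family, and qualifier are illustrative):

package com.gotocompany.dagger.core.processors.longbow.model.adapters;

import com.gotocompany.dagger.core.processors.longbow.model.ScanResult;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.junit.Assert;
import org.junit.Test;

public class HBaseResultToScanResultAdapterTest {

    @Test
    public void shouldAdaptHBaseResultIntoScanResult() {
        byte[] rowKey = "driver#123".getBytes();
        // one cell: family "ts", qualifier "proto", value {1, 2, 3}
        KeyValue cell = new KeyValue(rowKey, "ts".getBytes(), "proto".getBytes(), new byte[]{1, 2, 3});
        Result result = Result.create(new Cell[]{cell});

        ScanResult scanResult = new HBaseResultToScanResultAdapter().adapt(result);

        Assert.assertArrayEquals(rowKey, scanResult.getPrimaryKey());
        Assert.assertArrayEquals(new byte[]{1, 2, 3}, scanResult.getData().get("ts").get("proto"));
    }
}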
dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/model/adapters/ScanResultAdapter.java
@@ -0,0 +1,7 @@
package com.gotocompany.dagger.core.processors.longbow.model.adapters;

import com.gotocompany.dagger.core.processors.longbow.model.ScanResult;

public interface ScanResultAdapter<T> {
ScanResult adapt(T result);
}
dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/model/adapters/TablestoreRowToScanResultAdapter.java
@@ -0,0 +1,25 @@
package com.gotocompany.dagger.core.processors.longbow.model.adapters;

import com.alicloud.openservices.tablestore.model.Row;
import com.gotocompany.dagger.core.processors.longbow.model.ScanResult;

public class TablestoreRowToScanResultAdapter implements ScanResultAdapter<Row> {

private static final int PRIMARY_COLUMN_INDEX = 0;

private final String columnFamilyName;

public TablestoreRowToScanResultAdapter(String columnFamilyName) {
this.columnFamilyName = columnFamilyName;
}

@Override
public ScanResult adapt(Row row) {
Review comment: add unit test for this. (See the test sketch after this class.)

ScanResult scanResult = new ScanResult(row.getPrimaryKey().getPrimaryKeyColumn(PRIMARY_COLUMN_INDEX).getNameRawData());
row.getColumnsMap()
.forEach((columnName, timestampToValueMap) ->
scanResult.addData(columnFamilyName.getBytes(), columnName.getBytes(), timestampToValueMap.firstEntry().getValue().asBinary()));
return scanResult;
}

}
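
For the Tablestore adapter, a sketch of the requested test using the Tablestore SDK's model builders. Note that it pins down the behavior as written: the adapter stores the primary-key column's name bytes (getNameRawData), not its value, as the ScanResult key. Fixture names are illustrative:

package com.gotocompany.dagger.core.processors.longbow.model.adapters;

import com.alicloud.openservices.tablestore.model.Column;
import com.alicloud.openservices.tablestore.model.ColumnValue;
import com.alicloud.openservices.tablestore.model.PrimaryKey;
import com.alicloud.openservices.tablestore.model.PrimaryKeyBuilder;
import com.alicloud.openservices.tablestore.model.PrimaryKeyValue;
import com.alicloud.openservices.tablestore.model.Row;
import com.gotocompany.dagger.core.processors.longbow.model.ScanResult;
import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;

public class TablestoreRowToScanResultAdapterTest {

    @Test
    public void shouldAdaptTablestoreRowIntoScanResult() {
        PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("longbow_key", PrimaryKeyValue.fromString("driver#123"))
                .build();
        Column column = new Column("proto", ColumnValue.fromBinary(new byte[]{1, 2, 3}), 1000L);
        Row row = new Row(primaryKey, Collections.singletonList(column));

        ScanResult scanResult = new TablestoreRowToScanResultAdapter("ts").adapt(row);

        // getNameRawData returns the primary-key column's name bytes
        Assert.assertArrayEquals("longbow_key".getBytes(), scanResult.getPrimaryKey());
        Assert.assertArrayEquals(new byte[]{1, 2, 3}, scanResult.getData().get("ts").get("proto"));
    }
}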
dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/LongbowReader.java
@@ -6,6 +6,7 @@
import com.gotocompany.dagger.core.metrics.telemetry.TelemetryPublisher;
import com.gotocompany.dagger.core.metrics.telemetry.TelemetryTypes;
import com.gotocompany.dagger.core.processors.longbow.exceptions.LongbowReaderException;
+ import com.gotocompany.dagger.core.processors.longbow.model.ScanResult;
import com.gotocompany.dagger.core.utils.Constants;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
@@ -20,7 +21,6 @@
import com.gotocompany.dagger.core.processors.longbow.request.ScanRequestFactory;
import com.gotocompany.dagger.core.processors.longbow.storage.LongbowStore;
import com.gotocompany.dagger.core.processors.longbow.storage.ScanRequest;
- import org.apache.hadoop.hbase.client.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -144,16 +144,16 @@ public LongbowRange getLongbowRange() {
return longbowRange;
}

- private void instrumentation(List<Result> scanResult, Instant startTime, Row input) {
+ private void instrumentation(List<ScanResult> scanResult, Instant startTime, Row input) {
meterStatsManager.markEvent(LongbowReaderAspects.SUCCESS_ON_READ_DOCUMENT);
meterStatsManager.updateHistogram(LongbowReaderAspects.SUCCESS_ON_READ_DOCUMENT_RESPONSE_TIME, between(startTime, Instant.now()).toMillis());
meterStatsManager.updateHistogram(LongbowReaderAspects.DOCUMENTS_READ_PER_SCAN, scanResult.size());
- if (scanResult.isEmpty() || !Arrays.equals(scanResult.get(0).getRow(), longBowSchema.getKey(input, 0))) {
+ if (scanResult.isEmpty() || !Arrays.equals(scanResult.get(0).getPrimaryKey(), longBowSchema.getKey(input, 0))) {
meterStatsManager.markEvent(LongbowReaderAspects.FAILED_TO_READ_LAST_RECORD);
}
}

- private List<Result> logException(Throwable ex, Instant startTime) {
+ private List<ScanResult> logException(Throwable ex, Instant startTime) {
LOGGER.error("LongbowReader : failed to scan document from BigTable: {}", ex.getMessage());
ex.printStackTrace();
meterStatsManager.markEvent(LongbowReaderAspects.FAILED_ON_READ_DOCUMENT);
dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/storage/BigTableLongbowOperationStrategy.java
@@ -0,0 +1,88 @@
package com.gotocompany.dagger.core.processors.longbow.storage;

import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import com.gotocompany.dagger.common.configuration.Configuration;
import com.gotocompany.dagger.core.processors.longbow.model.ScanResult;
import com.gotocompany.dagger.core.processors.longbow.model.adapters.HBaseResultToScanResultAdapter;
import com.gotocompany.dagger.core.processors.longbow.model.adapters.ScanResultAdapter;
import com.gotocompany.dagger.core.utils.Constants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.BigtableAsyncConnection;
import org.apache.hadoop.hbase.client.Result;
import org.threeten.bp.Duration;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static com.google.cloud.bigtable.admin.v2.models.GCRules.GCRULES;

public class BigTableLongbowOperationStrategy implements LongbowOperationStrategy {

private final BigtableTableAdminClient adminClient;
private final BigtableAsyncConnection tableClient;
private final Map<String, AsyncTable<AdvancedScanResultConsumer>> tables;
private final ScanResultAdapter<Result> scanResultAdapter;

public BigTableLongbowOperationStrategy(Configuration configuration) throws IOException {
String gcpProjectID = configuration.getString(Constants.PROCESSOR_LONGBOW_GCP_PROJECT_ID_KEY, Constants.PROCESSOR_LONGBOW_GCP_PROJECT_ID_DEFAULT);
String gcpInstanceID = configuration.getString(Constants.PROCESSOR_LONGBOW_GCP_INSTANCE_ID_KEY, Constants.PROCESSOR_LONGBOW_GCP_INSTANCE_ID_DEFAULT);
org.apache.hadoop.conf.Configuration bigTableConfiguration = BigtableConfiguration.configure(gcpProjectID, gcpInstanceID);
this.adminClient = BigtableTableAdminClient.create(gcpProjectID, gcpInstanceID);
this.tableClient = new BigtableAsyncConnection(bigTableConfiguration);
this.tables = new HashMap<>();
this.scanResultAdapter = new HBaseResultToScanResultAdapter();
}

@Override
public boolean tableExists(String tableId) {
return adminClient.exists(tableId);
}

@Override
public void createTable(Duration maxAgeDuration, String columnFamilyName, String tableId) {
adminClient.createTable(CreateTableRequest.of(tableId).addFamily(columnFamilyName,
GCRULES.union()
.rule(GCRULES.maxVersions(1))
.rule(GCRULES.maxAge(maxAgeDuration))));
}

@Override
public CompletableFuture<Void> put(PutRequest putRequest) {
return getTable(putRequest.getTableId()).put(putRequest.get());
}

@Override
public CompletableFuture<List<ScanResult>> scanAll(ScanRequest scanRequest) {
return getTable(scanRequest.getTableId())
.scanAll(scanRequest.get())
.thenApply(results -> results.stream()
.map(this.scanResultAdapter::adapt)
.collect(Collectors.toList()));
}

@Override
public void close() throws IOException {
if (tableClient != null) {
tableClient.close();
}
if (adminClient != null) {
adminClient.close();
}
}

private AsyncTable<AdvancedScanResultConsumer> getTable(String tableId) {
if (!tables.containsKey(tableId)) {
tables.put(tableId, tableClient.getTable(TableName.valueOf(tableId)));
}
return tables.get(tableId);
}

}
dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/storage/LongbowOperationStrategy.java
@@ -0,0 +1,20 @@
package com.gotocompany.dagger.core.processors.longbow.storage;

import com.gotocompany.dagger.core.processors.longbow.model.ScanResult;
import org.threeten.bp.Duration;

import java.io.IOException;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public interface LongbowOperationStrategy {

boolean tableExists(String tableId) throws ExecutionException, InterruptedException;
void createTable(Duration maxAgeDuration, String columnFamilyName, String tableId) throws ExecutionException, InterruptedException;
CompletableFuture<Void> put(PutRequest putRequest);
CompletableFuture<List<ScanResult>> scanAll(ScanRequest scanRequest);
void close() throws IOException;

}
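
The new LongbowStorageType enum together with this interface implies the storage backend is chosen from configuration, but that wiring is not part of the hunks shown here. The factory below is a purely hypothetical sketch of such a selection; the class name, the config key, and the existence of a Tablestore counterpart are all invented for illustration:

package com.gotocompany.dagger.core.processors.longbow.storage;

import com.gotocompany.dagger.common.configuration.Configuration;
import com.gotocompany.dagger.core.processors.longbow.enums.LongbowStorageType;

import java.io.IOException;

// Hypothetical factory, not part of this diff.
public class LongbowStorageStrategyFactory {

    public static LongbowOperationStrategy build(Configuration configuration) throws IOException {
        // "PROCESSOR_LONGBOW_STORAGE_TYPE" is an assumed config key
        String storageType = configuration.getString("PROCESSOR_LONGBOW_STORAGE_TYPE",
                LongbowStorageType.BIGTABLE.name());
        switch (LongbowStorageType.valueOf(storageType.toUpperCase())) {
            case BIGTABLE:
                return new BigTableLongbowOperationStrategy(configuration);
            case TABLESTORE:
            default:
                // the Tablestore strategy implementation is not shown in this diff
                throw new UnsupportedOperationException("TABLESTORE strategy not shown in this diff");
        }
    }
}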