[opt] refine some method name #1

Merged
4 changes: 3 additions & 1 deletion be/src/vec/sink/vdata_stream_sender.cpp
@@ -423,9 +423,11 @@ Status VDataStreamSender::init(const TDataSink& tsink) {
if (_hash_type == THashType::CRC32) {
_partitioner.reset(
new Crc32HashPartitioner<ShuffleChannelIds>(_channel_shared_ptrs.size()));
- } else {
+ } else if (_hash_type == THashType::SPARK_MURMUR32) {
_partitioner.reset(new Murmur32HashPartitioner<ShufflePModChannelIds>(
_channel_shared_ptrs.size()));
+ } else {
+ return Status::InternalError("Invalid hash type for bucket shuffle: {}", _hash_type);
}
RETURN_IF_ERROR(_partitioner->init(t_stream_sink.output_partition.partition_exprs));
} else if (_part_type == TPartitionType::RANGE_PARTITIONED) {
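Why the dedicated SPARK_MURMUR32 branch: Spark assigns rows to buckets with 32-bit Murmur3 (seed 42) reduced by a positive modulo, while Doris' native buckets use CRC32, so the sender must pick the matching partitioner. Below is a minimal Java sketch of the Spark-side convention that Murmur32HashPartitioner with ShufflePModChannelIds has to reproduce; the seed value and the Guava call are assumptions for illustration, not code from this PR.

```java
import com.google.common.hash.Hashing;

public class SparkBucketIdSketch {
    // Spark's Murmur3_x86_32 seed for hash expressions (assumed here).
    private static final int SPARK_SEED = 42;

    // pmod: a modulo that is never negative, matching Spark's pmod(hash, numBuckets).
    static int pmod(int hash, int numBuckets) {
        int m = hash % numBuckets;
        return m < 0 ? m + numBuckets : m;
    }

    // Bucket id for an int column value; other types hash their byte representation.
    static int bucketIdForInt(int value, int numBuckets) {
        int hash = Hashing.murmur3_32(SPARK_SEED).hashInt(value).asInt();
        return pmod(hash, numBuckets);
    }

    public static void main(String[] args) {
        System.out.println(bucketIdForInt(12345, 8)); // prints a bucket in [0, 8)
    }
}
```

If either the hash function or the sign handling of the modulo diverges between the writer (Spark) and the reader (Doris), rows land in different buckets on each side, which is exactly the mismatch the new explicit InternalError for unexpected hash types guards against.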
@@ -29,26 +29,11 @@ public class HiveExternalDistributionInfo extends HashDistributionInfo {
@SerializedName(value = "bucketingVersion")
private final int bucketingVersion;

public HiveExternalDistributionInfo() {
bucketingVersion = 2;
}

public HiveExternalDistributionInfo(int bucketNum, List<Column> distributionColumns, int bucketingVersion) {
super(bucketNum, distributionColumns);
this.bucketingVersion = bucketingVersion;
}

public HiveExternalDistributionInfo(int bucketNum, boolean autoBucket,
List<Column> distributionColumns, int bucketingVersion) {
super(bucketNum, autoBucket, distributionColumns);
this.bucketingVersion = bucketingVersion;
}

public int getBucketingVersion() {
return bucketingVersion;
}


@Override
public boolean equals(Object o) {
if (this == o) {
@@ -198,8 +198,6 @@ public void init(BeSelectionPolicy policy) throws UserException {
} catch (ExecutionException e) {
throw new UserException("failed to get consistent hash", e);
}
- /*consistentBucket = new ConsistentHash<>(Hashing.murmur3_128(), new BucketHash(),
- new BackendHash(), backends, Config.virtual_node_number);*/
}

public Backend getNextBe() {
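For context, the consistent hash that init() now fetches from a cache (rather than rebuilding it, see the removed commented-out construction above) gives each split a stable backend so repeated scans hit the same node. A simplified stand-in using Guava is sketched below; Doris' own ConsistentHash is a ring with virtual nodes (Config.virtual_node_number), whereas Guava's consistentHash is a different algorithm, so this only illustrates the idea.

```java
import com.google.common.hash.Hashing;
import java.util.List;

public class ConsistentAssignmentSketch {
    // Stable split -> backend mapping: a split keeps its backend unless the
    // backend list changes, and then only a small fraction of splits move.
    static int backendIndexForSplit(String splitPath, List<String> backendIds) {
        long splitHash = Hashing.murmur3_128().hashUnencodedChars(splitPath).asLong();
        return Hashing.consistentHash(splitHash, backendIds.size());
    }

    public static void main(String[] args) {
        List<String> backends = List.of("be-1", "be-2", "be-3");
        System.out.println(backendIndexForSplit("hdfs://warehouse/t/part-00000", backends));
    }
}
```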
@@ -350,12 +350,12 @@ public void createScanRangeLocations() throws UserException {
? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys,
false, isACID) : fileSplit.getPartitionValues();

- boolean isBucketedHiveTable = false;
+ boolean isSparkBucketedHiveTable = false;
int bucketNum = 0;
TableIf targetTable = getTargetTable();
if (targetTable instanceof HMSExternalTable) {
- isBucketedHiveTable = ((HMSExternalTable) targetTable).isBucketedTable();
- if (isBucketedHiveTable) {
+ isSparkBucketedHiveTable = ((HMSExternalTable) targetTable).isSparkBucketedTable();
+ if (isSparkBucketedHiveTable) {
bucketNum = HiveBucketUtil.getBucketNumberFromPath(fileSplit.getPath().getName()).getAsInt();
}
}
@@ -397,7 +397,7 @@ public void createScanRangeLocations() throws UserException {
fileSplit.getStart(), fileSplit.getLength(),
Joiner.on("|").join(fileSplit.getHosts()));
}
- if (isBucketedHiveTable) {
+ if (isSparkBucketedHiveTable) {
bucketSeq2locations.put(bucketNum, curLocations);
}
scanRangeLocations.add(curLocations);
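The bucket sequence for a Spark-bucketed split is recovered from the data file's name via HiveBucketUtil.getBucketNumberFromPath and then used to pin the split's locations into bucketSeq2locations. A rough sketch of such an extraction follows; the filename patterns (Hive-style 000000_0 and Spark-style part-00000-<uuid>_00003.c000...) are assumptions about common layouts, not copied from the utility.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BucketFromPathSketch {
    // Hive convention: 000000_0 is bucket 0, 000001_0 is bucket 1, ...
    private static final Pattern HIVE_STYLE = Pattern.compile("^(\\d+)_\\d+(?:\\..*)?$");
    // Spark convention: part-00000-<uuid>_00003.c000.snappy.parquet is bucket 3.
    private static final Pattern SPARK_STYLE = Pattern.compile("_(\\d+)(?:\\.c\\d+)?\\.[^.]+.*$");

    static OptionalInt bucketNumberFromFileName(String fileName) {
        Matcher hive = HIVE_STYLE.matcher(fileName);
        if (hive.matches()) {
            return OptionalInt.of(Integer.parseInt(hive.group(1)));
        }
        Matcher spark = SPARK_STYLE.matcher(fileName);
        if (spark.find()) {
            return OptionalInt.of(Integer.parseInt(spark.group(1)));
        }
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        System.out.println(bucketNumberFromFileName("000002_0"));                          // 2
        System.out.println(bucketNumberFromFileName("part-00000-abc_00003.c000.parquet")); // 3
    }
}
```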
@@ -31,6 +31,7 @@
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.ExternalTable;
+ import org.apache.doris.datasource.hive.HiveBucketUtil.HiveBucketType;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
@@ -167,7 +168,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
protected volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null;
protected List<Column> partitionColumns;
private List<Column> bucketColumns;
- private boolean isSparkTable;
+ private HiveBucketType hiveBucketType = HiveBucketType.NONE;

private DLAType dlaType = DLAType.UNKNOWN;

@@ -256,12 +257,8 @@ public boolean isHoodieCowTable() {
return "org.apache.hudi.hadoop.HoodieParquetInputFormat".equals(inputFormatName);
}

- public boolean isSparkTable() {
- return isSparkTable;
- }
-
- public boolean isBucketedTable() {
- return bucketColumns != null && !bucketColumns.isEmpty() && isSparkTable;
+ public boolean isSparkBucketedTable() {
+ return bucketColumns != null && !bucketColumns.isEmpty() && hiveBucketType == HiveBucketType.SPARK;
}

/**
@@ -507,7 +504,7 @@ public List<Column> initSchema() {
private void initBucketingColumns(List<Column> columns) {
List<String> bucketCols = new ArrayList<>(5);
int numBuckets = getBucketColumns(bucketCols);
- if (bucketCols.isEmpty() || !isSparkTable) {
+ if (bucketCols.isEmpty() || hiveBucketType != HiveBucketType.SPARK) {
bucketColumns = ImmutableList.of();
distributionInfo = new RandomDistributionInfo(1, true);
return;
@@ -544,6 +541,7 @@ private int getBucketColumns(List<String> bucketCols) {
/* Hive Bucketed Table */
bucketCols.addAll(descriptor.getBucketCols());
numBuckets = descriptor.getNumBuckets();
+ hiveBucketType = HiveBucketType.HIVE;
} else if (remoteTable.isSetParameters()
&& !Collections.disjoint(SUPPORTED_BUCKET_PROPERTIES, remoteTable.getParameters().keySet())) {
Map<String, String> parameters = remoteTable.getParameters();
@@ -558,7 +556,7 @@
}

if (numBuckets > 0) {
- isSparkTable = true;
+ hiveBucketType = HiveBucketType.SPARK;
}
}

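hiveBucketType is only promoted to SPARK when the bucketing metadata comes from table parameters rather than from the Hive storage descriptor. A hedged sketch of that detection follows; the spark.sql.sources.schema.numBuckets and bucketCol.N property names are what Spark typically writes to the metastore and are assumed here, they are not read from SUPPORTED_BUCKET_PROPERTIES in the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SparkBucketMetaSketch {
    // Properties Spark's Hive catalog support writes for a bucketed table (assumed names).
    static final String NUM_BUCKETS_PROP = "spark.sql.sources.schema.numBuckets";
    static final String BUCKET_COL_PREFIX = "spark.sql.sources.schema.bucketCol.";

    // Returns the bucket count (0 if not Spark-bucketed) and fills the bucket columns.
    static int readSparkBucketing(Map<String, String> tableParams, List<String> bucketColsOut) {
        String numBuckets = tableParams.get(NUM_BUCKETS_PROP);
        if (numBuckets == null) {
            return 0; // not a Spark-bucketed table
        }
        for (int i = 0; tableParams.containsKey(BUCKET_COL_PREFIX + i); i++) {
            bucketColsOut.add(tableParams.get(BUCKET_COL_PREFIX + i));
        }
        return Integer.parseInt(numBuckets);
    }

    public static void main(String[] args) {
        List<String> cols = new ArrayList<>();
        int n = readSparkBucketing(
                Map.of(NUM_BUCKETS_PROP, "8", BUCKET_COL_PREFIX + 0, "user_id"), cols);
        System.out.println(n + " buckets on " + cols); // 8 buckets on [user_id]
    }
}
```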
@@ -65,6 +65,12 @@
public class HiveBucketUtil {
private static final Logger LOG = LogManager.getLogger(HiveBucketUtil.class);

+ public enum HiveBucketType {
+ NONE,
+ HIVE,
+ SPARK
+ }
+
private static final Set<PrimitiveType> SUPPORTED_TYPES_FOR_BUCKET_FILTER = ImmutableSet.of(
PrimitiveType.BOOLEAN,
PrimitiveType.TINYINT,
@@ -425,7 +425,7 @@ protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws User

@Override
public DataPartition constructInputPartitionByDistributionInfo() {
- if (hmsTable.isBucketedTable()) {
+ if (hmsTable.isSparkBucketedTable()) {
DistributionInfo distributionInfo = hmsTable.getDefaultDistributionInfo();
if (!(distributionInfo instanceof HashDistributionInfo)) {
return DataPartition.RANDOM;
@@ -448,7 +448,7 @@ public HMSExternalTable getHiveTable() {

@Override
public THashType getHashType() {
- if (hmsTable.isBucketedTable()
+ if (hmsTable.isSparkBucketedTable()
&& hmsTable.getDefaultDistributionInfo() instanceof HashDistributionInfo) {
return THashType.SPARK_MURMUR32;
}
@@ -2458,17 +2458,7 @@ private DataPartition toDataPartition(DistributionSpec distributionSpec,
THashType hashType = THashType.XXHASH64;
switch (distributionSpecHash.getShuffleType()) {
case STORAGE_BUCKETED:
- switch (distributionSpecHash.getShuffleFunction()) {
- case STORAGE_BUCKET_SPARK_MURMUR32:
- hashType = THashType.SPARK_MURMUR32;
- break;
- case STORAGE_BUCKET_CRC32:
- hashType = THashType.CRC32;
- break;
- case STORAGE_BUCKET_XXHASH64:
- default:
- break;
- }
+ hashType = distributionSpecHash.getShuffleFunction().toThrift();
partitionType = TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED;
break;
case EXECUTION_BUCKETED:
@@ -20,6 +20,7 @@
import org.apache.doris.nereids.annotation.Developing;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.util.Utils;
+ import org.apache.doris.thrift.THashType;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -338,9 +339,24 @@ public enum ShuffleType {
* Enums for concrete shuffle functions.
*/
public enum StorageBucketHashType {
+ // CRC32 is for Doris internal storage bucket hash function
STORAGE_BUCKET_CRC32,
+ // XXHASH64 is the default hash function for Doris computation layer
STORAGE_BUCKET_XXHASH64,
- STORAGE_BUCKET_SPARK_MURMUR32
+ // SPARK_MURMUR32 is the hash function for Spark bucketed hive table's storage and computation
+ STORAGE_BUCKET_SPARK_MURMUR32;
+
+ public THashType toThrift() {
+ switch (this) {
+ case STORAGE_BUCKET_CRC32:
+ return THashType.CRC32;
+ case STORAGE_BUCKET_SPARK_MURMUR32:
+ return THashType.SPARK_MURMUR32;
+ case STORAGE_BUCKET_XXHASH64:
+ default:
+ return THashType.XXHASH64;
+ }
+ }
}

}
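With the switch in toDataPartition collapsed into StorageBucketHashType.toThrift(), the enum now owns the mapping to THashType, so a change to either side shows up in one place. A small JUnit sketch that pins the mapping down is shown below; the test scaffolding and the package path of DistributionSpecHash are assumed for illustration, while the enum constants and THashType come from the diff.

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.doris.nereids.properties.DistributionSpecHash.StorageBucketHashType;
import org.apache.doris.thrift.THashType;
import org.junit.jupiter.api.Test;

public class StorageBucketHashTypeTest {
    @Test
    public void toThriftCoversEveryHashFunction() {
        assertEquals(THashType.CRC32, StorageBucketHashType.STORAGE_BUCKET_CRC32.toThrift());
        assertEquals(THashType.XXHASH64, StorageBucketHashType.STORAGE_BUCKET_XXHASH64.toThrift());
        assertEquals(THashType.SPARK_MURMUR32,
                StorageBucketHashType.STORAGE_BUCKET_SPARK_MURMUR32.toThrift());
    }
}
```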
@@ -80,7 +80,7 @@ private DistributionSpec convertDistribution(LogicalFileScan fileScan) {
}
}
StorageBucketHashType function = StorageBucketHashType.STORAGE_BUCKET_CRC32;
- if (hmsExternalTable.isBucketedTable()) {
+ if (hmsExternalTable.isSparkBucketedTable()) {
function = StorageBucketHashType.STORAGE_BUCKET_SPARK_MURMUR32;
}
return new DistributionSpecHash(hashColumns, DistributionSpecHash.ShuffleType.NATURAL,
@@ -625,29 +625,35 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr
}

PlanNode leftRoot = leftChildFragment.getPlanRoot();
- // 1.leftRoot be OlapScanNode
- if (leftRoot instanceof OlapScanNode) {
- return canBucketShuffleJoin(node, (OlapScanNode) leftRoot, rhsHashExprs);
- } else if (leftRoot instanceof HiveScanNode) {
- return canBucketShuffleJoin(node, (HiveScanNode) leftRoot, rhsHashExprs, hashType);
+ if (leftRoot instanceof ScanNode) {
+ return canBucketShuffleJoin(node, (ScanNode) leftRoot, rhsHashExprs, hashType);
}

// 2.leftRoot be hashjoin node
if (leftRoot instanceof HashJoinNode) {
while (leftRoot instanceof HashJoinNode) {
leftRoot = leftRoot.getChild(0);
}
- if (leftRoot instanceof OlapScanNode) {
- return canBucketShuffleJoin(node, (OlapScanNode) leftRoot, rhsHashExprs);
- } else if (leftRoot instanceof HiveScanNode) {
- return canBucketShuffleJoin(node, (HiveScanNode) leftRoot, rhsHashExprs, hashType);
+ if (leftRoot instanceof ScanNode) {
+ canBucketShuffleJoin(node, (ScanNode) leftRoot, rhsHashExprs, hashType);
}
}

return false;
}

- private boolean canBucketShuffleJoin(HashJoinNode node, HiveScanNode leftScanNode,
+ private boolean canBucketShuffleJoin(HashJoinNode node, ScanNode leftScanNode,
List<Expr> rhsJoinExprs, Ref<THashType> hashType) {
+ if (leftScanNode instanceof OlapScanNode) {
+ return canBucketShuffleJoinForOlap(node, (OlapScanNode) leftScanNode, rhsJoinExprs);
+ } else if (leftScanNode instanceof HiveScanNode) {
+ return canBucketShuffleJoinForHive(node, (HiveScanNode) leftScanNode, rhsJoinExprs, hashType);
+ } else {
+ return false;
+ }
+ }
+
+ private boolean canBucketShuffleJoinForHive(HashJoinNode node, HiveScanNode leftScanNode,
+ List<Expr> rhsJoinExprs, Ref<THashType> hashType) {
HMSExternalTable leftTable = leftScanNode.getHiveTable();

@@ -713,7 +719,7 @@ private boolean canBucketShuffleJoin(HashJoinNode node, HiveScanNod
}

//the join expr must contian left table distribute column
- private boolean canBucketShuffleJoin(HashJoinNode node, OlapScanNode leftScanNode,
+ private boolean canBucketShuffleJoinForOlap(HashJoinNode node, OlapScanNode leftScanNode,
List<Expr> rhsJoinExprs) {
OlapTable leftTable = leftScanNode.getOlapTable();

28 changes: 18 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2291,13 +2291,8 @@ private void computeScanRangeAssignment() throws Exception {
computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignedBytesPerHost, replicaNumPerHost);
}
if (fragmentContainsBucketShuffleJoin) {
- if (scanNode instanceof OlapScanNode) {
- bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode,
- idToBackend, addressToBackendID, replicaNumPerHost);
- } else if (scanNode instanceof HiveScanNode) {
- bucketShuffleJoinController.computeScanRangeAssignmentByBucket((HiveScanNode) scanNode,
- idToBackend, addressToBackendID, replicaNumPerHost);
- }
+ bucketShuffleJoinController.computeScanRangeAssignmentByBucket(scanNode,
+ idToBackend, addressToBackendID, replicaNumPerHost);
}
if (!(fragmentContainsColocateJoin || fragmentContainsBucketShuffleJoin)) {
computeScanRangeAssignmentByScheduler(scanNode, locations, assignment, assignedBytesPerHost,
@@ -2922,8 +2917,21 @@ private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLoc
this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort);
}

- // to ensure the same bucketSeq tablet to the same execHostPort
private void computeScanRangeAssignmentByBucket(
+ final ScanNode scanNode, ImmutableMap<Long, Backend> idToBackend,
+ Map<TNetworkAddress, Long> addressToBackendID,
+ Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
+ if (scanNode instanceof OlapScanNode) {
+ computeScanRangeAssignmentByBucketForOlap((OlapScanNode) scanNode, idToBackend, addressToBackendID,
+ replicaNumPerHost);
+ } else if (scanNode instanceof HiveScanNode) {
+ computeScanRangeAssignmentByBucketForHive((HiveScanNode) scanNode, idToBackend, addressToBackendID,
+ replicaNumPerHost);
+ }
+ }
+
+ // to ensure the same bucketSeq tablet to the same execHostPort
+ private void computeScanRangeAssignmentByBucketForOlap(
final OlapScanNode scanNode, ImmutableMap<Long, Backend> idToBackend,
Map<TNetworkAddress, Long> addressToBackendID,
Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
@@ -2974,13 +2982,13 @@ private void computeScanRangeAssignmentByBucket(
}
}

- private void computeScanRangeAssignmentByBucket(
+ private void computeScanRangeAssignmentByBucketForHive(
final HiveScanNode scanNode, ImmutableMap<Long, Backend> idToBackend,
Map<TNetworkAddress, Long> addressToBackendID,
Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
int bucketNum = 0;
- if (scanNode.getHiveTable().isBucketedTable()) {
+ if (scanNode.getHiveTable().isSparkBucketedTable()) {
bucketNum = scanNode.getHiveTable().getDefaultDistributionInfo().getBucketNum();
} else {
throw new NotImplementedException("bucket shuffle for non-bucketed table not supported");
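computeScanRangeAssignmentByBucketForHive has to pick a backend for every bucket sequence up front, since Spark-bucketed splits carry no tablet replica locations. The sketch below shows the basic idea with a plain round-robin; the real code also balances by replicaNumPerHost and records addressToBackendID, so treat this only as an illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BucketToBackendSketch {
    // Assign each bucket sequence in [0, bucketNum) to a backend, round-robin, so the
    // same bucket always lands on the same host within one query plan.
    static Map<Integer, String> assignBuckets(int bucketNum, List<String> backendAddresses) {
        if (backendAddresses.isEmpty()) {
            throw new IllegalStateException("no alive backend for bucket shuffle");
        }
        Map<Integer, String> bucketSeqToBackend = new HashMap<>();
        for (int bucketSeq = 0; bucketSeq < bucketNum; bucketSeq++) {
            bucketSeqToBackend.put(bucketSeq,
                    backendAddresses.get(bucketSeq % backendAddresses.size()));
        }
        return bucketSeqToBackend;
    }

    public static void main(String[] args) {
        System.out.println(assignBuckets(4, List.of("10.0.0.1:9060", "10.0.0.2:9060")));
    }
}
```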