Skip to content

Commit

Permalink
optimizeCodeSmellInSpark (#4328)
Browse files Browse the repository at this point in the history
  • Loading branch information
xubo245 authored Oct 19, 2023
1 parent 4618808 commit 448564a
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public VectorizedCarbonRecordReader(QueryModel queryModel, InputMetricsStats inp
String enableBatch) {
this.queryModel = queryModel;
this.inputMetricsStats = inputMetricsStats;
if (enableBatch.equals("true")) {
if ("true".equals(enableBatch)) {
enableReturningBatches();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public CarbonStreamRecordReader(boolean isVectorReader, InputMetricsStats inputM
this.inputMetricsStats = inputMetricsStats;
}

@Override
protected void initializeAtFirstRow() throws IOException {
super.initializeAtFirstRow();
outputRow = new GenericInternalRow(outputValues);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ public ColumnVector getChild(int i) {
return vector.getChild(i);
}

@Override
public void reset() {
isLoaded = false;
pageLoad = null;
Expand All @@ -546,6 +547,7 @@ private void checkPageLoaded() {
}
}

@Override
public void setLazyPage(LazyPageLoader lazyPage) {
this.pageLoad = lazyPage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*/
public class CarbonBlockLoaderHelper {

private static final CarbonBlockLoaderHelper carbonBlockLoaderHelper =
private static final CarbonBlockLoaderHelper CARBON_BLOCK_LOADER_HELPER =
new CarbonBlockLoaderHelper();
/**
* maintains the map of segments already considered for the btree load
Expand All @@ -46,7 +46,7 @@ private CarbonBlockLoaderHelper() {
* @return
*/
public static CarbonBlockLoaderHelper getInstance() {
return carbonBlockLoaderHelper;
return CARBON_BLOCK_LOADER_HELPER;
}

private Set<String> getTableBlocks(AbsoluteTableIdentifier absoluteTableIdentifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,19 @@ public static List<String> getListOfValidSlices(LoadMetadataDetails[] details) {
List<String> activeSlices = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
for (LoadMetadataDetails oneLoad : details) {
// External added segments are not loaded to SI
if (oneLoad.getPath() == null && SegmentStatus.SUCCESS.equals(oneLoad.getSegmentStatus())
|| SegmentStatus.LOAD_PARTIAL_SUCCESS.equals(oneLoad.getSegmentStatus())
|| SegmentStatus.MARKED_FOR_UPDATE.equals(oneLoad.getSegmentStatus())) {
if (isaBoolean(oneLoad)) {
activeSlices.add(oneLoad.getLoadName());
}
}
return activeSlices;
}

private static boolean isaBoolean(LoadMetadataDetails oneLoad) {
return oneLoad.getPath() == null && SegmentStatus.SUCCESS.equals(oneLoad.getSegmentStatus())
|| SegmentStatus.LOAD_PARTIAL_SUCCESS.equals(oneLoad.getSegmentStatus())
|| SegmentStatus.MARKED_FOR_UPDATE.equals(oneLoad.getSegmentStatus());
}

/**
* This method will return the mapping of valid segments to segment load start time
*/
Expand Down Expand Up @@ -340,17 +344,17 @@ public static boolean updateLoadMetadataWithMergeStatus(CarbonTable indexCarbonT
return tableStatusUpdateStatus;
}

public static boolean checkMainTableSegEqualToSISeg(
public static boolean checkMainTableSegEqualToSiSeg(
LoadMetadataDetails[] mainTableLoadMetadataDetails,
LoadMetadataDetails[] siTableLoadMetadataDetails) throws ErrorMessage {
return checkMainTableSegEqualToSISeg(mainTableLoadMetadataDetails, siTableLoadMetadataDetails,
return checkMainTableSegEqualToSiSeg(mainTableLoadMetadataDetails, siTableLoadMetadataDetails,
false);
}

/**
* Method to check if main table and SI have same number of valid segments or not
*/
public static boolean checkMainTableSegEqualToSISeg(
public static boolean checkMainTableSegEqualToSiSeg(
LoadMetadataDetails[] mainTableLoadMetadataDetails,
LoadMetadataDetails[] siTableLoadMetadataDetails, boolean isRegisterIndex)
throws ErrorMessage {
Expand Down Expand Up @@ -385,7 +389,7 @@ public static boolean checkMainTableSegEqualToSISeg(
/**
* Method to check if main table has in progress load and same segment not present in SI
*/
public static boolean checkInProgLoadInMainTableAndSI(CarbonTable carbonTable,
public static boolean checkInProgLoadInMainTableAndSi(CarbonTable carbonTable,
LoadMetadataDetails[] mainTableLoadMetadataDetails,
LoadMetadataDetails[] siTableLoadMetadataDetails) {
List<String> allSiSlices = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public RowComparator(boolean[] noDictionaryColMapping, DataType[] noDicDataTypes
/**
* Below method will be used to compare two MDKeys
*/
@Override
public int compare(Object[] rowA, Object[] rowB) {
int diff = 0;
int index = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ public void init(IndexModel indexModel) {
public void validateSegmentList(String indexPath, String tableStatusVersion) {
LoadMetadataDetails[] loadMetadataDetails = SegmentStatusManager
.readLoadMetadata(CarbonTablePath.getMetadataPath(indexPath), tableStatusVersion);
Set<String> validSISegments = new HashSet<>();
Set<String> validSiSegments = new HashSet<>();
for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) {
if (loadMetadataDetail.getSegmentStatus() == SegmentStatus.SUCCESS
|| loadMetadataDetail.getSegmentStatus() == SegmentStatus.MARKED_FOR_UPDATE
|| loadMetadataDetail.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
validSISegments.add(loadMetadataDetail.getLoadName());
validSiSegments.add(loadMetadataDetail.getLoadName());
}
}
validSegmentIds = Sets.intersection(validSISegments, validSegmentIds);
validSegmentIds = Sets.intersection(validSiSegments, validSegmentIds);
}

private Set<String> getPositionReferences(String databaseName, String indexName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ object CarbonIndexUtil {
val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath,
indexTable.getTableStatusVersion)
if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSiSeg(
mainTableDetails,
siTblLoadMetadataDetails)) {
val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
if (isRegisterIndex) {
// check if SI segments are more than main table segments
CarbonInternalLoaderUtil
.checkMainTableSegEqualToSISeg(mainTblLoadMetadataDetails,
.checkMainTableSegEqualToSiSeg(mainTblLoadMetadataDetails,
siTblLoadMetadataDetails, isRegisterIndex)
// check if SI table has undergone any Update or delete operation, which can happen in
// case of compatibility scenario. IUD after Refresh SI and before register index
Expand Down

0 comments on commit 448564a

Please sign in to comment.