Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.time.Clock;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
Expand Down Expand Up @@ -55,7 +56,8 @@ public class BlockDataStreamOutputEntryPool implements KeyMetadataAware {
private final OzoneClientConfig config;
private int currentStreamIndex;
private final OzoneManagerProtocol omClient;
private final OmKeyArgs keyArgs;
private final OmKeyArgs.Builder keyArgs;
private final Map<String, String> metadata = new HashMap<>();
private final XceiverClientFactory xceiverClientFactory;
private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
private final long openID;
Expand Down Expand Up @@ -83,7 +85,7 @@ public BlockDataStreamOutputEntryPool(
.setReplicationConfig(replicationConfig).setDataSize(info.getDataSize())
.setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID)
.setMultipartUploadPartNumber(partNumber)
.setSortDatanodesInPipeline(true).build();
.setSortDatanodesInPipeline(true);
this.openID = openID;
this.excludeList = createExcludeList();
this.bufferList = new ArrayList<>();
Expand Down Expand Up @@ -163,12 +165,12 @@ void hsyncKey(long offset) throws IOException {
throw new IOException("Hsync is unsupported for multipart keys.");
} else {
if (keyArgs.getLocationInfoList().isEmpty()) {
omClient.hsyncKey(keyArgs, openID);
omClient.hsyncKey(buildKeyArgs(), openID);
} else {
ContainerBlockID lastBLockId = keyArgs.getLocationInfoList().get(keyArgs.getLocationInfoList().size() - 1)
.getBlockID().getContainerBlockID();
if (!lastUpdatedBlockId.equals(lastBLockId)) {
omClient.hsyncKey(keyArgs, openID);
omClient.hsyncKey(buildKeyArgs(), openID);
lastUpdatedBlockId = lastBLockId;
}
}
Expand Down Expand Up @@ -235,7 +237,7 @@ private void allocateNewBlock() throws IOException {
LOG.debug("Allocating block with {}", excludeList);
}
OmKeyLocationInfo subKeyInfo =
omClient.allocateBlock(keyArgs, openID, excludeList);
omClient.allocateBlock(buildKeyArgs(), openID, excludeList);
addKeyLocationInfo(subKeyInfo);
}

Expand All @@ -251,9 +253,9 @@ void commitKey(long offset) throws IOException {
// partial key of a large file.
if (keyArgs.getIsMultipartKey()) {
commitUploadPartInfo =
omClient.commitMultipartUploadPart(keyArgs, openID);
omClient.commitMultipartUploadPart(buildKeyArgs(), openID);
} else {
omClient.commitKey(keyArgs, openID);
omClient.commitKey(buildKeyArgs(), openID);
}
} else {
LOG.warn("Closing KeyDataStreamOutput, but key args is null");
Expand Down Expand Up @@ -333,6 +335,11 @@ public long getDataSize() {

@Override
public Map<String, String> getMetadata() {
return this.keyArgs.getMetadata();
return metadata;
}

private OmKeyArgs buildKeyArgs() {
keyArgs.addAllMetadata(metadata);
return keyArgs.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.time.Clock;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
Expand Down Expand Up @@ -72,7 +73,8 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
*/
private int currentStreamIndex;
private final OzoneManagerProtocol omClient;
private final OmKeyArgs keyArgs;
private final OmKeyArgs.Builder keyArgs;
private final Map<String, String> metadata = new HashMap<>();
private final XceiverClientFactory xceiverClientFactory;
/**
* A {@link BufferPool} shared between all
Expand Down Expand Up @@ -101,8 +103,7 @@ public BlockOutputStreamEntryPool(KeyOutputStream.Builder b) {
.setDataSize(info.getDataSize())
.setIsMultipartKey(b.isMultipartKey())
.setMultipartUploadID(b.getMultipartUploadID())
.setMultipartUploadPartNumber(b.getMultipartNumber())
.build();
.setMultipartUploadPartNumber(b.getMultipartNumber());
this.openID = b.getOpenHandler().getId();
this.excludeList = createExcludeList();

Expand Down Expand Up @@ -156,7 +157,7 @@ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo, boolean f
return
new BlockOutputStreamEntry.Builder()
.setBlockID(subKeyInfo.getBlockID())
.setKey(keyArgs.getKeyName())
.setKey(getKeyName())
.setXceiverClientManager(xceiverClientFactory)
.setPipeline(subKeyInfo.getPipeline())
.setConfig(config)
Expand Down Expand Up @@ -300,7 +301,7 @@ private void allocateNewBlock(boolean forRetry) throws IOException {
LOG.debug("Allocating block with {}", excludeList);
}
OmKeyLocationInfo subKeyInfo =
omClient.allocateBlock(keyArgs, openID, excludeList);
omClient.allocateBlock(buildKeyArgs(), openID, excludeList);
addKeyLocationInfo(subKeyInfo, forRetry);
}

Expand All @@ -327,9 +328,9 @@ void commitKey(long offset) throws IOException {
// partial key of a large file.
if (keyArgs.getIsMultipartKey()) {
commitUploadPartInfo =
omClient.commitMultipartUploadPart(keyArgs, openID);
omClient.commitMultipartUploadPart(buildKeyArgs(), openID);
} else {
omClient.commitKey(keyArgs, openID);
omClient.commitKey(buildKeyArgs(), openID);
}
} else {
LOG.warn("Closing KeyOutputStream, but key args is null");
Expand All @@ -349,13 +350,13 @@ void hsyncKey(long offset) throws IOException {
} else {
if (keyArgs.getLocationInfoList().isEmpty()) {
MetricUtil.captureLatencyNs(clientMetrics::addOMHsyncLatency,
() -> omClient.hsyncKey(keyArgs, openID));
() -> omClient.hsyncKey(buildKeyArgs(), openID));
} else {
ContainerBlockID lastBLockId = keyArgs.getLocationInfoList().get(keyArgs.getLocationInfoList().size() - 1)
.getBlockID().getContainerBlockID();
if (!lastUpdatedBlockId.equals(lastBLockId)) {
MetricUtil.captureLatencyNs(clientMetrics::addOMHsyncLatency,
() -> omClient.hsyncKey(keyArgs, openID));
() -> omClient.hsyncKey(buildKeyArgs(), openID));
lastUpdatedBlockId = lastBLockId;
}
}
Expand Down Expand Up @@ -431,13 +432,15 @@ boolean isEmpty() {

@Override
public Map<String, String> getMetadata() {
if (keyArgs != null) {
return this.keyArgs.getMetadata();
}
return null;
return metadata;
}

long getDataSize() {
return keyArgs.getDataSize();
}

private OmKeyArgs buildKeyArgs() {
keyArgs.addAllMetadata(metadata);
return keyArgs.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import jakarta.annotation.Nonnull;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -36,7 +36,7 @@
* Args for key. Client use this to specify key's attributes on key creation
* (putKey()).
*/
public final class OmKeyArgs implements Auditable {
public final class OmKeyArgs extends WithMetadata implements Auditable {
private final String volumeName;
private final String bucketName;
private final String keyName;
Expand All @@ -47,14 +47,13 @@ public final class OmKeyArgs implements Auditable {
private final boolean isMultipartKey;
private final String multipartUploadID;
private final int multipartUploadPartNumber;
private final Map<String, String> metadata;
private final boolean sortDatanodesInPipeline;
private final ImmutableList<OzoneAcl> acls;
private final boolean latestVersionLocation;
private final boolean recursive;
private final boolean headOp;
private final boolean forceUpdateContainerCacheFromSCM;
private final Map<String, String> tags;
private final ImmutableMap<String, String> tags;
// expectedDataGeneration, when used in key creation indicates that a
// key with the same keyName should exist with the given generation.
// For a key commit to succeed, the original key should still be present with the
Expand All @@ -64,6 +63,7 @@ public final class OmKeyArgs implements Auditable {
private Long expectedDataGeneration = null;

private OmKeyArgs(Builder b) {
super(b);
this.volumeName = b.volumeName;
this.bucketName = b.bucketName;
this.keyName = b.keyName;
Expand All @@ -73,15 +73,14 @@ private OmKeyArgs(Builder b) {
this.isMultipartKey = b.isMultipartKey;
this.multipartUploadID = b.multipartUploadID;
this.multipartUploadPartNumber = b.multipartUploadPartNumber;
this.metadata = b.metadata;
this.acls = b.acls.build();
this.sortDatanodesInPipeline = b.sortDatanodesInPipeline;
this.latestVersionLocation = b.latestVersionLocation;
this.recursive = b.recursive;
this.headOp = b.headOp;
this.forceUpdateContainerCacheFromSCM = b.forceUpdateContainerCacheFromSCM;
this.ownerName = b.ownerName;
this.tags = b.tags;
this.tags = b.tags.build();
this.expectedDataGeneration = b.expectedDataGeneration;
}

Expand Down Expand Up @@ -129,10 +128,6 @@ public void setDataSize(long size) {
dataSize = size;
}

public Map<String, String> getMetadata() {
return metadata;
}

public void setLocationInfoList(List<OmKeyLocationInfo> locationInfoList) {
this.locationInfoList = locationInfoList;
}
Expand Down Expand Up @@ -220,7 +215,7 @@ public KeyArgs toProtobuf() {
/**
* Builder class of OmKeyArgs.
*/
public static class Builder {
public static class Builder extends WithMetadata.Builder {
private String volumeName;
private String bucketName;
private String keyName;
Expand All @@ -231,14 +226,13 @@ public static class Builder {
private boolean isMultipartKey;
private String multipartUploadID;
private int multipartUploadPartNumber;
private final Map<String, String> metadata = new HashMap<>();
private boolean sortDatanodesInPipeline;
private boolean latestVersionLocation;
private final AclListBuilder acls;
private boolean recursive;
private boolean headOp;
private boolean forceUpdateContainerCacheFromSCM;
private final Map<String, String> tags = new HashMap<>();
private final MapBuilder<String, String> tags;
private Long expectedDataGeneration = null;

public Builder() {
Expand All @@ -247,9 +241,11 @@ public Builder() {

private Builder(AclListBuilder acls) {
this.acls = acls;
this.tags = MapBuilder.empty();
}

public Builder(OmKeyArgs obj) {
super(obj);
this.volumeName = obj.volumeName;
this.bucketName = obj.bucketName;
this.keyName = obj.keyName;
Expand All @@ -267,8 +263,7 @@ public Builder(OmKeyArgs obj) {
this.forceUpdateContainerCacheFromSCM =
obj.forceUpdateContainerCacheFromSCM;
this.expectedDataGeneration = obj.expectedDataGeneration;
this.metadata.putAll(obj.metadata);
this.tags.putAll(obj.tags);
this.tags = MapBuilder.of(obj.tags);
this.acls = AclListBuilder.of(obj.acls);
}

Expand All @@ -287,6 +282,10 @@ public Builder setKeyName(String key) {
return this;
}

public String getKeyName() {
return keyName;
}

public Builder setOwnerName(String owner) {
this.ownerName = owner;
return this;
Expand All @@ -297,6 +296,10 @@ public Builder setDataSize(long size) {
return this;
}

public long getDataSize() {
return dataSize;
}

public Builder setReplicationConfig(ReplicationConfig replConfig) {
this.replicationConfig = replConfig;
return this;
Expand All @@ -307,6 +310,10 @@ public Builder setLocationInfoList(List<OmKeyLocationInfo> locationInfos) {
return this;
}

public List<OmKeyLocationInfo> getLocationInfoList() {
return locationInfoList;
}

public Builder setAcls(List<OzoneAcl> listOfAcls) {
this.acls.addAll(listOfAcls);
return this;
Expand All @@ -317,6 +324,10 @@ public Builder setIsMultipartKey(boolean isMultipart) {
return this;
}

public boolean getIsMultipartKey() {
return isMultipartKey;
}

public Builder setMultipartUploadID(String uploadID) {
this.multipartUploadID = uploadID;
return this;
Expand All @@ -327,20 +338,22 @@ public Builder setMultipartUploadPartNumber(int multipartUploadPartNumber) {
return this;
}

@Override
public Builder addMetadata(String key, String value) {
this.metadata.put(key, value);
super.addMetadata(key, value);
return this;
}

@Override
public Builder addAllMetadata(Map<String, String> metadatamap) {
this.metadata.putAll(metadatamap);
super.addAllMetadata(metadatamap);
return this;
}

public Builder addAllMetadataGdpr(Map<String, String> metadatamap) {
addAllMetadata(metadatamap);
if (Boolean.parseBoolean(metadata.get(OzoneConsts.GDPR_FLAG))) {
GDPRSymmetricKey.newDefaultInstance().acceptKeyDetails(metadata::put);
if (Boolean.parseBoolean(metadatamap.get(OzoneConsts.GDPR_FLAG))) {
GDPRSymmetricKey.newDefaultInstance().acceptKeyDetails(this::addMetadata);
}
return this;
}
Expand Down