diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java index e5ffc2211e7b..ff054c350956 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java @@ -22,6 +22,7 @@ import java.time.Clock; import java.time.ZoneOffset; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.ListIterator; import java.util.Map; @@ -55,7 +56,8 @@ public class BlockDataStreamOutputEntryPool implements KeyMetadataAware { private final OzoneClientConfig config; private int currentStreamIndex; private final OzoneManagerProtocol omClient; - private final OmKeyArgs keyArgs; + private final OmKeyArgs.Builder keyArgs; + private final Map metadata = new HashMap<>(); private final XceiverClientFactory xceiverClientFactory; private OmMultipartCommitUploadPartInfo commitUploadPartInfo; private final long openID; @@ -83,7 +85,7 @@ public BlockDataStreamOutputEntryPool( .setReplicationConfig(replicationConfig).setDataSize(info.getDataSize()) .setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID) .setMultipartUploadPartNumber(partNumber) - .setSortDatanodesInPipeline(true).build(); + .setSortDatanodesInPipeline(true); this.openID = openID; this.excludeList = createExcludeList(); this.bufferList = new ArrayList<>(); @@ -163,12 +165,12 @@ void hsyncKey(long offset) throws IOException { throw new IOException("Hsync is unsupported for multipart keys."); } else { if (keyArgs.getLocationInfoList().isEmpty()) { - omClient.hsyncKey(keyArgs, openID); + omClient.hsyncKey(buildKeyArgs(), openID); } else { ContainerBlockID lastBLockId = keyArgs.getLocationInfoList().get(keyArgs.getLocationInfoList().size() - 1) .getBlockID().getContainerBlockID(); if (!lastUpdatedBlockId.equals(lastBLockId)) { - omClient.hsyncKey(keyArgs, openID); + omClient.hsyncKey(buildKeyArgs(), openID); lastUpdatedBlockId = lastBLockId; } } @@ -235,7 +237,7 @@ private void allocateNewBlock() throws IOException { LOG.debug("Allocating block with {}", excludeList); } OmKeyLocationInfo subKeyInfo = - omClient.allocateBlock(keyArgs, openID, excludeList); + omClient.allocateBlock(buildKeyArgs(), openID, excludeList); addKeyLocationInfo(subKeyInfo); } @@ -251,9 +253,9 @@ void commitKey(long offset) throws IOException { // partial key of a large file. if (keyArgs.getIsMultipartKey()) { commitUploadPartInfo = - omClient.commitMultipartUploadPart(keyArgs, openID); + omClient.commitMultipartUploadPart(buildKeyArgs(), openID); } else { - omClient.commitKey(keyArgs, openID); + omClient.commitKey(buildKeyArgs(), openID); } } else { LOG.warn("Closing KeyDataStreamOutput, but key args is null"); @@ -333,6 +335,11 @@ public long getDataSize() { @Override public Map getMetadata() { - return this.keyArgs.getMetadata(); + return metadata; + } + + private OmKeyArgs buildKeyArgs() { + keyArgs.addAllMetadata(metadata); + return keyArgs.build(); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java index d36e922c447b..f3b98626f093 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java @@ -23,6 +23,7 @@ import java.time.Clock; import java.time.ZoneOffset; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.ListIterator; import java.util.Map; @@ -72,7 +73,8 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware { */ private int currentStreamIndex; private final OzoneManagerProtocol omClient; - private final OmKeyArgs keyArgs; + private final OmKeyArgs.Builder keyArgs; + private final Map metadata = new HashMap<>(); private final XceiverClientFactory xceiverClientFactory; /** * A {@link BufferPool} shared between all @@ -101,8 +103,7 @@ public BlockOutputStreamEntryPool(KeyOutputStream.Builder b) { .setDataSize(info.getDataSize()) .setIsMultipartKey(b.isMultipartKey()) .setMultipartUploadID(b.getMultipartUploadID()) - .setMultipartUploadPartNumber(b.getMultipartNumber()) - .build(); + .setMultipartUploadPartNumber(b.getMultipartNumber()); this.openID = b.getOpenHandler().getId(); this.excludeList = createExcludeList(); @@ -156,7 +157,7 @@ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo, boolean f return new BlockOutputStreamEntry.Builder() .setBlockID(subKeyInfo.getBlockID()) - .setKey(keyArgs.getKeyName()) + .setKey(getKeyName()) .setXceiverClientManager(xceiverClientFactory) .setPipeline(subKeyInfo.getPipeline()) .setConfig(config) @@ -300,7 +301,7 @@ private void allocateNewBlock(boolean forRetry) throws IOException { LOG.debug("Allocating block with {}", excludeList); } OmKeyLocationInfo subKeyInfo = - omClient.allocateBlock(keyArgs, openID, excludeList); + omClient.allocateBlock(buildKeyArgs(), openID, excludeList); addKeyLocationInfo(subKeyInfo, forRetry); } @@ -327,9 +328,9 @@ void commitKey(long offset) throws IOException { // partial key of a large file. if (keyArgs.getIsMultipartKey()) { commitUploadPartInfo = - omClient.commitMultipartUploadPart(keyArgs, openID); + omClient.commitMultipartUploadPart(buildKeyArgs(), openID); } else { - omClient.commitKey(keyArgs, openID); + omClient.commitKey(buildKeyArgs(), openID); } } else { LOG.warn("Closing KeyOutputStream, but key args is null"); @@ -349,13 +350,13 @@ void hsyncKey(long offset) throws IOException { } else { if (keyArgs.getLocationInfoList().isEmpty()) { MetricUtil.captureLatencyNs(clientMetrics::addOMHsyncLatency, - () -> omClient.hsyncKey(keyArgs, openID)); + () -> omClient.hsyncKey(buildKeyArgs(), openID)); } else { ContainerBlockID lastBLockId = keyArgs.getLocationInfoList().get(keyArgs.getLocationInfoList().size() - 1) .getBlockID().getContainerBlockID(); if (!lastUpdatedBlockId.equals(lastBLockId)) { MetricUtil.captureLatencyNs(clientMetrics::addOMHsyncLatency, - () -> omClient.hsyncKey(keyArgs, openID)); + () -> omClient.hsyncKey(buildKeyArgs(), openID)); lastUpdatedBlockId = lastBLockId; } } @@ -431,13 +432,15 @@ boolean isEmpty() { @Override public Map getMetadata() { - if (keyArgs != null) { - return this.keyArgs.getMetadata(); - } - return null; + return metadata; } long getDataSize() { return keyArgs.getDataSize(); } + + private OmKeyArgs buildKeyArgs() { + keyArgs.addAllMetadata(metadata); + return keyArgs.build(); + } } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java index 395425b069e9..21dac858c41c 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java @@ -19,9 +19,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import jakarta.annotation.Nonnull; import java.util.ArrayList; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -36,7 +36,7 @@ * Args for key. Client use this to specify key's attributes on key creation * (putKey()). */ -public final class OmKeyArgs implements Auditable { +public final class OmKeyArgs extends WithMetadata implements Auditable { private final String volumeName; private final String bucketName; private final String keyName; @@ -47,14 +47,13 @@ public final class OmKeyArgs implements Auditable { private final boolean isMultipartKey; private final String multipartUploadID; private final int multipartUploadPartNumber; - private final Map metadata; private final boolean sortDatanodesInPipeline; private final ImmutableList acls; private final boolean latestVersionLocation; private final boolean recursive; private final boolean headOp; private final boolean forceUpdateContainerCacheFromSCM; - private final Map tags; + private final ImmutableMap tags; // expectedDataGeneration, when used in key creation indicates that a // key with the same keyName should exist with the given generation. // For a key commit to succeed, the original key should still be present with the @@ -64,6 +63,7 @@ public final class OmKeyArgs implements Auditable { private Long expectedDataGeneration = null; private OmKeyArgs(Builder b) { + super(b); this.volumeName = b.volumeName; this.bucketName = b.bucketName; this.keyName = b.keyName; @@ -73,7 +73,6 @@ private OmKeyArgs(Builder b) { this.isMultipartKey = b.isMultipartKey; this.multipartUploadID = b.multipartUploadID; this.multipartUploadPartNumber = b.multipartUploadPartNumber; - this.metadata = b.metadata; this.acls = b.acls.build(); this.sortDatanodesInPipeline = b.sortDatanodesInPipeline; this.latestVersionLocation = b.latestVersionLocation; @@ -81,7 +80,7 @@ private OmKeyArgs(Builder b) { this.headOp = b.headOp; this.forceUpdateContainerCacheFromSCM = b.forceUpdateContainerCacheFromSCM; this.ownerName = b.ownerName; - this.tags = b.tags; + this.tags = b.tags.build(); this.expectedDataGeneration = b.expectedDataGeneration; } @@ -129,10 +128,6 @@ public void setDataSize(long size) { dataSize = size; } - public Map getMetadata() { - return metadata; - } - public void setLocationInfoList(List locationInfoList) { this.locationInfoList = locationInfoList; } @@ -220,7 +215,7 @@ public KeyArgs toProtobuf() { /** * Builder class of OmKeyArgs. */ - public static class Builder { + public static class Builder extends WithMetadata.Builder { private String volumeName; private String bucketName; private String keyName; @@ -231,14 +226,13 @@ public static class Builder { private boolean isMultipartKey; private String multipartUploadID; private int multipartUploadPartNumber; - private final Map metadata = new HashMap<>(); private boolean sortDatanodesInPipeline; private boolean latestVersionLocation; private final AclListBuilder acls; private boolean recursive; private boolean headOp; private boolean forceUpdateContainerCacheFromSCM; - private final Map tags = new HashMap<>(); + private final MapBuilder tags; private Long expectedDataGeneration = null; public Builder() { @@ -247,9 +241,11 @@ public Builder() { private Builder(AclListBuilder acls) { this.acls = acls; + this.tags = MapBuilder.empty(); } public Builder(OmKeyArgs obj) { + super(obj); this.volumeName = obj.volumeName; this.bucketName = obj.bucketName; this.keyName = obj.keyName; @@ -267,8 +263,7 @@ public Builder(OmKeyArgs obj) { this.forceUpdateContainerCacheFromSCM = obj.forceUpdateContainerCacheFromSCM; this.expectedDataGeneration = obj.expectedDataGeneration; - this.metadata.putAll(obj.metadata); - this.tags.putAll(obj.tags); + this.tags = MapBuilder.of(obj.tags); this.acls = AclListBuilder.of(obj.acls); } @@ -287,6 +282,10 @@ public Builder setKeyName(String key) { return this; } + public String getKeyName() { + return keyName; + } + public Builder setOwnerName(String owner) { this.ownerName = owner; return this; @@ -297,6 +296,10 @@ public Builder setDataSize(long size) { return this; } + public long getDataSize() { + return dataSize; + } + public Builder setReplicationConfig(ReplicationConfig replConfig) { this.replicationConfig = replConfig; return this; @@ -307,6 +310,10 @@ public Builder setLocationInfoList(List locationInfos) { return this; } + public List getLocationInfoList() { + return locationInfoList; + } + public Builder setAcls(List listOfAcls) { this.acls.addAll(listOfAcls); return this; @@ -317,6 +324,10 @@ public Builder setIsMultipartKey(boolean isMultipart) { return this; } + public boolean getIsMultipartKey() { + return isMultipartKey; + } + public Builder setMultipartUploadID(String uploadID) { this.multipartUploadID = uploadID; return this; @@ -327,20 +338,22 @@ public Builder setMultipartUploadPartNumber(int multipartUploadPartNumber) { return this; } + @Override public Builder addMetadata(String key, String value) { - this.metadata.put(key, value); + super.addMetadata(key, value); return this; } + @Override public Builder addAllMetadata(Map metadatamap) { - this.metadata.putAll(metadatamap); + super.addAllMetadata(metadatamap); return this; } public Builder addAllMetadataGdpr(Map metadatamap) { addAllMetadata(metadatamap); - if (Boolean.parseBoolean(metadata.get(OzoneConsts.GDPR_FLAG))) { - GDPRSymmetricKey.newDefaultInstance().acceptKeyDetails(metadata::put); + if (Boolean.parseBoolean(metadatamap.get(OzoneConsts.GDPR_FLAG))) { + GDPRSymmetricKey.newDefaultInstance().acceptKeyDetails(this::addMetadata); } return this; }