Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,11 @@ linklint/
**/*.log
tmp
**/.flattened-pom.xml
.sw*
.*.sw*
ID
filenametags
tags
.codegenie/
.vscode/
**/__pycache__
9 changes: 9 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,12 @@ Layout/LineLength:

Metrics/MethodLength:
Max: 75

GlobalVars:
AllowedVariables:
- $CUST1_ENCODED
- $CUST1_ALIAS
- $CUST1_ENCODED
- $GLOB_CUST_ENCODED
- $TEST
- $TEST_CLUSTER
Original file line number Diff line number Diff line change
Expand Up @@ -2664,4 +2664,10 @@ List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType, Server

@InterfaceAudience.Private
void restoreBackupSystemTable(String snapshotName) throws IOException;

/**
* Refresh the system key cache on all specified region servers.
* @param regionServers the list of region servers to refresh the system key cache on
*/
void refreshSystemKeyCacheOnServers(Set<ServerName> regionServers) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -1146,4 +1146,9 @@ public List<String> getCachedFilesList(ServerName serverName) throws IOException
public void restoreBackupSystemTable(String snapshotName) throws IOException {
get(admin.restoreBackupSystemTable(snapshotName));
}

@Override
public void refreshSystemKeyCacheOnServers(Set<ServerName> regionServers) throws IOException {
get(admin.refreshSystemKeyCacheOnServers(regionServers));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1874,4 +1874,10 @@ CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, Str

@InterfaceAudience.Private
CompletableFuture<Void> restoreBackupSystemTable(String snapshotName);

/**
* Refresh the system key cache on all specified region servers.
* @param regionServers the list of region servers to refresh the system key cache on
*/
CompletableFuture<Void> refreshSystemKeyCacheOnServers(Set<ServerName> regionServers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,11 @@ public CompletableFuture<Void> updateConfiguration(String groupName) {
return wrap(rawAdmin.updateConfiguration(groupName));
}

@Override
public CompletableFuture<Void> refreshSystemKeyCacheOnServers(Set<ServerName> regionServers) {
return wrap(rawAdmin.refreshSystemKeyCacheOnServers(regionServers));
}

@Override
public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
return wrap(rawAdmin.rollWALWriter(serverName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ public interface ColumnFamilyDescriptor {
/** Returns Return the raw crypto key attribute for the family, or null if not set */
byte[] getEncryptionKey();

/** Returns the encryption key namespace for this family */
String getEncryptionKeyNamespace();

/** Returns Return the encryption algorithm in use by this family */
String getEncryptionType();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ public class ColumnFamilyDescriptorBuilder {
@InterfaceAudience.Private
public static final String ENCRYPTION_KEY = "ENCRYPTION_KEY";
private static final Bytes ENCRYPTION_KEY_BYTES = new Bytes(Bytes.toBytes(ENCRYPTION_KEY));
@InterfaceAudience.Private
public static final String ENCRYPTION_KEY_NAMESPACE = "ENCRYPTION_KEY_NAMESPACE";
private static final Bytes ENCRYPTION_KEY_NAMESPACE_BYTES =
new Bytes(Bytes.toBytes(ENCRYPTION_KEY_NAMESPACE));

private static final boolean DEFAULT_MOB = false;
@InterfaceAudience.Private
Expand Down Expand Up @@ -320,6 +324,7 @@ public static Map<String, String> getDefaultValues() {
DEFAULT_VALUES.keySet().forEach(s -> RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(s))));
RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION)));
RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION_KEY)));
RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION_KEY_NAMESPACE)));
RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(IS_MOB)));
RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(MOB_THRESHOLD)));
RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(MOB_COMPACT_PARTITION_POLICY)));
Expand Down Expand Up @@ -522,6 +527,11 @@ public ColumnFamilyDescriptorBuilder setEncryptionKey(final byte[] value) {
return this;
}

public ColumnFamilyDescriptorBuilder setEncryptionKeyNamespace(final String value) {
desc.setEncryptionKeyNamespace(value);
return this;
}

public ColumnFamilyDescriptorBuilder setEncryptionType(String value) {
desc.setEncryptionType(value);
return this;
Expand Down Expand Up @@ -1337,6 +1347,20 @@ public ModifyableColumnFamilyDescriptor setEncryptionKey(byte[] keyBytes) {
return setValue(ENCRYPTION_KEY_BYTES, new Bytes(keyBytes));
}

@Override
public String getEncryptionKeyNamespace() {
return getStringOrDefault(ENCRYPTION_KEY_NAMESPACE_BYTES, Function.identity(), null);
}

/**
* Set the encryption key namespace attribute for the family
* @param keyNamespace the key namespace, or null to remove existing setting
* @return this (for chained invocation)
*/
public ModifyableColumnFamilyDescriptor setEncryptionKeyNamespace(String keyNamespace) {
return setValue(ENCRYPTION_KEY_NAMESPACE_BYTES, keyNamespace);
}

@Override
public long getMobThreshold() {
return getStringOrDefault(MOB_THRESHOLD_BYTES, Long::valueOf, DEFAULT_MOB_THRESHOLD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.EmptyMsg;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.LastHighestWalFilenum;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
Expand Down Expand Up @@ -4662,4 +4663,28 @@ MasterProtos.RestoreBackupSystemTableResponse> procedureCall(request,
MasterProtos.RestoreBackupSystemTableResponse::getProcId,
new RestoreBackupSystemTableProcedureBiConsumer());
}

@Override
public CompletableFuture<Void> refreshSystemKeyCacheOnServers(Set<ServerName> regionServers) {
CompletableFuture<Void> future = new CompletableFuture<>();
List<CompletableFuture<Void>> futures =
regionServers.stream().map(this::refreshSystemKeyCache).collect(Collectors.toList());
addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])),
(result, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else {
future.complete(result);
}
});
return future;
}

private CompletableFuture<Void> refreshSystemKeyCache(ServerName serverName) {
return this.<Void> newAdminCaller()
.action((controller, stub) -> this.<EmptyMsg, EmptyMsg, Void> adminCall(controller, stub,
EmptyMsg.getDefaultInstance(),
(s, c, req, done) -> s.refreshSystemKeyCache(controller, req, done), resp -> null))
.serverName(serverName).call();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.keymeta;

import java.io.IOException;
import java.security.KeyException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.crypto.ManagedKeyData;
import org.apache.hadoop.hbase.io.crypto.ManagedKeyState;
import org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos;
import org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos.ManagedKeyRequest;
import org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos.ManagedKeyResponse;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.EmptyMsg;

@InterfaceAudience.Public
public class KeymetaAdminClient implements KeymetaAdmin {
private ManagedKeysProtos.ManagedKeysService.BlockingInterface stub;

public KeymetaAdminClient(Connection conn) throws IOException {
this.stub =
ManagedKeysProtos.ManagedKeysService.newBlockingStub(conn.getAdmin().coprocessorService());
}

@Override
public ManagedKeyData enableKeyManagement(byte[] keyCust, String keyNamespace)
throws IOException {
try {
ManagedKeysProtos.ManagedKeyResponse response =
stub.enableKeyManagement(null, ManagedKeyRequest.newBuilder()
.setKeyCust(ByteString.copyFrom(keyCust)).setKeyNamespace(keyNamespace).build());
return generateKeyData(response);
} catch (ServiceException e) {
throw ProtobufUtil.handleRemoteException(e);
}
}

@Override
public List<ManagedKeyData> getManagedKeys(byte[] keyCust, String keyNamespace)
throws IOException, KeyException {
try {
ManagedKeysProtos.GetManagedKeysResponse statusResponse =
stub.getManagedKeys(null, ManagedKeyRequest.newBuilder()
.setKeyCust(ByteString.copyFrom(keyCust)).setKeyNamespace(keyNamespace).build());
return generateKeyDataList(statusResponse);
} catch (ServiceException e) {
throw ProtobufUtil.handleRemoteException(e);
}
}

@Override
public boolean rotateSTK() throws IOException {
try {
ManagedKeysProtos.RotateSTKResponse response =
stub.rotateSTK(null, EmptyMsg.getDefaultInstance());
return response.getRotated();
} catch (ServiceException e) {
throw ProtobufUtil.handleRemoteException(e);
}
}

private static List<ManagedKeyData>
generateKeyDataList(ManagedKeysProtos.GetManagedKeysResponse stateResponse) {
List<ManagedKeyData> keyStates = new ArrayList<>();
for (ManagedKeyResponse state : stateResponse.getStateList()) {
keyStates.add(generateKeyData(state));
}
return keyStates;
}

private static ManagedKeyData generateKeyData(ManagedKeysProtos.ManagedKeyResponse response) {
return new ManagedKeyData(response.getKeyCust().toByteArray(), response.getKeyNamespace(), null,
ManagedKeyState.forValue((byte) response.getKeyState().getNumber()),
response.getKeyMetadata(), response.getRefreshTimestamp());
}
}
Loading