Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(license): invalidate license when cpu exceeds limit instead of rejecting new compute nodes from joining #20276

Merged
merged 10 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ extend-exclude = [
"src/sqlparser/tests/testdata/",
"src/frontend/planner_test/tests/testdata",
"src/tests/sqlsmith/tests/freeze",
"src/license/src/manager.rs",
"src/license/**/*.rs", # JWT license key
"Cargo.lock",
"**/Cargo.toml",
"**/go.mod",
Expand Down
2 changes: 2 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ message MetaSnapshot {
reserved "parallel_unit_mappings";
GetSessionParamsResponse session_params = 20;
repeated catalog.Secret secrets = 23;
uint64 compute_node_total_cpu_count = 24;
repeated common.WorkerNode nodes = 10;
hummock.HummockVersion hummock_version = 12;
backup_service.MetaBackupManifestId meta_backup_manifest_id = 14;
Expand Down Expand Up @@ -540,6 +541,7 @@ message SubscribeResponse {
FragmentWorkerSlotMapping streaming_worker_slot_mapping = 27;
FragmentWorkerSlotMappings serving_worker_slot_mappings = 28;
catalog.Secret secret = 29;
uint64 compute_node_total_cpu_count = 30;
}
reserved 12;
reserved "parallel_unit_mapping";
Expand Down
1 change: 1 addition & 0 deletions src/common/common_service/src/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ where
Info::Snapshot(_) | Info::HummockWriteLimits(_) => unreachable!(),
Info::HummockStats(_) => true,
Info::Recovery(_) => true,
Info::ComputeNodeTotalCpuCount(_) => true,
Info::StreamingWorkerSlotMapping(_) => {
notification.version
> info
Expand Down
5 changes: 5 additions & 0 deletions src/compute/src/observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::license::LicenseManager;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
use risingwave_common_service::ObserverState;
Expand Down Expand Up @@ -45,6 +46,9 @@ impl ObserverState for ComputeObserverNode {
panic!("error type notification");
}
},
Info::ComputeNodeTotalCpuCount(count) => {
LicenseManager::get().update_cpu_core_count(count as _);
}
_ => {
panic!("error type notification");
}
Expand All @@ -57,6 +61,7 @@ impl ObserverState for ComputeObserverNode {
unreachable!();
};
LocalSecretManager::global().init_secrets(snapshot.secrets);
LicenseManager::get().update_cpu_core_count(snapshot.compute_node_total_cpu_count as _);
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/frontend/src/observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use parking_lot::RwLock;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
use risingwave_common::catalog::CatalogVersion;
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::license::LicenseManager;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
Expand Down Expand Up @@ -114,6 +115,9 @@ impl ObserverState for FrontendObserverNode {
Info::Recovery(_) => {
self.compute_client_pool.invalidate_all();
}
Info::ComputeNodeTotalCpuCount(count) => {
LicenseManager::get().update_cpu_core_count(count as _);
}
}
}

Expand Down Expand Up @@ -147,6 +151,7 @@ impl ObserverState for FrontendObserverNode {
session_params,
version,
secrets,
compute_node_total_cpu_count,
} = snapshot;

for db in databases {
Expand Down Expand Up @@ -208,6 +213,7 @@ impl ObserverState for FrontendObserverNode {
*self.session_params.write() =
serde_json::from_str(&session_params.unwrap().params).unwrap();
LocalSecretManager::global().init_secrets(secrets);
LicenseManager::get().update_cpu_core_count(compute_node_total_cpu_count as _);
}
}

Expand Down
66 changes: 17 additions & 49 deletions src/license/src/cpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,56 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZeroU64;

use thiserror::Error;

use crate::{LicenseKeyError, LicenseManager};

/// The error type for CPU core limit exceeded as per the license key.
#[derive(Debug, Clone, Error)]
#[error("invalid license key")]
pub enum CpuCoreLimitExceeded {
#[error("cannot check CPU core limit due to license key error")]
LicenseKeyError(#[from] LicenseKeyError),

#[error(
"CPU core limit exceeded as per the license key, \
requesting {actual} while the maximum allowed is {limit}"
)]
Exceeded { limit: NonZeroU64, actual: u64 },
}

impl LicenseManager {
/// Check if the given CPU core count exceeds the limit as per the license key.
pub fn check_cpu_core_limit(&self, cpu_core_count: u64) -> Result<(), CpuCoreLimitExceeded> {
let license = self.license()?;

match license.cpu_core_limit {
Some(limit) if cpu_core_count > limit.get() => Err(CpuCoreLimitExceeded::Exceeded {
limit,
actual: cpu_core_count,
}),
_ => Ok(()),
}
}
}

// Tests below only work in debug mode.
#[cfg(debug_assertions)]
#[cfg(test)]
mod tests {
use expect_test::expect;
use thiserror_ext::AsReport as _;

use super::*;
use crate::{LicenseKey, TEST_PAID_LICENSE_KEY_CONTENT};
use crate::{Feature, LicenseKey, LicenseManager, TEST_PAID_LICENSE_KEY_CONTENT};

fn do_test(key: &str, cpu_core_count: u64, expect: expect_test::Expect) {
fn do_test(key: &str, cpu_core_count: usize, expect: expect_test::Expect) {
let manager = LicenseManager::new();
manager.refresh(LicenseKey(key));
manager.update_cpu_core_count(cpu_core_count);

match manager.check_cpu_core_limit(cpu_core_count) {
match Feature::TestPaid.check_available_with(&manager) {
Ok(_) => expect.assert_eq("ok"),
Err(error) => expect.assert_eq(&error.to_report_string()),
}
Expand All @@ -72,28 +37,31 @@ mod tests {
do_test(TEST_PAID_LICENSE_KEY_CONTENT, 114514, expect!["ok"]);
}

#[test]
fn test_no_license_key_no_limit() {
do_test("", 114514, expect!["ok"]);
}

#[test]
fn test_invalid_license_key() {
const KEY: &str = "invalid";

do_test(KEY, 0, expect!["cannot check CPU core limit due to license key error: invalid license key: InvalidToken"]);
do_test(KEY, 114514, expect!["cannot check CPU core limit due to license key error: invalid license key: InvalidToken"]);
do_test(
KEY,
0,
expect!["feature TestPaid is not available due to license error: invalid license key: InvalidToken"],
);
do_test(
KEY,
114514,
expect!["feature TestPaid is not available due to license error: invalid license key: InvalidToken"],
);
}

#[test]
fn test_limit() {
const KEY: &str =
"eyJhbGciOiJSUzUxMiIsInR5cCI6IkpXVCJ9.\
eyJzdWIiOiJmcmVlLXRlc3QtMzIiLCJpc3MiOiJwcm9kLnJpc2luZ3dhdmUuY29tIiwidGllciI6ImZyZWUiLCJleHAiOjE4NTI1NTk5OTksImlhdCI6MTcyMzcwMTk5NCwiY3B1X2NvcmVfbGltaXQiOjMyfQ.\
rsATtzlduLUkGQeXkOROtyGUpafdDhi18iKdYAzAldWQuO9KevNcnD8a6geCShZSGte65bI7oYtv7GHx8i66ge3B1SVsgGgYr10ebphPUNUQenYoN0mpD4Wn0prPStOgANzYZOI2ntMGAaeWStji1x67_iho6r0W9r6RX3kMvzFSbiObSIfvTdrMULeg-xeHc3bT_ErRhaXq7MAa2Oiq3lcK2sNgEvc9KYSP9YbhSik9CBkc8lcyeVoc48SSWEaBU-c8-Ge0fzjgWHI9KIsUV5Ihe66KEfs0PqdRoSWbgskYGzA3o8wHIbtJbJiPzra373kkFH9MGY0HOsw9QeJLGQ";
eyJzdWIiOiJwYWlkLXRlc3QtMzIiLCJpc3MiOiJ0ZXN0LnJpc2luZ3dhdmUuY29tIiwidGllciI6InBhaWQiLCJleHAiOjIxNTA0OTU5OTksImlhdCI6MTczNzYxMjQ5NSwiY3B1X2NvcmVfbGltaXQiOjMyfQ.\
SQpX2Dmon5Mb04VUbHyxsU7owJhcdLZHqUefxAXBwG5AqgKdpfS0XUePW5E4D-EfxtH_cWJiD4QDFsfdRUz88g_n_KvfNUObMW7NV5TUoRs_ImtS4ySugExNX3JzJi71QqgI8kugStQ7uOR9kZ_C-cCc_IG2CwwEmhhW1Ij0vX7qjhG5JNMit_bhxPY7Rh27ppgPTqWxJFTTsw-9B7O5WR_yIlaDjxVzk0ALm_j6DPB249gG3dkeK0rP0AK_ip2cK6iQdy8Cge7ATD6yUh4c_aR6GILDF6-vyB7QdWU6DdQS4KhdkPNWoe_Z9psotcXQJ7NhQ39hk8tdLzmTfGDDBA";

do_test(KEY, 31, expect!["ok"]);
do_test(KEY, 32, expect!["ok"]);
do_test(KEY, 33, expect!["CPU core limit exceeded as per the license key, requesting 33 while the maximum allowed is 32"]);
do_test(KEY, 33, expect!["feature TestPaid is not available due to license error: the license key is currently not effective because the CPU core in the cluster (33) exceeds the maximum allowed by the license key (32); consider removing some nodes or acquiring a new license key with a higher limit"]);
}
}
34 changes: 17 additions & 17 deletions src/license/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use thiserror::Error;

use super::{report_telemetry, License, LicenseKeyError, LicenseManager, Tier};
use super::{report_telemetry, LicenseError, LicenseManager, Tier};

/// Define all features that are available based on the tier of the license.
///
Expand Down Expand Up @@ -113,14 +113,17 @@ pub enum FeatureNotAvailable {
#[error("feature {feature:?} is not available due to license error")]
LicenseError {
feature: Feature,
source: LicenseKeyError,
source: LicenseError,
},
}

impl Feature {
/// Check whether the feature is available based on the current license.
pub fn check_available(self) -> Result<(), FeatureNotAvailable> {
let check_res = match LicenseManager::get().license() {
/// Check whether the feature is available based on the given license manager.
pub(crate) fn check_available_with(
self,
manager: &LicenseManager,
) -> Result<(), FeatureNotAvailable> {
let check_res = match manager.license() {
Ok(license) => {
if license.tier >= self.min_tier() {
Ok(())
Expand All @@ -131,22 +134,19 @@ impl Feature {
})
}
}
Err(error) => {
// If there's a license key error, we still try against the default license first
// to see if the feature is available for free.
if License::default().tier >= self.min_tier() {
Ok(())
} else {
Err(FeatureNotAvailable::LicenseError {
feature: self,
source: error,
})
}
}
Err(error) => Err(FeatureNotAvailable::LicenseError {
feature: self,
source: error,
}),
};

report_telemetry(&self, self.get_feature_name(), check_res.is_ok());

check_res
}

/// Check whether the feature is available based on the current license.
pub fn check_available(self) -> Result<(), FeatureNotAvailable> {
self.check_available_with(LicenseManager::get())
}
}
2 changes: 2 additions & 0 deletions src/license/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(let_chains)]

mod cpu;
mod feature;
mod key;
Expand Down
47 changes: 38 additions & 9 deletions src/license/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZeroU64;
use std::num::NonZeroUsize;
use std::sync::{LazyLock, RwLock};

use jsonwebtoken::{Algorithm, DecodingKey, Validation};
Expand Down Expand Up @@ -81,7 +81,7 @@ pub struct License {
pub tier: Tier,

/// Maximum number of compute-node CPU cores allowed to use. Typically used for the paid tier.
pub cpu_core_limit: Option<NonZeroU64>,
pub cpu_core_limit: Option<NonZeroUsize>,

/// Expiration time in seconds since UNIX epoch.
///
Expand All @@ -106,11 +106,21 @@ impl Default for License {

/// The error type for invalid license key when verifying as JWT.
#[derive(Debug, Clone, Error)]
#[error("invalid license key")]
pub struct LicenseKeyError(#[source] jsonwebtoken::errors::Error);
pub enum LicenseError {
#[error("invalid license key")]
InvalidKey(#[source] jsonwebtoken::errors::Error),

#[error(
"the license key is currently not effective because the CPU core in the cluster \
({actual}) exceeds the maximum allowed by the license key ({limit}); \
consider removing some nodes or acquiring a new license key with a higher limit"
)]
CpuCoreLimitExceeded { limit: NonZeroUsize, actual: usize },
}

struct Inner {
license: Result<License, LicenseKeyError>,
license: Result<License, LicenseError>,
cached_cpu_core_count: usize,
}

/// The singleton license manager.
Expand All @@ -129,6 +139,7 @@ impl LicenseManager {
Self {
inner: RwLock::new(Inner {
license: Ok(License::default()),
cached_cpu_core_count: 0,
}),
}
}
Expand Down Expand Up @@ -162,7 +173,7 @@ impl LicenseManager {

inner.license = match jsonwebtoken::decode(license_key, &PUBLIC_KEY, &validation) {
Ok(data) => Ok(data.claims),
Err(error) => Err(LicenseKeyError(error)),
Err(error) => Err(LicenseError::InvalidKey(error)),
};

match &inner.license {
Expand All @@ -171,22 +182,40 @@ impl LicenseManager {
}
}

/// Update the cached CPU core count.
pub fn update_cpu_core_count(&self, cpu_core_count: usize) {
let mut inner = self.inner.write().unwrap();
inner.cached_cpu_core_count = cpu_core_count;
}

/// Get the current license if it is valid.
///
/// Since the license can expire, the returned license should not be cached by the caller.
///
/// Prefer calling [`crate::Feature::check_available`] to check the availability of a feature,
/// other than directly calling this method and checking the content of the license.
pub fn license(&self) -> Result<License, LicenseKeyError> {
let license = self.inner.read().unwrap().license.clone()?;
pub fn license(&self) -> Result<License, LicenseError> {
let inner = self.inner.read().unwrap();
let license = inner.license.clone()?;

// Check the expiration time additionally.
if license.exp < jsonwebtoken::get_current_timestamp() {
return Err(LicenseKeyError(
return Err(LicenseError::InvalidKey(
jsonwebtoken::errors::ErrorKind::ExpiredSignature.into(),
));
}

// Check the CPU core limit.
let actual_cpu_core = inner.cached_cpu_core_count;
if let Some(limit) = license.cpu_core_limit
&& actual_cpu_core > limit.get()
{
return Err(LicenseError::CpuCoreLimitExceeded {
limit,
actual: actual_cpu_core,
});
}

Ok(license)
}
}
Expand Down
Loading
Loading