Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions docs/generated/metrics/metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12875,6 +12875,22 @@ layers:
unit: COUNT
aggregation: AVG
derivative: NONE
- name: kvflowcontrol.streams.send.inflight.blocked_count
exported_name: kvflowcontrol_streams_send_inflight_blocked_count
description: Number of send replication streams with no flow tokens available for inflight requests
y_axis_label: Count
type: GAUGE
unit: COUNT
aggregation: AVG
derivative: NONE
- name: kvflowcontrol.streams.send.inflight.total_count
exported_name: kvflowcontrol_streams_send_inflight_total_count
description: Total number of send replication streams for inflight requests
y_axis_label: Count
type: GAUGE
unit: COUNT
aggregation: AVG
derivative: NONE
- name: kvflowcontrol.streams.send.regular.blocked_count
exported_name: kvflowcontrol_streams_send_regular_blocked_count
description: Number of send replication streams with no flow tokens available for regular requests
Expand Down Expand Up @@ -13027,6 +13043,46 @@ layers:
unit: BYTES
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: kvflowcontrol.tokens.send.inflight.available
exported_name: kvflowcontrol_tokens_send_inflight_available
description: Flow send tokens available for inflight requests, across all replication streams
y_axis_label: Bytes
type: GAUGE
unit: BYTES
aggregation: AVG
derivative: NONE
- name: kvflowcontrol.tokens.send.inflight.deducted
exported_name: kvflowcontrol_tokens_send_inflight_deducted
description: Flow send tokens deducted by inflight requests, across all replication streams
y_axis_label: Bytes
type: COUNTER
unit: BYTES
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: kvflowcontrol.tokens.send.inflight.returned
exported_name: kvflowcontrol_tokens_send_inflight_returned
description: Flow send tokens returned by inflight requests, across all replication streams
y_axis_label: Bytes
type: COUNTER
unit: BYTES
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: kvflowcontrol.tokens.send.inflight.returned.disconnect
exported_name: kvflowcontrol_tokens_send_inflight_returned_disconnect
description: Flow send tokens returned early by inflight due disconnects, across all replication stream, this is a subset of returned tokens
y_axis_label: Bytes
type: COUNTER
unit: BYTES
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: kvflowcontrol.tokens.send.inflight.unaccounted
exported_name: kvflowcontrol_tokens_send_inflight_unaccounted
description: Flow send tokens returned by inflight requests that were unaccounted for, across all replication streams
y_axis_label: Bytes
type: COUNTER
unit: BYTES
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: kvflowcontrol.tokens.send.regular.available
exported_name: kvflowcontrol_tokens_send_regular_available
description: Flow send tokens available for regular requests, across all replication streams
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/flow_control_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestFlowControlBasicV2(t *testing.T) {
h.query(n1, `
SELECT name, value
FROM crdb_internal.node_metrics
WHERE name LIKE '%kvflowcontrol%stream%'
WHERE name LIKE '%kvflowcontrol%stream%' AND name NOT LIKE '%inflight%'
ORDER BY name ASC;
`)

Expand Down Expand Up @@ -3819,15 +3819,16 @@ var flowPerStoreDeductionQueryHeaderStrs = []string{
"range_id", "store_id", "priority", "tokens"}

// v2FlowTokensQueryStr is the query string to fetch flow tokens metrics from
// the node metrics table.
// the node metrics table. We exclude the inflight metrics since they are
// non-deterministic.
const v2FlowTokensQueryStr = `
SELECT
name,
crdb_internal.humanize_bytes(value::INT8)
FROM
crdb_internal.node_metrics
WHERE
name LIKE '%kvflowcontrol%tokens%'
name LIKE '%kvflowcontrol%tokens%' AND name NOT LIKE '%inflight%'
ORDER BY
name ASC;
`
Expand Down
41 changes: 27 additions & 14 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ var RegularTokensPerStream = settings.RegisterByteSizeSetting(
"kvadmission.flow_controller.regular_tokens_per_stream",
"flow tokens available for regular work on a per-stream basis",
16<<20, // 16 MiB
validateTokenRange,
validateTokenRange(maxTokensPerStream),
)

// ElasticTokensPerStream determines the flow tokens available for elastic work
Expand All @@ -95,24 +95,37 @@ var ElasticTokensPerStream = settings.RegisterByteSizeSetting(
"kvadmission.flow_controller.elastic_tokens_per_stream",
"flow tokens available for elastic work on a per-stream basis",
8<<20, // 8 MiB
validateTokenRange,
validateTokenRange(maxTokensPerStream),
)

// InFlightTokensPerStream determines the flow tokens available for inflight
// work on a per-stream basis.
var InFlightTokensPerStream = settings.RegisterByteSizeSetting(
settings.SystemOnly,
"kvadmission.flow_controller.in_flight_tokens_per_stream",
"flow tokens available for in-flight work on a per-stream basis",
64<<20, // 64 MiB
validateTokenRange(maxInFlightTokensPerStream),
)

const (
minTokensPerStream Tokens = 1 << 20 // 1 MiB
maxTokensPerStream Tokens = 64 << 20 // 64 MiB
minTokensPerStream Tokens = 1 << 20 // 1 MiB
maxTokensPerStream Tokens = 64 << 20 // 64 MiB
maxInFlightTokensPerStream = 512 << 20 // 512MiB
)

var validateTokenRange = settings.WithValidateInt(func(b int64) error {
t := Tokens(b)
if t < minTokensPerStream {
return fmt.Errorf("minimum flowed tokens allowed is %s, got %s", minTokensPerStream, t)
}
if t > maxTokensPerStream {
return fmt.Errorf("maximum flow tokens allowed is %s, got %s", maxTokensPerStream, t)
}
return nil
})
func validateTokenRange(maxTokens Tokens) settings.SettingOption {
return settings.WithValidateInt(func(b int64) error {
t := Tokens(b)
if t < minTokensPerStream {
return fmt.Errorf("minimum flowed tokens allowed is %s, got %s", minTokensPerStream, t)
}
if t > maxTokens {
return fmt.Errorf("maximum flow tokens allowed is %s, got %s", maxTokens, t)
}
return nil
})
}

// TokenCounterResetEpoch is an escape hatch for administrators that should
// never be needed. By incrementing this epoch (or changing it to a value
Expand Down
Loading