Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1479,6 +1479,7 @@ const (
ShardDistributorStoreAssignShardScope
ShardDistributorStoreAssignShardsScope
ShardDistributorStoreDeleteExecutorsScope
ShardDistributorStoreGetShardStatsScope
ShardDistributorStoreDeleteShardStatsScope
ShardDistributorStoreGetHeartbeatScope
ShardDistributorStoreGetExecutorScope
Expand Down Expand Up @@ -2168,6 +2169,7 @@ var ScopeDefs = map[ServiceIdx]map[ScopeIdx]scopeDefinition{
ShardDistributorStoreAssignShardScope: {operation: "StoreAssignShard"},
ShardDistributorStoreAssignShardsScope: {operation: "StoreAssignShards"},
ShardDistributorStoreDeleteExecutorsScope: {operation: "StoreDeleteExecutors"},
ShardDistributorStoreGetShardStatsScope: {operation: "StoreGetShardStats"},
ShardDistributorStoreDeleteShardStatsScope: {operation: "StoreDeleteShardStats"},
ShardDistributorStoreGetHeartbeatScope: {operation: "StoreGetHeartbeat"},
ShardDistributorStoreGetExecutorScope: {operation: "StoreGetExecutor"},
Expand Down Expand Up @@ -2970,6 +2972,13 @@ const (
ShardDistributorStoreRequestsPerNamespace
ShardDistributorStoreLatencyHistogramPerNamespace

// ShardDistributorShardAssignmentDistributionLatency measures the time taken between assignment of a shard
// and the time it is fully distributed to executors
ShardDistributorShardAssignmentDistributionLatency

// ShardDistributorShardHandoverLatency measures the time taken to hand over a shard from one executor to another
ShardDistributorShardHandoverLatency

NumShardDistributorMetrics
)

Expand Down Expand Up @@ -3755,6 +3764,9 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{
ShardDistributorStoreFailuresPerNamespace: {metricName: "shard_distributor_store_failures_per_namespace", metricType: Counter},
ShardDistributorStoreRequestsPerNamespace: {metricName: "shard_distributor_store_requests_per_namespace", metricType: Counter},
ShardDistributorStoreLatencyHistogramPerNamespace: {metricName: "shard_distributor_store_latency_histogram_per_namespace", metricType: Histogram, buckets: ShardDistributorExecutorStoreLatencyBuckets},

ShardDistributorShardAssignmentDistributionLatency: {metricName: "shard_distributor_shard_assignment_distribution_latency", metricType: Histogram, buckets: Default1ms100s.buckets()},
ShardDistributorShardHandoverLatency: {metricName: "shard_distributor_shard_handover_latency", metricType: Histogram, buckets: Default1ms100s.buckets()},
},
}

Expand Down
4 changes: 4 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,10 @@ func NamespaceTypeTag(namespaceType string) Tag {
return metricWithUnknown("namespace_type", namespaceType)
}

// HandoverTypeTag returns a Tag keyed "handover_type" for the given shard
// handover type, built via metricWithUnknown like the sibling tag
// constructors in this file (presumably substituting a placeholder for
// empty/unknown values — confirm against metricWithUnknown).
func HandoverTypeTag(handoverType string) Tag {
	return metricWithUnknown("handover_type", handoverType)
}

// TaskCategoryTag returns a Tag keyed "task_category" for the given task
// category, built via metricWithUnknown like the sibling tag constructors
// in this file.
func TaskCategoryTag(category string) Tag {
	return metricWithUnknown("task_category", category)
}
Expand Down
73 changes: 70 additions & 3 deletions service/sharddistributor/handler/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/sharddistributor/config"
"github.com/uber/cadence/service/sharddistributor/store"
Expand All @@ -28,6 +29,7 @@ type executor struct {
storage store.Store
shardDistributionCfg config.ShardDistribution
migrationConfiguration *config.MigrationConfig
metricsClient metrics.Client
}

func NewExecutorHandler(
Expand All @@ -36,13 +38,15 @@ func NewExecutorHandler(
timeSource clock.TimeSource,
shardDistributionCfg config.ShardDistribution,
migrationConfig *config.MigrationConfig,
metricsClient metrics.Client,
) Executor {
return &executor{
logger: logger,
timeSource: timeSource,
storage: storage,
shardDistributionCfg: shardDistributionCfg,
migrationConfiguration: migrationConfig,
metricsClient: metricsClient,
}
}

Expand All @@ -53,8 +57,9 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe
return nil, &types.InternalServiceError{Message: fmt.Sprintf("failed to get heartbeat: %v", err)}
}

now := h.timeSource.Now().UTC()
heartbeatTime := h.timeSource.Now().UTC()
mode := h.migrationConfiguration.GetMigrationMode(request.Namespace)
shardAssignedInBackground := true

switch mode {
case types.MigrationModeINVALID:
Expand All @@ -68,19 +73,20 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe
if err != nil {
return nil, err
}
shardAssignedInBackground = false
}

// If the state has changed we need to update heartbeat data.
// Otherwise, we want to do it with controlled frequency - at most every _heartbeatRefreshRate.
if previousHeartbeat != nil && request.Status == previousHeartbeat.Status && mode == types.MigrationModeONBOARDED {
lastHeartbeatTime := previousHeartbeat.LastHeartbeat
if now.Sub(lastHeartbeatTime) < _heartbeatRefreshRate {
if heartbeatTime.Sub(lastHeartbeatTime) < _heartbeatRefreshRate {
return _convertResponse(assignedShards, mode), nil
}
}

newHeartbeat := store.HeartbeatState{
LastHeartbeat: now,
LastHeartbeat: heartbeatTime,
Status: request.Status,
ReportedShards: request.ShardStatusReports,
Metadata: request.GetMetadata(),
Expand All @@ -95,9 +101,49 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe
return nil, &types.InternalServiceError{Message: fmt.Sprintf("failed to record heartbeat: %v", err)}
}

// emit shard assignment metrics only if shards are assigned in the background
// shard assignment in heartbeat doesn't involve any assignment changes happening in the background
// thus there was no shard handover and no assignment distribution latency
// to measure, so don't need to emit metrics in that case
if shardAssignedInBackground {
h.emitShardAssignmentMetrics(request.Namespace, heartbeatTime, previousHeartbeat, assignedShards)
}

return _convertResponse(assignedShards, mode), nil
}

// emitShardAssignmentMetrics records latency histograms for shards that this
// heartbeat reveals as newly assigned to the executor:
//   - ShardDistributorShardAssignmentDistributionLatency: time from the
//     assignment state's last update to this heartbeat
//   - ShardDistributorShardHandoverLatency: time from the previous owner's
//     last heartbeat to this heartbeat, tagged with the handover type
func (h *executor) emitShardAssignmentMetrics(namespace string, heartbeatTime time.Time, previousHeartbeat *store.HeartbeatState, assignedState *store.AssignedState) {
	newShardIDs := filterNewlyAssignedShardIDs(previousHeartbeat, assignedState)
	if len(newShardIDs) == 0 {
		// Nothing changed hands, so there is no distribution or handover latency to report.
		return
	}

	scope := h.metricsClient.Scope(metrics.ShardDistributorHeartbeatScope).
		Tagged(metrics.NamespaceTag(namespace))
	scope.RecordHistogramDuration(metrics.ShardDistributorShardAssignmentDistributionLatency, heartbeatTime.Sub(assignedState.LastUpdated))

	for _, shardID := range newShardIDs {
		stats, ok := assignedState.ShardHandoverStats[shardID]
		if !ok {
			// First-time assignment for this shard: no previous owner, hence no handover latency.
			continue
		}
		scope.Tagged(metrics.HandoverTypeTag(stats.HandoverType.String())).
			RecordHistogramDuration(metrics.ShardDistributorShardHandoverLatency, heartbeatTime.Sub(stats.PreviousExecutorLastHeartbeatTime))
	}
}

// assignShardsInCurrentHeartbeat is used during the migration phase to assign the shards to the executors according to what is reported during the heartbeat
func (h *executor) assignShardsInCurrentHeartbeat(ctx context.Context, request *types.ExecutorHeartbeatRequest) (*store.AssignedState, error) {
assignedShards := store.AssignedState{
Expand Down Expand Up @@ -155,3 +201,24 @@ func validateMetadata(metadata map[string]string) error {

return nil
}

// filterNewlyAssignedShardIDs returns the IDs of shards present in
// assignedState that the executor did not report owning in its previous
// heartbeat. A nil assignedState yields nil; a nil previousHeartbeat means
// every assigned shard counts as new. The order of the returned IDs is
// unspecified (map iteration order).
func filterNewlyAssignedShardIDs(previousHeartbeat *store.HeartbeatState, assignedState *store.AssignedState) []string {
	// if assignedState is nil, no shards are assigned
	if assignedState == nil {
		return nil
	}

	var newAssignedShardIDs []string

	// Hoist the loop-invariant nil check: with no previous heartbeat there is
	// nothing to compare against, so every assigned shard is newly assigned.
	if previousHeartbeat == nil {
		for assignedShardID := range assignedState.AssignedShards {
			newAssignedShardIDs = append(newAssignedShardIDs, assignedShardID)
		}
		return newAssignedShardIDs
	}

	// Otherwise a shard is new iff the executor did not report it last time.
	for assignedShardID := range assignedState.AssignedShards {
		if !shardInReportedShards(previousHeartbeat.ReportedShards, assignedShardID) {
			newAssignedShardIDs = append(newAssignedShardIDs, assignedShardID)
		}
	}

	return newAssignedShardIDs
}

// shardInReportedShards reports whether shardID appears as a key in the
// executor's reported-shards map. A nil map safely reports false.
func shardInReportedShards(reportedShards map[string]*types.ShardStatusReport, shardID string) bool {
	_, reported := reportedShards[shardID]
	return reported
}
Loading
Loading