99 "github.com/uber/cadence/common/clock"
1010 "github.com/uber/cadence/common/log"
1111 "github.com/uber/cadence/common/log/tag"
12+ "github.com/uber/cadence/common/metrics"
1213 "github.com/uber/cadence/common/types"
1314 "github.com/uber/cadence/service/sharddistributor/config"
1415 "github.com/uber/cadence/service/sharddistributor/store"
@@ -28,6 +29,7 @@ type executor struct {
2829 storage store.Store
2930 shardDistributionCfg config.ShardDistribution
3031 migrationConfiguration * config.MigrationConfig
32+ metricsClient metrics.Client
3133}
3234
3335func NewExecutorHandler (
@@ -36,13 +38,15 @@ func NewExecutorHandler(
3638 timeSource clock.TimeSource ,
3739 shardDistributionCfg config.ShardDistribution ,
3840 migrationConfig * config.MigrationConfig ,
41+ metricsClient metrics.Client ,
3942) Executor {
4043 return & executor {
4144 logger : logger ,
4245 timeSource : timeSource ,
4346 storage : storage ,
4447 shardDistributionCfg : shardDistributionCfg ,
4548 migrationConfiguration : migrationConfig ,
49+ metricsClient : metricsClient ,
4650 }
4751}
4852
@@ -53,8 +57,9 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe
5357 return nil , & types.InternalServiceError {Message : fmt .Sprintf ("failed to get heartbeat: %v" , err )}
5458 }
5559
56- now := h .timeSource .Now ().UTC ()
60+ heartbeatTime := h .timeSource .Now ().UTC ()
5761 mode := h .migrationConfiguration .GetMigrationMode (request .Namespace )
62+ shardAssignedInBackground := true
5863
5964 switch mode {
6065 case types .MigrationModeINVALID :
@@ -68,19 +73,20 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe
6873 if err != nil {
6974 return nil , err
7075 }
76+ shardAssignedInBackground = false
7177 }
7278
7379 // If the state has changed we need to update heartbeat data.
7480 // Otherwise, we want to do it with controlled frequency - at most every _heartbeatRefreshRate.
7581 if previousHeartbeat != nil && request .Status == previousHeartbeat .Status && mode == types .MigrationModeONBOARDED {
7682 lastHeartbeatTime := previousHeartbeat .LastHeartbeat
77- if now .Sub (lastHeartbeatTime ) < _heartbeatRefreshRate {
83+ if heartbeatTime .Sub (lastHeartbeatTime ) < _heartbeatRefreshRate {
7884 return _convertResponse (assignedShards , mode ), nil
7985 }
8086 }
8187
8288 newHeartbeat := store.HeartbeatState {
83- LastHeartbeat : now ,
89+ LastHeartbeat : heartbeatTime ,
8490 Status : request .Status ,
8591 ReportedShards : request .ShardStatusReports ,
8692 Metadata : request .GetMetadata (),
@@ -95,9 +101,49 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe
95101 return nil , & types.InternalServiceError {Message : fmt .Sprintf ("failed to record heartbeat: %v" , err )}
96102 }
97103
104+ // emit shard assignment metrics only if shards are assigned in the background
105+ // shard assignment in heartbeat doesn't involve any assignment changes happening in the background
106+ // thus there was no shard handover and no assignment distribution latency
107+ // to measure, so don't need to emit metrics in that case
108+ if shardAssignedInBackground {
109+ h .emitShardAssignmentMetrics (request .Namespace , heartbeatTime , previousHeartbeat , assignedShards )
110+ }
111+
98112 return _convertResponse (assignedShards , mode ), nil
99113}
100114
115+ // emitShardAssignmentMetrics emits the following metrics for newly assigned shards:
116+ // - ShardAssignmentDistributionLatency: time taken since the shard was assigned to heartbeat time
117+ // - ShardHandoverLatency: time taken since the previous executor's last heartbeat to heartbeat time
118+ func (h * executor ) emitShardAssignmentMetrics (namespace string , heartbeatTime time.Time , previousHeartbeat * store.HeartbeatState , assignedState * store.AssignedState ) {
119+ // find newly assigned shards, if there are none, no handovers happened
120+ newAssignedShardIDs := filterNewlyAssignedShardIDs (previousHeartbeat , assignedState )
121+ if len (newAssignedShardIDs ) == 0 {
122+ // no need to emit ShardDistributorShardAssignmentDistributionLatency due to no handovers
123+ return
124+ }
125+
126+ metricsScope := h .metricsClient .Scope (metrics .ShardDistributorHeartbeatScope ).
127+ Tagged (metrics .NamespaceTag (namespace ))
128+
129+ distributionLatency := heartbeatTime .Sub (assignedState .LastUpdated )
130+ metricsScope .RecordHistogramDuration (metrics .ShardDistributorShardAssignmentDistributionLatency , distributionLatency )
131+
132+ for _ , shardID := range newAssignedShardIDs {
133+ handoverStats , ok := assignedState .ShardHandoverStats [shardID ]
134+ if ! ok {
135+ // no handover stats for this shard, means it was never handed over before
136+ // so no handover latency metric to emit
137+ continue
138+ }
139+
140+ handoverLatency := heartbeatTime .Sub (handoverStats .PreviousExecutorLastHeartbeatTime )
141+ metricsScope .Tagged (metrics .HandoverTypeTag (handoverStats .HandoverType .String ())).
142+ RecordHistogramDuration (metrics .ShardDistributorShardHandoverLatency , handoverLatency )
143+
144+ }
145+ }
146+
101147// assignShardsInCurrentHeartbeat is used during the migration phase to assign the shards to the executors according to what is reported during the heartbeat
102148func (h * executor ) assignShardsInCurrentHeartbeat (ctx context.Context , request * types.ExecutorHeartbeatRequest ) (* store.AssignedState , error ) {
103149 assignedShards := store.AssignedState {
@@ -155,3 +201,24 @@ func validateMetadata(metadata map[string]string) error {
155201
156202 return nil
157203}
204+
205+ func filterNewlyAssignedShardIDs (previousHeartbeat * store.HeartbeatState , assignedState * store.AssignedState ) []string {
206+ // if assignedState is nil, no shards are assigned
207+ if assignedState == nil {
208+ return nil
209+ }
210+
211+ var newAssignedShardIDs []string
212+ for assignedShardID := range assignedState .AssignedShards {
213+ if previousHeartbeat == nil || ! shardInReportedShards (previousHeartbeat .ReportedShards , assignedShardID ) {
214+ newAssignedShardIDs = append (newAssignedShardIDs , assignedShardID )
215+ }
216+ }
217+
218+ return newAssignedShardIDs
219+ }
220+
221+ func shardInReportedShards (reportedShards map [string ]* types.ShardStatusReport , shardID string ) bool {
222+ _ , ok := reportedShards [shardID ]
223+ return ok
224+ }
0 commit comments