service/matching/handler/engine.go (60 changed lines: 33 additions, 27 deletions)
@@ -54,6 +54,7 @@ import (
 	"github.com/uber/cadence/service/matching/config"
 	"github.com/uber/cadence/service/matching/event"
 	"github.com/uber/cadence/service/matching/tasklist"
+	"github.com/uber/cadence/service/sharddistributor/client/executorclient"
 )
 
 // If sticky poller is not seen in last 10s, we treat it as sticky worker unavailable
@@ -88,8 +89,10 @@ type (
 		tokenSerializer      common.TaskTokenSerializer
 		logger               log.Logger
 		metricsClient        metrics.Client
-		taskListsLock        sync.RWMutex                             // locks mutation of taskLists
-		taskLists            map[tasklist.Identifier]tasklist.Manager // Convert to LRU cache
+		taskListsLock        sync.RWMutex                                    // locks mutation of taskLists
+		taskLists            map[tasklist.Identifier]tasklist.ShardProcessor // Convert to LRU cache
+		executor             executorclient.Executor[tasklist.ShardProcessor]
+		taskListsFactory     *tasklist.ShardProcessorFactory
 		config               *config.Config
 		lockableQueryTaskMap lockableQueryTaskMap
 		domainCache          cache.DomainCache
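
The core of this change swaps tasklist.Manager for tasklist.ShardProcessor in the engine's cache and adds two fields: a generic executorclient.Executor bound to the new type (implying ShardProcessor satisfies whatever processor constraint the shard-distributor client defines) and a long-lived factory. From the call sites visible in this diff (Start(context.Background()), TaskListID(), String(), and the CloseCallback), the interface plausibly includes at least the methods sketched below; this is an inference, and the real definition in service/matching/tasklist is almost certainly larger.

// Hypothetical sketch of the ShardProcessor surface implied by this diff;
// the actual interface lives in service/matching/tasklist.
package tasklist

import "context"

type ShardProcessor interface {
	// Start is context-aware, unlike the old Manager.Start (see the
	// mgr.Start(context.Background()) call later in this file).
	Start(ctx context.Context) error
	// TaskListID is used by removeTaskListManager to locate the cache entry.
	TaskListID() *Identifier
	// String is used as a stable identity when deciding whether to evict.
	String() string
}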
@@ -135,15 +138,14 @@ func NewEngine(
 	isolationState isolationgroup.State,
 	timeSource clock.TimeSource,
 ) Engine {
-
 	e := &matchingEngineImpl{
 		shutdown:           make(chan struct{}),
 		shutdownCompletion: &sync.WaitGroup{},
 		taskManager:        taskManager,
 		clusterMetadata:    clusterMetadata,
 		historyService:     historyService,
 		tokenSerializer:    common.NewJSONTaskTokenSerializer(),
-		taskLists:          make(map[tasklist.Identifier]tasklist.Manager),
+		taskLists:          make(map[tasklist.Identifier]tasklist.ShardProcessor),
 		logger:             logger.WithTags(tag.ComponentMatchingEngine),
 		metricsClient:      metricsClient,
 		matchingClient:     matchingClient,
@@ -156,6 +158,7 @@
 		timeSource:         timeSource,
 	}
 
+	e.setupTaskListFactory()
 	e.shutdownCompletion.Add(1)
 	go e.runMembershipChangeLoop()
 
@@ -176,10 +179,27 @@ func (e *matchingEngineImpl) Stop() {
 	e.shutdownCompletion.Wait()
 }
 
-func (e *matchingEngineImpl) getTaskLists(maxCount int) []tasklist.Manager {
+func (e *matchingEngineImpl) setupTaskListFactory() {
+	taskListFactory := &tasklist.ShardProcessorFactory{
+		DomainCache:     e.domainCache,
+		Logger:          e.logger,
+		MetricsClient:   e.metricsClient,
+		TaskManager:     e.taskManager,
+		ClusterMetadata: e.clusterMetadata,
+		IsolationState:  e.isolationState,
+		MatchingClient:  e.matchingClient,
+		CloseCallback:   e.removeTaskListManager,
+		Cfg:             e.config,
+		TimeSource:      e.timeSource,
+		CreateTime:      e.timeSource.Now(),
+		HistoryService:  e.historyService}
+	e.taskListsFactory = taskListFactory
+}
+
+func (e *matchingEngineImpl) getTaskLists(maxCount int) []tasklist.ShardProcessor {
 	e.taskListsLock.RLock()
 	defer e.taskListsLock.RUnlock()
-	lists := make([]tasklist.Manager, 0, len(e.taskLists))
+	lists := make([]tasklist.ShardProcessor, 0, len(e.taskLists))
 	count := 0
 	for _, tlMgr := range e.taskLists {
 		lists = append(lists, tlMgr)
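
setupTaskListFactory hoists the dependency wiring that was previously rebuilt inside every getTaskListManager call into a single factory constructed once at engine startup. A plausible shape for the factory method used later in this diff is sketched below; the method name and fields come from the diff itself, but whether it still delegates to the pre-existing NewManager/ManagerParams pair is an assumption. One visible behavioral nuance: CreateTime is now captured once, when the factory is built, rather than at each task list creation.

// Hypothetical sketch of the factory method called by getTaskListManager
// (within package tasklist; imports elided).
func (f *ShardProcessorFactory) NewShardProcessorWithTaskListIdentifier(
	id *Identifier,
	kind types.TaskListKind,
) (ShardProcessor, error) {
	// Combine the engine-wide dependencies captured in setupTaskListFactory
	// with the per-task-list identity that ManagerParams used to carry.
	return NewManager(ManagerParams{
		DomainCache:     f.DomainCache,
		Logger:          f.Logger,
		MetricsClient:   f.MetricsClient,
		TaskManager:     f.TaskManager,
		ClusterMetadata: f.ClusterMetadata,
		IsolationState:  f.IsolationState,
		MatchingClient:  f.MatchingClient,
		CloseCallback:   f.CloseCallback,
		TaskList:        id,
		TaskListKind:    kind,
		Cfg:             f.Cfg,
		TimeSource:      f.TimeSource,
		CreateTime:      f.CreateTime,
		HistoryService:  f.HistoryService,
	})
}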
@@ -202,7 +222,7 @@ func (e *matchingEngineImpl) String() string {
 
 // Returns the taskListManager for a task list. If it is not already cached, gets a new range from the DB
 // and, if successful, creates one.
-func (e *matchingEngineImpl) getTaskListManager(taskList *tasklist.Identifier, taskListKind types.TaskListKind) (tasklist.Manager, error) {
+func (e *matchingEngineImpl) getTaskListManager(taskList *tasklist.Identifier, taskListKind types.TaskListKind) (tasklist.ShardProcessor, error) {
 	// The first check is an optimization: almost all requests will find an existing task list manager
 	// and return without taking the write lock
 	e.taskListsLock.RLock()
@@ -232,22 +252,7 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *tasklist.Identifier, t
 	)
 
 	logger.Info("Task list manager state changed", tag.LifeCycleStarting)
-	params := tasklist.ManagerParams{
-		DomainCache:     e.domainCache,
-		Logger:          e.logger,
-		MetricsClient:   e.metricsClient,
-		TaskManager:     e.taskManager,
-		ClusterMetadata: e.clusterMetadata,
-		IsolationState:  e.isolationState,
-		MatchingClient:  e.matchingClient,
-		CloseCallback:   e.removeTaskListManager,
-		TaskList:        taskList,
-		TaskListKind:    taskListKind,
-		Cfg:             e.config,
-		TimeSource:      e.timeSource,
-		CreateTime:      e.timeSource.Now(),
-		HistoryService:  e.historyService}
-	mgr, err := tasklist.NewManager(params)
+	mgr, err := e.taskListsFactory.NewShardProcessorWithTaskListIdentifier(taskList, taskListKind)
 	if err != nil {
 		e.taskListsLock.Unlock()
 		logger.Info("Task list manager state changed", tag.LifeCycleStartFailed, tag.Error(err))
@@ -260,7 +265,7 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *tasklist.Identifier, t
 		float64(len(e.taskLists)),
 	)
 	e.taskListsLock.Unlock()
-	err = mgr.Start()
+	err = mgr.Start(context.Background())
 	if err != nil {
 		logger.Info("Task list manager state changed", tag.LifeCycleStartFailed, tag.Error(err))
 		return nil, err
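
getTaskListManager keeps its read-lock fast path and takes the write lock only on a cache miss; the changes in this hunk are that the processor now comes from the factory and is started with an explicit context. A condensed sketch of the overall idiom is below, with the logging, metrics, and domain bookkeeping of the real method elided; the helper name lookupOrCreate is hypothetical.

// Condensed sketch of the double-checked lookup used by getTaskListManager
// (within package handler; imports elided).
func (e *matchingEngineImpl) lookupOrCreate(id *tasklist.Identifier, kind types.TaskListKind) (tasklist.ShardProcessor, error) {
	// Fast path: most requests find an existing processor under the read lock.
	e.taskListsLock.RLock()
	if mgr, ok := e.taskLists[*id]; ok {
		e.taskListsLock.RUnlock()
		return mgr, nil
	}
	e.taskListsLock.RUnlock()

	// Slow path: re-check under the write lock, then create and register.
	e.taskListsLock.Lock()
	if mgr, ok := e.taskLists[*id]; ok {
		e.taskListsLock.Unlock()
		return mgr, nil
	}
	mgr, err := e.taskListsFactory.NewShardProcessorWithTaskListIdentifier(id, kind)
	if err != nil {
		e.taskListsLock.Unlock()
		return nil, err
	}
	e.taskLists[*id] = mgr
	e.taskListsLock.Unlock()

	// Start outside the lock so a slow start does not block other task lists.
	if err := mgr.Start(context.Background()); err != nil {
		return nil, err
	}
	return mgr, nil
}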
@@ -298,18 +303,19 @@ func (e *matchingEngineImpl) getTaskListByDomainLocked(domainID string, taskList
 }
 
 // For use in tests
-func (e *matchingEngineImpl) updateTaskList(taskList *tasklist.Identifier, mgr tasklist.Manager) {
+func (e *matchingEngineImpl) updateTaskList(taskList *tasklist.Identifier, mgr tasklist.ShardProcessor) {
 	e.taskListsLock.Lock()
 	defer e.taskListsLock.Unlock()
 	e.taskLists[*taskList] = mgr
 }
 
-func (e *matchingEngineImpl) removeTaskListManager(tlMgr tasklist.Manager) {
+func (e *matchingEngineImpl) removeTaskListManager(tlMgr tasklist.ShardProcessor) {
 	id := tlMgr.TaskListID()
 	e.taskListsLock.Lock()
 	defer e.taskListsLock.Unlock()
+
 	currentTlMgr, ok := e.taskLists[*id]
-	if ok && tlMgr == currentTlMgr {
+	if ok && currentTlMgr.String() == tlMgr.String() {
 		delete(e.taskLists, *id)
 	}
 
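The eviction check in removeTaskListManager changes from interface identity (tlMgr == currentTlMgr) to comparing String() output. A plausible reason, given the new executor integration, is that the value handed to the CloseCallback may no longer be the exact interface value stored in the map (for example, if the shard-distributor client wraps processors), in which case pointer identity would never match. The standalone snippet below illustrates the difference; all names in it are hypothetical.

package main

import "fmt"

type processor interface{ String() string }

type impl struct{ id string }

func (p *impl) String() string { return p.id }

// wrapper delegates String but is a distinct value from the impl it wraps.
type wrapper struct{ inner processor }

func (w *wrapper) String() string { return w.inner.String() }

func main() {
	base := &impl{id: "domain1/tasklist1"}
	var stored processor = base
	var fromCallback processor = &wrapper{inner: base}

	fmt.Println(stored == fromCallback)                   // false: different dynamic values
	fmt.Println(stored.String() == fromCallback.String()) // true: same logical identity
}

The trade-off is that String() must uniquely identify a processor: two distinct generations of the same task list would now compare equal, so a late callback from an old instance could evict a newer one with the same identity.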
service/matching/handler/engine_integration_test.go (2 changed lines: 2 additions, 0 deletions)
@@ -681,6 +681,7 @@ func (s *matchingEngineSuite) SyncMatchTasks(taskType int, enableIsolation bool)
 	// So we can get snapshots
 	scope := tally.NewTestScope("test", nil)
 	s.matchingEngine.metricsClient = metrics.NewClient(scope, metrics.Matching, metrics.HistogramMigration{})
+	s.matchingEngine.taskListsFactory.MetricsClient = metrics.NewClient(scope, metrics.Matching, metrics.HistogramMigration{})
 
 	testParam := newTestParam(s.T(), taskType)
 	s.taskManager.SetRangeID(testParam.TaskListID, initialRangeID)
@@ -840,6 +841,7 @@ func (s *matchingEngineSuite) ConcurrentAddAndPollTasks(taskType int, workerCoun
 	}
 	scope := tally.NewTestScope("test", nil)
 	s.matchingEngine.metricsClient = metrics.NewClient(scope, metrics.Matching, metrics.HistogramMigration{})
+	s.matchingEngine.taskListsFactory.MetricsClient = metrics.NewClient(scope, metrics.Matching, metrics.HistogramMigration{})
 
 	const initialRangeID = 0
 	const rangeSize = 3