Skip to content

Commit

Permalink
Merge pull request #194 from camphor-/use-transaction-when-changing-s…
Browse files Browse the repository at this point in the history
…tate

大量にskipAPIが叩かれても同時に処理しないようにした (トランザクションとレートリミットを使ってみたパターン)
  • Loading branch information
p1ass authored Aug 10, 2020
2 parents a94c4f5 + f5a356e commit dea9cf9
Show file tree
Hide file tree
Showing 7 changed files with 577 additions and 367 deletions.
2 changes: 1 addition & 1 deletion domain/entity/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *Session) IsCreator(userID string) bool {
func (s *Session) GoNextTrack() error {
s.SetProgressWhenPaused(0 * time.Second)
if len(s.QueueTracks) <= s.QueueHead+1 {
s.QueueHead++ // https://github.com/camphor-/relaym-server/blob/master/docs/definition.md#%E7%8F%BE%E5%9C%A8%E5%AF%BE%E8%B1%A1%E3%81%AE%E6%9B%B2%E3%81%AE%E3%82%A4%E3%83%B3%E3%83%87%E3%83%83%E3%82%AF%E3%82%B9-head
s.QueueHead = len(s.QueueTracks) // https://github.com/camphor-/relaym-server/blob/master/docs/definition.md#%E7%8F%BE%E5%9C%A8%E5%AF%BE%E8%B1%A1%E3%81%AE%E6%9B%B2%E3%81%AE%E3%82%A4%E3%83%B3%E3%83%87%E3%83%83%E3%82%AF%E3%82%B9-head
s.StateType = Stop
return ErrSessionAllTracksFinished
}
Expand Down
12 changes: 10 additions & 2 deletions domain/entity/sync_check_timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func newSyncCheckTimer() *SyncCheckTimer {

return &SyncCheckTimer{
stopCh: make(chan struct{}, 2),
nextCh: make(chan struct{}, 1),
nextCh: make(chan struct{}, 10),
isTimerExpired: true,
timer: timer,
}
Expand All @@ -65,6 +65,14 @@ func (s *SyncCheckTimer) SetDuration(d time.Duration) {
s.timer.Reset(d)
}

// sendToNextTrackNotToExceedCap は チャネルのキャパシティを超えないようにしながら、nextChに構造体を送ります。
// キャパシティを超えるとチャネルにメッセージが送られないので、API Rate Limitの役割を果たしています。
func (s *SyncCheckTimer) sendToNextTrackNotToExceedCap() {
if len(s.nextCh) < cap(s.nextCh) {
s.nextCh <- struct{}{}
}
}

// SyncCheckTimerManager はSpotifyとの同期チェック用のタイマーを一括して管理する構造体です。
type SyncCheckTimerManager struct {
timers map[string]*SyncCheckTimer
Expand Down Expand Up @@ -137,7 +145,7 @@ func (m *SyncCheckTimerManager) SendToNextCh(sessionID string) error {
logger.Debugj(map[string]interface{}{"message": "call next ch", "sessionID": sessionID})

if timer, ok := m.timers[sessionID]; ok {
timer.nextCh <- struct{}{}
timer.sendToNextTrackNotToExceedCap()
return nil
}

Expand Down
131 changes: 78 additions & 53 deletions usecase/session_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func NewSessionStateUseCase(sessionRepo repository.Session, playerCli spotify.Pl

// NextTrack は指定されたidのsessionを次の曲に進めます
func (s *SessionStateUseCase) NextTrack(ctx context.Context, sessionID string) error {
session, err := s.sessionRepo.FindByID(ctx, sessionID)
session, err := s.sessionRepo.FindByIDForUpdate(ctx, sessionID)
if err != nil {
return fmt.Errorf("find session id=%s: %w", sessionID, err)
}
Expand All @@ -40,15 +40,15 @@ func (s *SessionStateUseCase) NextTrack(ctx context.Context, sessionID string) e

switch session.StateType {
case entity.Play:
if err = s.nextTrackInPlay(ctx, session); err != nil {
if err = s.nextTrackInPlay(ctx, sessionID); err != nil {
return fmt.Errorf("go next track in play session id=%s: %w", session.ID, err)
}
case entity.Pause:
if err = s.nextTrackInPause(ctx, session); err != nil {
if err = s.nextTrackInPause(ctx, sessionID); err != nil {
return fmt.Errorf("go next track in pause session id=%s: %w", session.ID, err)
}
case entity.Stop:
if err = s.nextTrackInStop(ctx, session); err != nil {
if err = s.nextTrackInStop(ctx, sessionID); err != nil {
return fmt.Errorf("go next track in stop session id=%s: %w", session.ID, err)
}
case entity.Archived:
Expand All @@ -59,82 +59,107 @@ func (s *SessionStateUseCase) NextTrack(ctx context.Context, sessionID string) e
}

// nextTrackInPlay はsessionのstateがPLAYの時のnextTrackの処理を行います
func (s *SessionStateUseCase) nextTrackInPlay(ctx context.Context, session *entity.Session) error {
if err := s.playerCli.GoNextTrack(ctx, session.DeviceID); err != nil {
return fmt.Errorf("GoNextTrack: %w", err)
}

func (s *SessionStateUseCase) nextTrackInPlay(ctx context.Context, sessionID string) error {
// NextChを通してstartTrackEndTriggerに次の曲への遷移を通知
if err := s.timerUC.sendToNextCh(session.ID); err != nil {
if err := s.timerUC.sendToNextCh(sessionID); err != nil {
return fmt.Errorf("send to next ch: %w", err)
}

return nil
}

// nextTrackInPause はsessionのstateがPAUSEの時のnextTrackの処理を行います
func (s *SessionStateUseCase) nextTrackInPause(ctx context.Context, session *entity.Session) error {
if err := s.playerCli.GoNextTrack(ctx, session.DeviceID); err != nil {
return fmt.Errorf("GoNextTrack: %w", err)
func (s *SessionStateUseCase) nextTrackInPause(ctx context.Context, sessionID string) error {
_, err := s.sessionRepo.DoInTx(ctx, s.nextTrackInPauseTx(sessionID))
if err != nil {
return fmt.Errorf("nextTrackInPause transaction: %w", err)
}

if err := session.GoNextTrack(); err != nil && errors.Is(err, entity.ErrSessionAllTracksFinished) {
s.timerUC.handleAllTrackFinish(session)
if err := s.sessionRepo.Update(ctx, session); err != nil {
return fmt.Errorf("update session id=%s: %w", session.ID, err)
return nil
}

func (s *SessionStateUseCase) nextTrackInPauseTx(sessionID string) func(ctx context.Context) (interface{}, error) {
return func(ctx context.Context) (interface{}, error) {
session, err := s.sessionRepo.FindByIDForUpdate(ctx, sessionID)
if err != nil {
return nil, fmt.Errorf("find session: %w", err)
}
if err := s.playerCli.GoNextTrack(ctx, session.DeviceID); err != nil {
return nil, fmt.Errorf("GoNextTrack: %w", err)
}
return nil
}

// GoNextTrackだけだと次の曲の再生が始まってしまう
if err := s.playerCli.Pause(ctx, session.DeviceID); err != nil {
return fmt.Errorf("call pause api: %w", err)
}
if err := session.GoNextTrack(); err != nil && errors.Is(err, entity.ErrSessionAllTracksFinished) {
s.timerUC.handleAllTrackFinish(session)
if err := s.sessionRepo.Update(ctx, session); err != nil {
return nil, fmt.Errorf("update session id=%s: %w", session.ID, err)
}
return nil, nil
}

if err := s.sessionRepo.Update(ctx, session); err != nil {
return fmt.Errorf("update session id=%s: %w", session.ID, err)
}
// GoNextTrackだけだと次の曲の再生が始まってしまう
if err := s.playerCli.Pause(ctx, session.DeviceID); err != nil {
return nil, fmt.Errorf("call pause api: %w", err)
}

track := session.TrackURIShouldBeAddedWhenHandleTrackEnd()
if track != "" {
if err := s.playerCli.Enqueue(ctx, track, session.DeviceID); err != nil {
return fmt.Errorf("enqueue error session id=%s: %w", session.ID, err)
if err := s.sessionRepo.Update(ctx, session); err != nil {
return nil, fmt.Errorf("update session id=%s: %w", session.ID, err)
}
}

s.pusher.Push(&event.PushMessage{
SessionID: session.ID,
Msg: entity.NewEventNextTrack(session.QueueHead),
})
track := session.TrackURIShouldBeAddedWhenHandleTrackEnd()
if track != "" {
if err := s.playerCli.Enqueue(ctx, track, session.DeviceID); err != nil {
return nil, fmt.Errorf("enqueue error session id=%s: %w", session.ID, err)
}
}

return nil
s.pusher.Push(&event.PushMessage{
SessionID: session.ID,
Msg: entity.NewEventNextTrack(session.QueueHead),
})
return nil, nil
}
}

// nextTrackInStop はsessionのstateがSTOPの時のnextTrackの処理を行います
// stopToPlayで曲がResetされ、再度Spotifyのキューに積まれるため、Enqueueを行っていません
func (s *SessionStateUseCase) nextTrackInStop(ctx context.Context, session *entity.Session) error {
if !session.IsNextTrackExistWhenStateIsStop() {
return fmt.Errorf("nextTrackInStop: %w", entity.ErrNextQueueTrackNotFound)
func (s *SessionStateUseCase) nextTrackInStop(ctx context.Context, sessionID string) error {
_, err := s.sessionRepo.DoInTx(ctx, s.nextTrackInStopTx(sessionID))
if err != nil {
return fmt.Errorf("nextTrackInStop transaction: %w", err)
}

if err := session.GoNextTrack(); err != nil && errors.Is(err, entity.ErrSessionAllTracksFinished) {
s.timerUC.handleAllTrackFinish(session)
if err := s.sessionRepo.Update(ctx, session); err != nil {
return fmt.Errorf("update session id=%s: %w", session.ID, err)
return nil
}

func (s *SessionStateUseCase) nextTrackInStopTx(sessionID string) func(ctx context.Context) (interface{}, error) {
return func(ctx context.Context) (interface{}, error) {
session, err := s.sessionRepo.FindByIDForUpdate(ctx, sessionID)
if err != nil {
return nil, fmt.Errorf("find session :%w", err)
}
if !session.IsNextTrackExistWhenStateIsStop() {
return nil, fmt.Errorf("nextTrackInStop: %w", entity.ErrNextQueueTrackNotFound)
}
return nil
}

if err := s.sessionRepo.Update(ctx, session); err != nil {
return fmt.Errorf("update session id=%s: %w", session.ID, err)
}
if err := session.GoNextTrack(); err != nil && errors.Is(err, entity.ErrSessionAllTracksFinished) {
s.timerUC.handleAllTrackFinish(session)
if err := s.sessionRepo.Update(ctx, session); err != nil {
return nil, fmt.Errorf("update session id=%s: %w", session.ID, err)
}
return nil, nil
}

s.pusher.Push(&event.PushMessage{
SessionID: session.ID,
Msg: entity.NewEventNextTrack(session.QueueHead),
})
if err := s.sessionRepo.Update(ctx, session); err != nil {
return nil, fmt.Errorf("update session id=%s: %w", session.ID, err)
}

return nil
s.pusher.Push(&event.PushMessage{
SessionID: sessionID,
Msg: entity.NewEventNextTrack(session.QueueHead),
})

return nil, nil
}
}

// ChangeSessionState は与えられたセッションのstateを操作します。
Expand Down
Loading

0 comments on commit dea9cf9

Please sign in to comment.