Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(LSG): expose parts of the current segment #1388

Open
wants to merge 7 commits into
base: release52
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion packages/live-status-gateway/api/schemas/activePlaylist.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,32 @@ $defs:
- $ref: '#/$defs/piece/examples/0'
publicData:
partType: 'intro'
currentSegmentPart:
type: object
properties:
id:
description: Unique id of the part
type: string
name:
description: User-presentable name of the part
type: string
autoNext:
description: If this part will progress to the next automatically
type: boolean
default: false
timing:
type: object
properties:
expectedDurationMs:
description: Expected duration of the part
type: number
required: [id, name, timing]
additionalProperties: false
examples:
- id: 'H5CBGYjThrMSmaYvRaa5FVKJIzk_'
name: 'Intro'
timing:
expectedDurationMs: 15000
part:
oneOf:
- $ref: '#/$defs/partBase'
Expand Down Expand Up @@ -196,7 +222,11 @@ $defs:
- part_expected_duration
- segment_budget_duration
required: [expectedDurationMs, projectedEndTime]
required: [id, timing]
parts:
type: array
items:
$ref: '#/$defs/currentSegmentPart'
required: [id, timing, parts]
additionalProperties: false
examples:
- id: 'H5CBGYjThrMSmaYvRaa5FVKJIzk_'
Expand All @@ -205,6 +235,8 @@ $defs:
budgetDurationMs: 20000
projectedEndTime: 1600000075000
countdownType: segment_budget_duration
parts:
- $ref: '#/$defs/currentSegmentPart/examples/0'
piece:
type: object
properties:
Expand Down
128 changes: 128 additions & 0 deletions packages/live-status-gateway/src/collectionBase.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import { CorelibPubSubCollections, CorelibPubSubTypes } from '@sofie-automation/corelib/dist/pubsub'
import {
StudioId,
CoreConnection,
ProtectedString,
Collection as CoreCollection,
CollectionDocCheck,
} from '@sofie-automation/server-core-integration'
import throttleToNextTick from '@sofie-automation/shared-lib/dist/lib/throttleToNextTick'
import * as _ from 'underscore'
import { Logger } from 'winston'
import { CoreHandler } from './coreHandler'
import { arePropertiesShallowEqual } from './helpers/equality'
import { CollectionHandlers } from './liveStatusServer'

export type ObserverCallback<T, K extends keyof T> = (data: Pick<T, K> | undefined) => void

export const DEFAULT_THROTTLE_PERIOD_MS = 20

export abstract class CollectionBase<T, TCollection extends keyof CorelibPubSubCollections> {
protected _name: string
protected _collectionName: TCollection
protected _logger: Logger
protected _coreHandler: CoreHandler
protected _studioId!: StudioId
protected _observers: Map<
ObserverCallback<T, keyof T>,
{ keysToPick: readonly (keyof T)[] | undefined; lastData: T | undefined }
> = new Map()
protected _collectionData: T | undefined

protected get _core(): CoreConnection<CorelibPubSubTypes, CorelibPubSubCollections> {
return this._coreHandler.core
}
protected throttledChanged: () => void

constructor(
collection: TCollection,
logger: Logger,
coreHandler: CoreHandler,
throttlePeriodMs = DEFAULT_THROTTLE_PERIOD_MS
) {
this._name = this.constructor.name
this._collectionName = collection
this._logger = logger
this._coreHandler = coreHandler

this.throttledChanged = throttleToNextTick(
throttlePeriodMs > 0
? _.throttle(() => this.changed(), throttlePeriodMs, { leading: true, trailing: true })
: () => this.changed()
)

this._logger.info(`Starting ${this._name} handler`)
}

init(_handlers: CollectionHandlers): void {
if (!this._coreHandler.studioId) throw new Error('StudioId is not defined')
this._studioId = this._coreHandler.studioId
}

close(): void {
this._logger.info(`Closing ${this._name} handler`)
}

subscribe<K extends keyof T>(callback: ObserverCallback<T, K>, keysToPick?: readonly K[]): void {
//this._logger.info(`${name}' added observer for '${this._name}'`)
if (this._collectionData) callback(this._collectionData)
this._observers.set(callback, { keysToPick, lastData: this.shallowClone(this._collectionData) })
}

/**
* Called after a batch of updates to documents in the collection
*/
protected changed(): void {
// override me
}

notify(data: T | undefined): void {
for (const [observer, o] of this._observers) {
if (
!o.lastData ||
!o.keysToPick ||
!data ||
!arePropertiesShallowEqual(o.lastData, data, undefined, o.keysToPick)
) {
observer(data)
o.lastData = this.shallowClone(data)
}
}
}

protected shallowClone(data: T | undefined): T | undefined {
if (data === undefined) return undefined
if (Array.isArray(data)) return [...data] as T
if (typeof data === 'object') return { ...data }
return data
}

protected logDocumentChange(documentId: string | ProtectedString<any>, changeType: string): void {
this._logger.silly(`${this._name} ${changeType} ${documentId}`)
}

protected logUpdateReceived(collectionName: string, updateCount: number | undefined): void
protected logUpdateReceived(collectionName: string, extraInfo?: string): void
protected logUpdateReceived(
collectionName: string,
extraInfoOrUpdateCount: string | number | undefined | null = null
): void {
let message = `${this._name} received ${collectionName} update`
if (typeof extraInfoOrUpdateCount === 'string') {
message += `, ${extraInfoOrUpdateCount}`
} else if (extraInfoOrUpdateCount !== null) {
message += `(${extraInfoOrUpdateCount})`
}
this._logger.debug(message)
}

protected logNotifyingUpdate(updateCount: number | undefined): void {
this._logger.debug(`${this._name} notifying update with ${updateCount} ${this._collectionName}`)
}

protected getCollectionOrFail(): CoreCollection<CollectionDocCheck<CorelibPubSubCollections[TCollection]>> {
const collection = this._core.getCollection<TCollection>(this._collectionName)
if (!collection) throw new Error(`collection '${this._collectionName}' not found!`)
return collection
}
}
Original file line number Diff line number Diff line change
@@ -1,79 +1,11 @@
import { Logger } from 'winston'
import { CoreHandler } from '../coreHandler'
import { CollectionBase, Collection, CollectionObserver } from '../wsHandler'
import { AdLibAction } from '@sofie-automation/corelib/dist/dataModel/AdlibAction'
import { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance'
import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collections'
import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub'
import { AdLibActionId, RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids'
import { SelectedPartInstances } from './partInstancesHandler'

export class AdLibActionsHandler
extends CollectionBase<AdLibAction[], CorelibPubSub.adLibActions, CollectionName.AdLibActions>
implements Collection<AdLibAction[]>, CollectionObserver<SelectedPartInstances>
{
public observerName: string
private _curRundownId: RundownId | undefined
private _curPartInstance: DBPartInstance | undefined
import { RundownContentHandlerBase } from './rundownContentHandlerBase'

export class AdLibActionsHandler extends RundownContentHandlerBase<CorelibPubSub.adLibActions> {
constructor(logger: Logger, coreHandler: CoreHandler) {
super(AdLibActionsHandler.name, CollectionName.AdLibActions, CorelibPubSub.adLibActions, logger, coreHandler)
this.observerName = this._name
}

async changed(id: AdLibActionId, changeType: string): Promise<void> {
this.logDocumentChange(id, changeType)
if (!this._collectionName) return
const col = this._core.getCollection(this._collectionName)
if (!col) throw new Error(`collection '${this._collectionName}' not found!`)
this._collectionData = col.find({ rundownId: this._curRundownId })
await this.notify(this._collectionData)
}

async update(source: string, data: SelectedPartInstances | undefined): Promise<void> {
this.logUpdateReceived('partInstances', source)
const prevRundownId = this._curRundownId
this._curPartInstance = data ? data.current ?? data.next : undefined
this._curRundownId = this._curPartInstance ? this._curPartInstance.rundownId : undefined

await new Promise(process.nextTick.bind(this))
if (!this._collectionName) return
if (!this._publicationName) return
if (prevRundownId !== this._curRundownId) {
if (this._subscriptionId) this._coreHandler.unsubscribe(this._subscriptionId)
if (this._dbObserver) this._dbObserver.stop()
if (this._curRundownId && this._curPartInstance) {
this._subscriptionId = await this._coreHandler.setupSubscription(this._publicationName, [
this._curRundownId,
])
this._dbObserver = this._coreHandler.setupObserver(this._collectionName)
this._dbObserver.added = (id) => {
void this.changed(id, 'added').catch(this._logger.error)
}
this._dbObserver.changed = (id) => {
void this.changed(id, 'changed').catch(this._logger.error)
}
this._dbObserver.removed = (id) => {
void this.changed(id, 'removed').catch(this._logger.error)
}

const collection = this._core.getCollection(this._collectionName)
if (!collection) throw new Error(`collection '${this._collectionName}' not found!`)
this._collectionData = collection.find({
rundownId: this._curRundownId,
})
await this.notify(this._collectionData)
}
}
}

// override notify to implement empty array handling
async notify(data: AdLibAction[] | undefined): Promise<void> {
this.logNotifyingUpdate(data?.length)
if (data !== undefined) {
for (const observer of this._observers) {
await observer.update(this._name, data)
}
}
super(CollectionName.AdLibActions, CorelibPubSub.adLibActions, logger, coreHandler)
}
}
75 changes: 3 additions & 72 deletions packages/live-status-gateway/src/collections/adLibsHandler.ts
Original file line number Diff line number Diff line change
@@ -1,80 +1,11 @@
import { Logger } from 'winston'
import { CoreHandler } from '../coreHandler'
import { CollectionBase, Collection, CollectionObserver } from '../wsHandler'
import { AdLibPiece } from '@sofie-automation/corelib/dist/dataModel/AdLibPiece'
import { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance'
import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collections'
import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub'
import { PieceId, RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids'
import { SelectedPartInstances } from './partInstancesHandler'

export class AdLibsHandler
extends CollectionBase<AdLibPiece[], CorelibPubSub.adLibPieces, CollectionName.AdLibPieces>
implements Collection<AdLibPiece[]>, CollectionObserver<SelectedPartInstances>
{
public observerName: string
// private _core: CoreConnection
private _currentRundownId: RundownId | undefined
private _currentPartInstance: DBPartInstance | undefined
import { RundownContentHandlerBase } from './rundownContentHandlerBase'

export class AdLibsHandler extends RundownContentHandlerBase<CorelibPubSub.adLibPieces> {
constructor(logger: Logger, coreHandler: CoreHandler) {
super(AdLibsHandler.name, CollectionName.AdLibPieces, CorelibPubSub.adLibPieces, logger, coreHandler)
this.observerName = this._name
}

async changed(id: PieceId, changeType: string): Promise<void> {
this.logDocumentChange(id, changeType)
if (!this._collectionName) return
const col = this._core.getCollection(this._collectionName)
if (!col) throw new Error(`collection '${this._collectionName}' not found!`)
this._collectionData = col.find({ rundownId: this._currentRundownId })
await this.notify(this._collectionData)
}

async update(source: string, data: SelectedPartInstances | undefined): Promise<void> {
this.logUpdateReceived('partInstances', source)
const prevRundownId = this._currentRundownId
this._currentPartInstance = data ? data.current ?? data.next : undefined
this._currentRundownId = this._currentPartInstance?.rundownId

await new Promise(process.nextTick.bind(this))
if (!this._collectionName) return
if (!this._publicationName) return
if (prevRundownId !== this._currentRundownId) {
if (this._subscriptionId) this._coreHandler.unsubscribe(this._subscriptionId)
if (this._dbObserver) this._dbObserver.stop()
if (this._currentRundownId && this._currentPartInstance) {
this._subscriptionId = await this._coreHandler.setupSubscription(this._publicationName, [
this._currentRundownId,
])
this._dbObserver = this._coreHandler.setupObserver(this._collectionName)
this._dbObserver.added = (id) => {
void this.changed(id, 'added').catch(this._logger.error)
}
this._dbObserver.changed = (id) => {
void this.changed(id, 'changed').catch(this._logger.error)
}
this._dbObserver.removed = (id) => {
void this.changed(id, 'removed').catch(this._logger.error)
}

const collection = this._core.getCollection(this._collectionName)
if (!collection) throw new Error(`collection '${this._collectionName}' not found!`)
this._collectionData = collection.find({
rundownId: this._currentRundownId,
})
await this.notify(this._collectionData)
}
}
}

// override notify to implement empty array handling
async notify(data: AdLibPiece[] | undefined): Promise<void> {
this.logNotifyingUpdate(data?.length)
if (data !== undefined) {
for (const observer of this._observers) {
await observer.update(this._name, data)
}
}
super(CollectionName.AdLibPieces, CorelibPubSub.adLibPieces, logger, coreHandler)
}
}
Loading
Loading