diff --git a/src/backend/sources/MemorySource.ts b/src/backend/sources/MemorySource.ts index 07925bc..2eafba7 100644 --- a/src/backend/sources/MemorySource.ts +++ b/src/backend/sources/MemorySource.ts @@ -32,6 +32,8 @@ import AbstractSource from "./AbstractSource.js"; import { AbstractPlayerState, createPlayerOptions, PlayerStateOptions } from "./PlayerState/AbstractPlayerState.js"; import { GenericPlayerState } from "./PlayerState/GenericPlayerState.js"; +const EXPECTED_NON_DISCOVERED_REASON = 'not added because an identical play with the same timestamp was already discovered.'; + export default class MemorySource extends AbstractSource { playerSourceOfTruth: SOURCE_SOT_TYPES = SOURCE_SOT.PLAYER; @@ -46,38 +48,81 @@ export default class MemorySource extends AbstractSource { players: Map = new Map(); playerState: Map = new Map(); + playerCleanupDiscoveryAttempt: Map = new Map(); scheduler: ToadScheduler = new ToadScheduler(); constructor(type: SourceType, name: string, config: SourceConfig, internal: InternalConfig, emitter: EventEmitter) { super(type, name, config, internal, emitter); + + // player cleanup on *schedule* is needed when the Source is non-polling (ingress) + // because if the source stops sending updates then processRecentPlays() was never called so we never remove old players this.scheduler.addSimpleIntervalJob(new SimpleIntervalJob({seconds: 15}, new Task('Player Cleanup', () => { - this.cleanupPlayers(); + if(!this.canPoll) { + this.cleanupPlayers(); + } }))); } cleanupPlayers = () => { - const deadPlatformIds: [string, string?][] = []; - for (const [key, player] of this.players.entries()) { - // no communication from the source was received for this player - const isStale = player.checkStale(); - if (isStale && player.checkOrphaned() && player.isDead()) { - deadPlatformIds.push([player.platformIdStr, `Removed after being orphaned for ${dayjs.duration(player.stateIntervalOptions.orphanedInterval, 'seconds').asMinutes()} minutes`]); - } else if (isStale) { - const state = player.getApiState(); - const stateHash = objectHash.sha1(state); - if(stateHash !== this.playerState.get(key)) { - this.playerState.set(key, stateHash); - this.emitEvent('playerUpdate', state); + for (const key of this.players.keys()) { + this.cleanupPlayer(key); + } + } + + cleanupPlayer = (key: string): PlayObject | undefined => { + const player = this.players.get(key); + if(player === undefined) { + this.logger.warn({labels: 'Player Cleanup'},`No Player with ID ${key} exists! Cannot cleanup.`); + return; + } + let discoveredCleanupPlay: PlayObject | undefined; + let label = 'Player Cleanup', + deletePlayer = false; + // no communication from the source was received for this player + const isStale = player.checkStale(); + if (isStale && player.checkOrphaned() && player.isDead()) { + deletePlayer = true; + label = 'Dead Player Cleanup'; + } else if (isStale) { + label = 'Stale Player Cleanup'; + const state = player.getApiState(); + const stateHash = objectHash.sha1(state); + if(stateHash !== this.playerState.get(key)) { + this.playerState.set(key, stateHash); + this.emitEvent('playerUpdate', state); + } + if(this.config.options?.logPlayerState === true || isDebugMode()) { + player.logSummary(); + } + } else { + // player is not stale + this.playerCleanupDiscoveryAttempt.delete(key); + return; + } + + // player was stale or orphaned/dead + // if we haven't already tried to discover any in-progress plays then do it now (and only once) + if(!this.playerCleanupDiscoveryAttempt.has(key)) { + this.playerCleanupDiscoveryAttempt.set(key, true); + const cleanupPlay = player.getPlayedObject(); + let discoverablePlay: boolean; + if(cleanupPlay !== undefined) { + const [discoverable, discoverableReason] = this.isListenedPlayDiscoverable(cleanupPlay); + discoverablePlay = discoverable; + if(this.playerSourceOfTruth === SOURCE_SOT.PLAYER) { + player.logger.verbose({labels: label}, discoverableReason); } - if(this.config.options?.logPlayerState === true || isDebugMode()) { - player.logSummary(); + if(discoverable) { + discoveredCleanupPlay = cleanupPlay; } } } - for (const [deadId, reason] of deadPlatformIds) { - this.deletePlayer(deadId, reason); + if(deletePlayer) { + this.deletePlayer(player.platformIdStr, `Removed after being orphaned for ${dayjs.duration(player.stateIntervalOptions.orphanedInterval, 'seconds').asMinutes()} minutes`); } + + return discoveredCleanupPlay; } playersToObject = (): Record => { @@ -170,6 +215,9 @@ export default class MemorySource extends AbstractSource { if (relevantDatas.length > 0) { this.lastActivityAt = dayjs(); + // reset any player cleanup state since we got fresh data + this.playerCleanupDiscoveryAttempt.delete(key); + incomingData = this.pickPlatformSession(relevantDatas, player); let playerState: PlayerStateDataMaybePlay; @@ -189,41 +237,14 @@ export default class MemorySource extends AbstractSource { // wait to discover play until it is stale or current play has changed // so that our discovered track has an accurate "listenedFor" count if (candidate !== undefined && (playChanged || player.isUpdateStale())) { - const stPrefix = `${buildTrackString(candidate, {include: ['trackId', 'artist', 'track']})}`; - const thresholdResults = timePassesScrobbleThreshold(scrobbleThresholds, candidate.data.listenedFor, candidate.data.duration); - - if (thresholdResults.passes) { - const matchingRecent = this.existingDiscovered(candidate); //sRecentlyPlayed.find(x => playObjDataMatch(x, candidate)); - if (matchingRecent === undefined) { - if(this.playerSourceOfTruth === SOURCE_SOT.PLAYER) { - player.logger.verbose(`${stPrefix} added after ${thresholdResultSummary(thresholdResults)} and not matching any prior plays`); - } - newStatefulPlays.push(candidate); - } else { - const {data: {playDate, duration}} = candidate; - const {data: {playDate: rplayDate}} = matchingRecent; - if (!playDate.isSame(rplayDate)) { - if (duration !== undefined) { - if (playDate.isAfter(rplayDate.add(duration, 's'))) { - if(this.playerSourceOfTruth === SOURCE_SOT.PLAYER) { - player.logger.verbose(`${stPrefix} added after ${thresholdResultSummary(thresholdResults)} and having a different timestamp than a prior play`); - } - newStatefulPlays.push(candidate); - } - } else { - const discoveredPlays = this.getRecentlyDiscoveredPlaysByPlatform(genGroupId(candidate)); - if (discoveredPlays.length === 0 || !playObjDataMatch(discoveredPlays[0], candidate)) { - // if most recent stateful play is not this track we'll add it - if(this.playerSourceOfTruth === SOURCE_SOT.PLAYER) { - player.logger.verbose(`${stPrefix} added after ${thresholdResultSummary(thresholdResults)}. Matched other recent play but could not determine time frame due to missing duration. Allowed due to not being last played track.`); - } - newStatefulPlays.push(candidate); - } - } - } + const [discoverable, discoverableReason] = this.isListenedPlayDiscoverable(candidate); + if(discoverable) { + if(this.playerSourceOfTruth === SOURCE_SOT.PLAYER) { + player.logger.verbose(discoverableReason); } + newStatefulPlays.push(candidate) } else if(playChanged && this.playerSourceOfTruth === SOURCE_SOT.PLAYER) { - player.logger.verbose(`${stPrefix} not added because ${thresholdResultSummary(thresholdResults)}.`); + player.logger.verbose(discoverableReason); } } @@ -233,12 +254,57 @@ export default class MemorySource extends AbstractSource { const apiState = player.getApiState(); this.playerState.set(key, objectHash.sha1(apiState)) this.emitEvent('playerUpdate', apiState); + } else { + const playFromCleanup = this.cleanupPlayer(key); + if(playFromCleanup !== undefined) { + newStatefulPlays.push(playFromCleanup); + } } } return newStatefulPlays; } + protected isListenedPlayDiscoverable = (candidate: PlayObject): [boolean, string] => { + + const { + options: { + scrobbleThresholds = {} + } + } = this.config; + + const stPrefix = `${buildTrackString(candidate, {include: ['trackId', 'artist', 'track']})}`; + const thresholdResults = timePassesScrobbleThreshold(scrobbleThresholds, candidate.data.listenedFor, candidate.data.duration); + + if (thresholdResults.passes) { + const matchingRecent = this.existingDiscovered(candidate); //sRecentlyPlayed.find(x => playObjDataMatch(x, candidate)); + if (matchingRecent === undefined) { + return [true,`${stPrefix} added after ${thresholdResultSummary(thresholdResults)} and not matching any prior plays`]; + } else { + const {data: {playDate, duration}} = candidate; + const {data: {playDate: rplayDate}} = matchingRecent; + if (!playDate.isSame(rplayDate)) { + if (duration !== undefined) { + if (playDate.isAfter(rplayDate.add(duration, 's'))) { + return [true,`${stPrefix} added after ${thresholdResultSummary(thresholdResults)} and having a different timestamp than a prior play`]; + } + return [false, `${stPrefix} ${EXPECTED_NON_DISCOVERED_REASON}`] + } else { + const discoveredPlays = this.getRecentlyDiscoveredPlaysByPlatform(genGroupId(candidate)); + if (discoveredPlays.length === 0 || !playObjDataMatch(discoveredPlays[0], candidate)) { + // if most recent stateful play is not this track we'll add it + return [true,`${stPrefix} added after ${thresholdResultSummary(thresholdResults)}. Matched other recent play but could not determine time frame due to missing duration. Allowed due to not being last played track.`]; + } + return [false, `${stPrefix} not added because it matched the last discovered play and could not determine time frame of play`]; + } + } else { + return [false, `${stPrefix} ${EXPECTED_NON_DISCOVERED_REASON}`] + } + } + } + return [false,`${stPrefix} not added because ${thresholdResultSummary(thresholdResults)}.`]; + } + recentlyPlayedTrackIsValid = (playObj: any) => playObj.data.playDate.isBefore(dayjs().subtract(30, 's')) protected getInterval(): number { diff --git a/src/backend/sources/PlayerState/PlexPlayerState.ts b/src/backend/sources/PlayerState/PlexPlayerState.ts index 655768a..6131869 100644 --- a/src/backend/sources/PlayerState/PlexPlayerState.ts +++ b/src/backend/sources/PlayerState/PlexPlayerState.ts @@ -7,7 +7,7 @@ import { PositionalPlayerState } from "./PositionalPlayerState.js"; export class PlexPlayerState extends PositionalPlayerState { constructor(logger: Logger, platformId: PlayPlatformId, opts?: PlayerStateOptions) { super(logger, platformId, {allowedDrift: 17000, rtTruth: true, ...(opts || {})}); - + this.gracefulEndBuffer = this.allowedDrift / 1000; } protected isSessionStillPlaying(position: number): boolean { diff --git a/src/backend/sources/PlayerState/PositionalPlayerState.ts b/src/backend/sources/PlayerState/PositionalPlayerState.ts index e71fdcb..a716dd5 100644 --- a/src/backend/sources/PlayerState/PositionalPlayerState.ts +++ b/src/backend/sources/PlayerState/PositionalPlayerState.ts @@ -12,6 +12,7 @@ export class PositionalPlayerState extends AbstractPlayerState { protected allowedDrift: number; protected rtTruth: boolean; + protected gracefulEndBuffer: number = 3; declare currentListenRange?: ListenRangePositional; declare listenRanges: ListenRangePositional[]; @@ -83,13 +84,13 @@ export class PositionalPlayerState extends AbstractPlayerState { if (this.currentListenRange !== undefined && this.currentListenRange.getDuration() !== 0) { this.logger.debug('Ended current Player listen range.') let finalPosition: number; - if(this.calculatedStatus === CALCULATED_PLAYER_STATUSES.playing && !this.currentListenRange.isInitial()) { + if([CALCULATED_PLAYER_STATUSES.playing, CALCULATED_PLAYER_STATUSES.stale].includes(this.calculatedStatus) && !this.currentListenRange.isInitial()) { const { data: { duration, } = {} } = this.currentPlay; - if(duration !== undefined && (duration - this.currentListenRange.end.position) < 3) { + if(duration !== undefined && (duration - this.currentListenRange.end.position) < this.gracefulEndBuffer) { // likely the track was listened to until it ended // but polling interval or network delays caused MS to not get data on the very end // also...within 3 seconds of ending is close enough to call this complete IMO diff --git a/src/backend/sources/PlexApiSource.ts b/src/backend/sources/PlexApiSource.ts index d176883..923efdf 100644 --- a/src/backend/sources/PlexApiSource.ts +++ b/src/backend/sources/PlexApiSource.ts @@ -408,6 +408,10 @@ export default class PlexApiSource extends MemoryPositionalSource { sessionToPlayerState = (obj: GetSessionsMetadata): PlayerStateDataMaybePlay => { const { + // represents milliseconds position of player + // * only available when player is PLAYING (not paused) + // * when user seeks or pauses/stops -> plays the initial position is accurate + // * when playing, is only updated every 15 seconds from the position of play viewOffset, player: { machineIdentifier,