Skip to content

Commit

Permalink
Merge pull request #249 from FoxxMD/GH-223/plexPlatformCleanup
Browse files Browse the repository at this point in the history
fix(plex): Improve handling of orphaned API sessions
  • Loading branch information
FoxxMD authored Jan 14, 2025
2 parents 34289f0 + 30ee000 commit a879b7a
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 53 deletions.
166 changes: 116 additions & 50 deletions src/backend/sources/MemorySource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import AbstractSource from "./AbstractSource.js";
import { AbstractPlayerState, createPlayerOptions, PlayerStateOptions } from "./PlayerState/AbstractPlayerState.js";
import { GenericPlayerState } from "./PlayerState/GenericPlayerState.js";

const EXPECTED_NON_DISCOVERED_REASON = 'not added because an identical play with the same timestamp was already discovered.';

export default class MemorySource extends AbstractSource {

playerSourceOfTruth: SOURCE_SOT_TYPES = SOURCE_SOT.PLAYER;
Expand All @@ -46,38 +48,81 @@ export default class MemorySource extends AbstractSource {

players: Map<string, AbstractPlayerState> = new Map();
playerState: Map<string, string> = new Map();
playerCleanupDiscoveryAttempt: Map<string, boolean> = new Map();

scheduler: ToadScheduler = new ToadScheduler();

constructor(type: SourceType, name: string, config: SourceConfig, internal: InternalConfig, emitter: EventEmitter) {
super(type, name, config, internal, emitter);

// player cleanup on *schedule* is needed when the Source is non-polling (ingress)
// because if the source stops sending updates then processRecentPlays() was never called so we never remove old players
this.scheduler.addSimpleIntervalJob(new SimpleIntervalJob({seconds: 15}, new Task('Player Cleanup', () => {
this.cleanupPlayers();
if(!this.canPoll) {
this.cleanupPlayers();
}
})));
}

cleanupPlayers = () => {
const deadPlatformIds: [string, string?][] = [];
for (const [key, player] of this.players.entries()) {
// no communication from the source was received for this player
const isStale = player.checkStale();
if (isStale && player.checkOrphaned() && player.isDead()) {
deadPlatformIds.push([player.platformIdStr, `Removed after being orphaned for ${dayjs.duration(player.stateIntervalOptions.orphanedInterval, 'seconds').asMinutes()} minutes`]);
} else if (isStale) {
const state = player.getApiState();
const stateHash = objectHash.sha1(state);
if(stateHash !== this.playerState.get(key)) {
this.playerState.set(key, stateHash);
this.emitEvent('playerUpdate', state);
for (const key of this.players.keys()) {
this.cleanupPlayer(key);
}
}

cleanupPlayer = (key: string): PlayObject | undefined => {
const player = this.players.get(key);
if(player === undefined) {
this.logger.warn({labels: 'Player Cleanup'},`No Player with ID ${key} exists! Cannot cleanup.`);
return;
}
let discoveredCleanupPlay: PlayObject | undefined;
let label = 'Player Cleanup',
deletePlayer = false;
// no communication from the source was received for this player
const isStale = player.checkStale();
if (isStale && player.checkOrphaned() && player.isDead()) {
deletePlayer = true;
label = 'Dead Player Cleanup';
} else if (isStale) {
label = 'Stale Player Cleanup';
const state = player.getApiState();
const stateHash = objectHash.sha1(state);
if(stateHash !== this.playerState.get(key)) {
this.playerState.set(key, stateHash);
this.emitEvent('playerUpdate', state);
}
if(this.config.options?.logPlayerState === true || isDebugMode()) {
player.logSummary();
}
} else {
// player is not stale
this.playerCleanupDiscoveryAttempt.delete(key);
return;
}

// player was stale or orphaned/dead
// if we haven't already tried to discover any in-progress plays then do it now (and only once)
if(!this.playerCleanupDiscoveryAttempt.has(key)) {
this.playerCleanupDiscoveryAttempt.set(key, true);
const cleanupPlay = player.getPlayedObject();
let discoverablePlay: boolean;
if(cleanupPlay !== undefined) {
const [discoverable, discoverableReason] = this.isListenedPlayDiscoverable(cleanupPlay);
discoverablePlay = discoverable;
if(this.playerSourceOfTruth === SOURCE_SOT.PLAYER) {
player.logger.verbose({labels: label}, discoverableReason);
}
if(this.config.options?.logPlayerState === true || isDebugMode()) {
player.logSummary();
if(discoverable) {
discoveredCleanupPlay = cleanupPlay;
}
}
}
for (const [deadId, reason] of deadPlatformIds) {
this.deletePlayer(deadId, reason);
if(deletePlayer) {
this.deletePlayer(player.platformIdStr, `Removed after being orphaned for ${dayjs.duration(player.stateIntervalOptions.orphanedInterval, 'seconds').asMinutes()} minutes`);
}

return discoveredCleanupPlay;
}

playersToObject = (): Record<string, SourcePlayerObj> => {
Expand Down Expand Up @@ -170,6 +215,9 @@ export default class MemorySource extends AbstractSource {
if (relevantDatas.length > 0) {
this.lastActivityAt = dayjs();

// reset any player cleanup state since we got fresh data
this.playerCleanupDiscoveryAttempt.delete(key);

incomingData = this.pickPlatformSession(relevantDatas, player);

let playerState: PlayerStateDataMaybePlay;
Expand All @@ -189,41 +237,14 @@ export default class MemorySource extends AbstractSource {
// wait to discover play until it is stale or current play has changed
// so that our discovered track has an accurate "listenedFor" count
if (candidate !== undefined && (playChanged || player.isUpdateStale())) {
const stPrefix = `${buildTrackString(candidate, {include: ['trackId', 'artist', 'track']})}`;
const thresholdResults = timePassesScrobbleThreshold(scrobbleThresholds, candidate.data.listenedFor, candidate.data.duration);

if (thresholdResults.passes) {
const matchingRecent = this.existingDiscovered(candidate); //sRecentlyPlayed.find(x => playObjDataMatch(x, candidate));
if (matchingRecent === undefined) {
if(this.playerSourceOfTruth === SOURCE_SOT.PLAYER) {
player.logger.verbose(`${stPrefix} added after ${thresholdResultSummary(thresholdResults)} and not matching any prior plays`);
}
newStatefulPlays.push(candidate);
} else {
const {data: {playDate, duration}} = candidate;
const {data: {playDate: rplayDate}} = matchingRecent;
if (!playDate.isSame(rplayDate)) {
if (duration !== undefined) {
if (playDate.isAfter(rplayDate.add(duration, 's'))) {
if(this.playerSourceOfTruth === SOURCE_SOT.PLAYER) {
player.logger.verbose(`${stPrefix} added after ${thresholdResultSummary(thresholdResults)} and having a different timestamp than a prior play`);
}
newStatefulPlays.push(candidate);
}
} else {
const discoveredPlays = this.getRecentlyDiscoveredPlaysByPlatform(genGroupId(candidate));
if (discoveredPlays.length === 0 || !playObjDataMatch(discoveredPlays[0], candidate)) {
// if most recent stateful play is not this track we'll add it
if(this.playerSourceOfTruth === SOURCE_SOT.PLAYER) {
player.logger.verbose(`${stPrefix} added after ${thresholdResultSummary(thresholdResults)}. Matched other recent play but could not determine time frame due to missing duration. Allowed due to not being last played track.`);
}
newStatefulPlays.push(candidate);
}
}
}
const [discoverable, discoverableReason] = this.isListenedPlayDiscoverable(candidate);
if(discoverable) {
if(this.playerSourceOfTruth === SOURCE_SOT.PLAYER) {
player.logger.verbose(discoverableReason);
}
newStatefulPlays.push(candidate)
} else if(playChanged && this.playerSourceOfTruth === SOURCE_SOT.PLAYER) {
player.logger.verbose(`${stPrefix} not added because ${thresholdResultSummary(thresholdResults)}.`);
player.logger.verbose(discoverableReason);
}
}

Expand All @@ -233,12 +254,57 @@ export default class MemorySource extends AbstractSource {
const apiState = player.getApiState();
this.playerState.set(key, objectHash.sha1(apiState))
this.emitEvent('playerUpdate', apiState);
} else {
const playFromCleanup = this.cleanupPlayer(key);
if(playFromCleanup !== undefined) {
newStatefulPlays.push(playFromCleanup);
}
}
}

return newStatefulPlays;
}

protected isListenedPlayDiscoverable = (candidate: PlayObject): [boolean, string] => {

const {
options: {
scrobbleThresholds = {}
}
} = this.config;

const stPrefix = `${buildTrackString(candidate, {include: ['trackId', 'artist', 'track']})}`;
const thresholdResults = timePassesScrobbleThreshold(scrobbleThresholds, candidate.data.listenedFor, candidate.data.duration);

if (thresholdResults.passes) {
const matchingRecent = this.existingDiscovered(candidate); //sRecentlyPlayed.find(x => playObjDataMatch(x, candidate));
if (matchingRecent === undefined) {
return [true,`${stPrefix} added after ${thresholdResultSummary(thresholdResults)} and not matching any prior plays`];
} else {
const {data: {playDate, duration}} = candidate;
const {data: {playDate: rplayDate}} = matchingRecent;
if (!playDate.isSame(rplayDate)) {
if (duration !== undefined) {
if (playDate.isAfter(rplayDate.add(duration, 's'))) {
return [true,`${stPrefix} added after ${thresholdResultSummary(thresholdResults)} and having a different timestamp than a prior play`];
}
return [false, `${stPrefix} ${EXPECTED_NON_DISCOVERED_REASON}`]
} else {
const discoveredPlays = this.getRecentlyDiscoveredPlaysByPlatform(genGroupId(candidate));
if (discoveredPlays.length === 0 || !playObjDataMatch(discoveredPlays[0], candidate)) {
// if most recent stateful play is not this track we'll add it
return [true,`${stPrefix} added after ${thresholdResultSummary(thresholdResults)}. Matched other recent play but could not determine time frame due to missing duration. Allowed due to not being last played track.`];
}
return [false, `${stPrefix} not added because it matched the last discovered play and could not determine time frame of play`];
}
} else {
return [false, `${stPrefix} ${EXPECTED_NON_DISCOVERED_REASON}`]
}
}
}
return [false,`${stPrefix} not added because ${thresholdResultSummary(thresholdResults)}.`];
}

recentlyPlayedTrackIsValid = (playObj: any) => playObj.data.playDate.isBefore(dayjs().subtract(30, 's'))

protected getInterval(): number {
Expand Down
2 changes: 1 addition & 1 deletion src/backend/sources/PlayerState/PlexPlayerState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { PositionalPlayerState } from "./PositionalPlayerState.js";
export class PlexPlayerState extends PositionalPlayerState {
constructor(logger: Logger, platformId: PlayPlatformId, opts?: PlayerStateOptions) {
super(logger, platformId, {allowedDrift: 17000, rtTruth: true, ...(opts || {})});

this.gracefulEndBuffer = this.allowedDrift / 1000;
}

protected isSessionStillPlaying(position: number): boolean {
Expand Down
5 changes: 3 additions & 2 deletions src/backend/sources/PlayerState/PositionalPlayerState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export class PositionalPlayerState extends AbstractPlayerState {

protected allowedDrift: number;
protected rtTruth: boolean;
protected gracefulEndBuffer: number = 3;

declare currentListenRange?: ListenRangePositional;
declare listenRanges: ListenRangePositional[];
Expand Down Expand Up @@ -83,13 +84,13 @@ export class PositionalPlayerState extends AbstractPlayerState {
if (this.currentListenRange !== undefined && this.currentListenRange.getDuration() !== 0) {
this.logger.debug('Ended current Player listen range.')
let finalPosition: number;
if(this.calculatedStatus === CALCULATED_PLAYER_STATUSES.playing && !this.currentListenRange.isInitial()) {
if([CALCULATED_PLAYER_STATUSES.playing, CALCULATED_PLAYER_STATUSES.stale].includes(this.calculatedStatus) && !this.currentListenRange.isInitial()) {
const {
data: {
duration,
} = {}
} = this.currentPlay;
if(duration !== undefined && (duration - this.currentListenRange.end.position) < 3) {
if(duration !== undefined && (duration - this.currentListenRange.end.position) < this.gracefulEndBuffer) {
// likely the track was listened to until it ended
// but polling interval or network delays caused MS to not get data on the very end
// also...within 3 seconds of ending is close enough to call this complete IMO
Expand Down
4 changes: 4 additions & 0 deletions src/backend/sources/PlexApiSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ export default class PlexApiSource extends MemoryPositionalSource {
sessionToPlayerState = (obj: GetSessionsMetadata): PlayerStateDataMaybePlay => {

const {
// represents milliseconds position of player
// * only available when player is PLAYING (not paused)
// * when user seeks or pauses/stops -> plays the initial position is accurate
// * when playing, is only updated every 15 seconds from the position of play
viewOffset,
player: {
machineIdentifier,
Expand Down

0 comments on commit a879b7a

Please sign in to comment.