diff --git a/.eslintrc.common.cjs b/.eslintrc.common.cjs index bbfa562d..275db296 100644 --- a/.eslintrc.common.cjs +++ b/.eslintrc.common.cjs @@ -26,5 +26,6 @@ module.exports = { tsx: "never", }, ], + "@typescript-eslint/no-unused-vars": ["warn", { argsIgnorePattern: "^_" }], }, }; diff --git a/packages/p2p-media-loader-core/src/core.ts b/packages/p2p-media-loader-core/src/core.ts index 5b683354..54aad8a8 100644 --- a/packages/p2p-media-loader-core/src/core.ts +++ b/packages/p2p-media-loader-core/src/core.ts @@ -15,7 +15,7 @@ import { import { BandwidthCalculators, StreamDetails } from "./internal-types.js"; import * as StreamUtils from "./utils/stream.js"; import { BandwidthCalculator } from "./bandwidth-calculator.js"; -import { SegmentsMemoryStorage } from "./segments-storage.js"; +import { SegmentMemoryStorage } from "./segment-storage/segment-memory-storage.js"; import { EventTarget } from "./utils/event-target.js"; import { overrideConfig, @@ -24,13 +24,14 @@ import { filterUndefinedProps, } from "./utils/utils.js"; import { TRACKER_CLIENT_VERSION_PREFIX } from "./utils/peer.js"; +import { SegmentStorage } from "./segment-storage/index.js"; /** Core class for managing media streams loading via P2P. */ export class Core { /** Default configuration for common core settings. */ static readonly DEFAULT_COMMON_CORE_CONFIG: CommonCoreConfig = { - cachedSegmentExpiration: undefined, - cachedSegmentsCount: 0, + segmentMemoryStorageLimit: undefined, + customSegmentStorageFactory: undefined, }; /** Default configuration for stream settings. */ @@ -74,7 +75,7 @@ export class Core { all: new BandwidthCalculator(), http: new BandwidthCalculator(), }; - private segmentStorage?: SegmentsMemoryStorage; + private segmentStorage?: SegmentStorage; private mainStreamLoader?: HybridLoader; private secondaryStreamLoader?: HybridLoader; private streamDetails: StreamDetails = { @@ -280,10 +281,7 @@ export class Core { throw new Error("Manifest response url is not defined"); } - if (!this.segmentStorage) { - this.segmentStorage = new SegmentsMemoryStorage(this.commonCoreConfig); - await this.segmentStorage.initialize(); - } + await this.initializeSegmentStorage(); const segment = this.identifySegment(segmentRuntimeId); @@ -327,7 +325,7 @@ export class Core { } /** - * Updates the 'isLive' status of the stream. + * Updates the 'isLive' status of the stream * * @param isLive - Boolean indicating whether the stream is live. */ @@ -372,7 +370,7 @@ export class Core { this.streams.clear(); this.mainStreamLoader?.destroy(); this.secondaryStreamLoader?.destroy(); - void this.segmentStorage?.destroy(); + this.segmentStorage?.destroy(); this.mainStreamLoader = undefined; this.secondaryStreamLoader = undefined; this.segmentStorage = undefined; @@ -380,6 +378,30 @@ export class Core { this.streamDetails = { isLive: false, activeLevelBitrate: 0 }; } + private async initializeSegmentStorage() { + if (this.segmentStorage) return; + + const isLive = this.streamDetails.isLive; + const createCustomStorage = + this.commonCoreConfig.customSegmentStorageFactory; + + if (createCustomStorage && typeof createCustomStorage !== "function") { + throw new Error("Storage configuration is invalid"); + } + + const segmentStorage = createCustomStorage + ? createCustomStorage(isLive) + : new SegmentMemoryStorage(); + + await segmentStorage.initialize( + this.commonCoreConfig, + this.mainStreamConfig, + this.secondaryStreamConfig, + ); + + this.segmentStorage = segmentStorage; + } + private identifySegment(segmentRuntimeId: string): SegmentWithStream { if (!this.manifestResponseUrl) { throw new Error("Manifest response url is undefined"); @@ -439,7 +461,7 @@ export class Core { throw new Error("Manifest response url is not defined"); } - if (!this.segmentStorage?.isInitialized) { + if (!this.segmentStorage) { throw new Error("Segment storage is not initialized"); } diff --git a/packages/p2p-media-loader-core/src/hybrid-loader.ts b/packages/p2p-media-loader-core/src/hybrid-loader.ts index 2eff10a4..069a1bbb 100644 --- a/packages/p2p-media-loader-core/src/hybrid-loader.ts +++ b/packages/p2p-media-loader-core/src/hybrid-loader.ts @@ -1,5 +1,4 @@ import { HttpRequestExecutor } from "./http-loader.js"; -import { SegmentsMemoryStorage } from "./segments-storage.js"; import { CoreEventMap, EngineCallbacks, @@ -22,6 +21,7 @@ import * as Utils from "./utils/utils.js"; import debug from "debug"; import { QueueItem } from "./utils/queue.js"; import { EventTarget } from "./utils/event-target.js"; +import { SegmentStorage } from "./segment-storage/index.js"; const FAILED_ATTEMPTS_CLEAR_INTERVAL = 60000; const PEER_UPDATE_LATENCY = 1000; @@ -45,7 +45,7 @@ export class HybridLoader { private readonly streamDetails: Required>, private readonly config: StreamConfig, private readonly bandwidthCalculators: BandwidthCalculators, - private readonly segmentStorage: SegmentsMemoryStorage, + private readonly segmentStorage: SegmentStorage, private readonly eventTarget: EventTarget, ) { const activeStream = this.lastRequestedSegment.stream; @@ -59,17 +59,10 @@ export class HybridLoader { this.eventTarget, ); - if (!this.segmentStorage.isInitialized) { + if (!this.segmentStorage) { throw new Error("Segment storage is not initialized."); } - this.segmentStorage.addIsSegmentLockedPredicate((segment) => { - if (segment.stream !== activeStream) return false; - return StreamUtils.isSegmentActualInPlayback( - segment, - this.playback, - this.config, - ); - }); + this.p2pLoaders = new P2PLoadersContainer( this.streamManifestUrl, this.lastRequestedSegment.stream, @@ -109,18 +102,46 @@ export class HybridLoader { } this.lastRequestedSegment = segment; + const swarmId = this.config.swarmId ?? this.streamManifestUrl; + const streamSwarmId = StreamUtils.getStreamSwarmId(swarmId, stream); + + this.segmentStorage.onSegmentRequested( + swarmId, + streamSwarmId, + segment.externalId, + segment.startTime, + segment.endTime, + stream.type, + this.streamDetails.isLive, + ); const engineRequest = new EngineRequest(segment, callbacks); - if (this.segmentStorage.hasSegment(segment)) { - // TODO: error handling - const data = await this.segmentStorage.getSegmentData(segment); - if (data) { - const { queueDownloadRatio } = this.generateQueue(); - engineRequest.resolve(data, this.getBandwidth(queueDownloadRatio)); + + try { + const hasSegment = this.segmentStorage.hasSegment( + swarmId, + streamSwarmId, + segment.externalId, + ); + + if (hasSegment) { + const data = await this.segmentStorage.getSegmentData( + swarmId, + streamSwarmId, + segment.externalId, + ); + if (data) { + const { queueDownloadRatio } = this.generateQueue(); + engineRequest.resolve(data, this.getBandwidth(queueDownloadRatio)); + return; + } } - } else { + this.engineRequest = engineRequest; + } catch { + engineRequest.reject(); + } finally { + this.requestProcessQueueMicrotask(); } - this.requestProcessQueueMicrotask(); } private requestProcessQueueMicrotask = (force = true) => { @@ -185,9 +206,18 @@ export class HybridLoader { this.engineRequest = undefined; } this.requests.remove(request); + + const swarmId = this.config.swarmId ?? this.streamManifestUrl; + const streamSwarmId = StreamUtils.getStreamSwarmId(swarmId, stream); + void this.segmentStorage.storeSegment( - request.segment, + swarmId, + streamSwarmId, + segment.externalId, request.data, + segment.startTime, + segment.endTime, + segment.stream.type, this.streamDetails.isLive, ); break; @@ -350,6 +380,10 @@ export class HybridLoader { } private loadRandomThroughHttp() { + const availableStorageCapacityPercent = + this.getAvailableStorageCapacityPercent(); + if (availableStorageCapacityPercent <= 10) return; + const { simultaneousHttpDownloads, httpErrorRetries } = this.config; const p2pLoader = this.p2pLoaders.currentLoader; @@ -366,11 +400,22 @@ export class HybridLoader { this.playback, this.config, this.p2pLoaders.currentLoader, + availableStorageCapacityPercent, )) { + const swarmId = this.config.swarmId ?? this.streamManifestUrl; + const streamSwarmId = StreamUtils.getStreamSwarmId( + swarmId, + segment.stream, + ); + if ( !statuses.isHttpDownloadable || statuses.isP2PDownloadable || - this.segmentStorage.hasSegment(segment) + this.segmentStorage.hasSegment( + swarmId, + streamSwarmId, + segment.externalId, + ) ) { continue; } @@ -450,21 +495,41 @@ export class HybridLoader { return false; } + private getAvailableStorageCapacityPercent(): number { + const { totalCapacity, usedCapacity } = this.segmentStorage.getUsage(); + return 100 - (usedCapacity / totalCapacity) * 100; + } + private generateQueue() { const queue: QueueItem[] = []; const queueSegmentIds = new Set(); let maxPossibleLength = 0; let alreadyLoadedCount = 0; + + const availableStorageCapacityPercent = + this.getAvailableStorageCapacityPercent(); for (const item of QueueUtils.generateQueue( this.lastRequestedSegment, this.playback, this.config, this.p2pLoaders.currentLoader, + availableStorageCapacityPercent, )) { maxPossibleLength++; const { segment } = item; + + const swarmId = this.config.swarmId ?? this.streamManifestUrl; + const streamSwarmId = StreamUtils.getStreamSwarmId( + swarmId, + segment.stream, + ); + if ( - this.segmentStorage.hasSegment(segment) || + this.segmentStorage.hasSegment( + swarmId, + streamSwarmId, + segment.externalId, + ) || this.requests.get(segment)?.status === "succeed" ) { alreadyLoadedCount++; @@ -534,6 +599,7 @@ export class HybridLoader { this.logger("position significantly changed"); this.engineRequest?.markAsShouldBeStartedImmediately(); } + this.segmentStorage.onPlaybackUpdated(position, rate); void this.requestProcessQueueMicrotask(isPositionSignificantlyChanged); } diff --git a/packages/p2p-media-loader-core/src/index.ts b/packages/p2p-media-loader-core/src/index.ts index 6af19463..c13f7494 100644 --- a/packages/p2p-media-loader-core/src/index.ts +++ b/packages/p2p-media-loader-core/src/index.ts @@ -1,3 +1,4 @@ export { Core } from "./core.js"; export * from "./types.js"; +export type { SegmentStorage } from "./segment-storage/index.js"; export { debug } from "debug"; diff --git a/packages/p2p-media-loader-core/src/p2p/loader.ts b/packages/p2p-media-loader-core/src/p2p/loader.ts index 0d3f4ae6..d39d77a7 100644 --- a/packages/p2p-media-loader-core/src/p2p/loader.ts +++ b/packages/p2p-media-loader-core/src/p2p/loader.ts @@ -5,12 +5,16 @@ import { StreamConfig, StreamWithSegments, } from "../types.js"; -import { SegmentsMemoryStorage } from "../segments-storage.js"; import { RequestsContainer } from "../requests/request-container.js"; import { P2PTrackerClient } from "./tracker-client.js"; import * as StreamUtils from "../utils/stream.js"; import * as Utils from "../utils/utils.js"; import { EventTarget } from "../utils/event-target.js"; +import { SegmentStorage } from "../segment-storage/index.js"; + +export type EventTargetMap = { + [key in `onStorageUpdated-${string}`]: () => void; +} & CoreEventMap; export class P2PLoader { private readonly trackerClient: P2PTrackerClient; @@ -20,9 +24,9 @@ export class P2PLoader { private streamManifestUrl: string, private readonly stream: StreamWithSegments, private readonly requests: RequestsContainer, - private readonly segmentStorage: SegmentsMemoryStorage, + private readonly segmentStorage: SegmentStorage, private readonly config: StreamConfig, - private readonly eventTarget: EventTarget, + private readonly eventTarget: EventTarget, private readonly onSegmentAnnouncement: () => void, ) { const swarmId = this.config.swarmId ?? this.streamManifestUrl; @@ -41,10 +45,13 @@ export class P2PLoader { this.eventTarget, ); - this.segmentStorage.subscribeOnUpdate( - this.stream, + this.eventTarget.addEventListener( + `onStorageUpdated-${streamSwarmId}`, this.broadcastAnnouncement, ); + this.segmentStorage.setSegmentChangeCallback((streamId: string) => { + this.eventTarget.dispatchEvent(`onStorageUpdated-${streamId}`); + }); this.trackerClient.start(); } @@ -89,8 +96,13 @@ export class P2PLoader { } private getSegmentsAnnouncement() { - const loaded: number[] = - this.segmentStorage.getStoredSegmentExternalIdsOfStream(this.stream); + const swarmId = this.config.swarmId ?? this.streamManifestUrl; + const streamSwarmId = StreamUtils.getStreamSwarmId(swarmId, this.stream); + + const loaded: number[] = this.segmentStorage.getStoredSegmentIds( + swarmId, + streamSwarmId, + ); const httpLoading: number[] = []; for (const request of this.requests.httpRequests()) { @@ -131,7 +143,15 @@ export class P2PLoader { segmentExternalId, ); if (!segment) return; - const segmentData = await this.segmentStorage.getSegmentData(segment); + + const swarmId = this.config.swarmId ?? this.streamManifestUrl; + const streamSwarmId = StreamUtils.getStreamSwarmId(swarmId, this.stream); + + const segmentData = await this.segmentStorage.getSegmentData( + swarmId, + streamSwarmId, + segment.externalId, + ); if (!segmentData) { peer.sendSegmentAbsentCommand(segmentExternalId, requestId); return; @@ -144,8 +164,11 @@ export class P2PLoader { }; destroy() { - this.segmentStorage.unsubscribeFromUpdate( - this.stream, + const swarmId = this.config.swarmId ?? this.streamManifestUrl; + const streamSwarmId = StreamUtils.getStreamSwarmId(swarmId, this.stream); + + this.eventTarget.removeEventListener( + `onStorageUpdated-${streamSwarmId}`, this.broadcastAnnouncement, ); this.trackerClient.destroy(); diff --git a/packages/p2p-media-loader-core/src/p2p/loaders-container.ts b/packages/p2p-media-loader-core/src/p2p/loaders-container.ts index ddf1a18d..285f17d1 100644 --- a/packages/p2p-media-loader-core/src/p2p/loaders-container.ts +++ b/packages/p2p-media-loader-core/src/p2p/loaders-container.ts @@ -5,11 +5,12 @@ import { Stream, StreamConfig, StreamWithSegments, + SegmentStorage, } from "../index.js"; import { RequestsContainer } from "../requests/request-container.js"; -import { SegmentsMemoryStorage } from "../segments-storage.js"; import * as LoggerUtils from "../utils/logger.js"; import { EventTarget } from "../utils/event-target.js"; +import * as StreamUtils from "../utils/stream.js"; type P2PLoaderContainerItem = { stream: Stream; @@ -27,7 +28,7 @@ export class P2PLoadersContainer { private readonly streamManifestUrl: string, stream: StreamWithSegments, private readonly requests: RequestsContainer, - private readonly segmentStorage: SegmentsMemoryStorage, + private readonly segmentStorage: SegmentStorage, private readonly config: StreamConfig, private readonly eventTarget: EventTarget, private onSegmentAnnouncement: () => void, @@ -64,9 +65,15 @@ export class P2PLoadersContainer { changeCurrentLoader(stream: StreamWithSegments) { const loaderItem = this.loaders.get(stream.runtimeId); if (this._currentLoaderItem) { - const ids = this.segmentStorage.getStoredSegmentExternalIdsOfStream( + const swarmId = this.config.swarmId ?? this.streamManifestUrl; + const streamSwarmId = StreamUtils.getStreamSwarmId( + swarmId, this._currentLoaderItem.stream, ); + const ids = this.segmentStorage.getStoredSegmentIds( + swarmId, + streamSwarmId, + ); if (!ids.length) this.destroyAndRemoveLoader(this._currentLoaderItem); else this.setLoaderDestroyTimeout(this._currentLoaderItem); } diff --git a/packages/p2p-media-loader-core/src/segment-storage/index.ts b/packages/p2p-media-loader-core/src/segment-storage/index.ts new file mode 100644 index 00000000..5b6990b2 --- /dev/null +++ b/packages/p2p-media-loader-core/src/segment-storage/index.ts @@ -0,0 +1,110 @@ +import { CommonCoreConfig, StreamConfig, StreamType } from "../types.js"; +/** Segments storage interface */ +export interface SegmentStorage { + /** + * Initializes storage + * @param coreConfig - Core configuration with storage options + * @param mainStreamConfig - Main stream configuration + * @param secondaryStreamConfig - Secondary stream configuration + */ + initialize( + coreConfig: CommonCoreConfig, + mainStreamConfig: StreamConfig, + secondaryStreamConfig: StreamConfig, + ): Promise; + + /** + * Provides playback position from player + * @param position - Playback position + * @param rate - Playback rate + */ + onPlaybackUpdated(position: number, rate: number): void; + + /** + * Provides segment request information from player + * @param swarmId - Swarm identifier + * @param streamId - Stream identifier + * @param segmentId - Segment identifier + * @param startTime - Segment start time + * @param endTime - Segment end time + * @param streamType - Stream type + * @param isLiveStream - Is live stream + */ + onSegmentRequested( + swarmId: string, + streamId: string, + segmentId: number, + startTime: number, + endTime: number, + streamType: StreamType, + isLiveStream: boolean, + ): void; + + /** + * Stores segment data + * @param swarmId - Swarm identifier + * @param streamId - Stream identifier + * @param segmentId - Segment identifier + * @param data - Segment data + * @param startTime - Segment start time + * @param endTime - Segment end time + * @param streamType - Stream type + * @param isLiveStream - Is live stream + */ + storeSegment( + swarmId: string, + streamId: string, + segmentId: number, + data: ArrayBuffer, + startTime: number, + endTime: number, + streamType: StreamType, + isLiveStream: boolean, + ): Promise; + + /** + * Returns segment data + * @param swarmId - Swarm identifier + * @param streamId - Stream identifier + * @param segmentId - Segment identifier + */ + getSegmentData( + swarmId: string, + streamId: string, + segmentId: number, + ): Promise; + + /** + * Returns used memory information in the storage + */ + getUsage(): { + totalCapacity: number; + usedCapacity: number; + }; + + /** + * Returns true if segment is in storage + * @param swarmId - Swarm identifier + * @param streamId - Stream identifier + * @param segmentId - Segment identifier + */ + hasSegment(swarmId: string, streamId: string, segmentId: number): boolean; + + /** + * Returns segment IDs of a stream that are stored in the storage + * @param swarmId - Swarm identifier + * @param streamId - Stream identifier + */ + getStoredSegmentIds(swarmId: string, streamId: string): number[]; + + /** + * Sets segment change callback function + * @param callback - Callback function that has to be called when segments appear or disappear in the storage + */ + setSegmentChangeCallback(callback: (streamId: string) => void): void; + + /** + * Function to destroy storage + */ + destroy(): void; +} diff --git a/packages/p2p-media-loader-core/src/segment-storage/segment-memory-storage.ts b/packages/p2p-media-loader-core/src/segment-storage/segment-memory-storage.ts new file mode 100644 index 00000000..fafcb279 --- /dev/null +++ b/packages/p2p-media-loader-core/src/segment-storage/segment-memory-storage.ts @@ -0,0 +1,289 @@ +import { CommonCoreConfig, StreamConfig, StreamType } from "../types.js"; +import debug from "debug"; +import { SegmentStorage } from "./index.js"; +import { + isAndroid, + isIPadOrIPhone, + isAndroidWebview, + getStorageItemId, +} from "./utils.js"; + +type SegmentDataItem = { + segmentId: number; + streamId: string; + data: ArrayBuffer; + startTime: number; + endTime: number; + streamType: StreamType; +}; + +type Playback = { + position: number; + rate: number; +}; + +type LastRequestedSegmentInfo = { + streamId: string; + segmentId: number; + startTime: number; + endTime: number; + swarmId: string; + streamType: StreamType; + isLiveStream: boolean; +}; + +const BYTES_PER_MiB = 1048576; + +export class SegmentMemoryStorage implements SegmentStorage { + private readonly userAgent = navigator.userAgent; + private segmentMemoryStorageLimit = 4000; + private currentMemoryStorageSize = 0; + + private cache = new Map(); + private readonly logger: debug.Debugger; + private coreConfig?: CommonCoreConfig; + private mainStreamConfig?: StreamConfig; + private secondaryStreamConfig?: StreamConfig; + private currentPlayback?: Playback; + private lastRequestedSegment?: LastRequestedSegmentInfo; + private segmentChangeCallback?: (streamId: string) => void; + + constructor() { + this.logger = debug("p2pml-core:segment-memory-storage"); + this.logger.color = "RebeccaPurple"; + } + + // eslint-disable-next-line @typescript-eslint/require-await + async initialize( + coreConfig: CommonCoreConfig, + mainStreamConfig: StreamConfig, + secondaryStreamConfig: StreamConfig, + ) { + this.coreConfig = coreConfig; + this.mainStreamConfig = mainStreamConfig; + this.secondaryStreamConfig = secondaryStreamConfig; + + this.setMemoryStorageLimit(); + this.logger("initialized"); + } + + onPlaybackUpdated(position: number, rate: number) { + this.currentPlayback = { position, rate }; + } + + onSegmentRequested( + swarmId: string, + streamId: string, + segmentId: number, + startTime: number, + endTime: number, + streamType: StreamType, + isLiveStream: boolean, + ): void { + this.lastRequestedSegment = { + streamId, + segmentId, + startTime, + endTime, + swarmId, + streamType, + isLiveStream, + }; + } + + // eslint-disable-next-line @typescript-eslint/require-await + async storeSegment( + _swarmId: string, + streamId: string, + segmentId: number, + data: ArrayBuffer, + startTime: number, + endTime: number, + streamType: StreamType, + isLiveStream: boolean, + ) { + this.clear(isLiveStream, data.byteLength); + + const storageId = getStorageItemId(streamId, segmentId); + this.cache.set(storageId, { + data, + segmentId, + streamId, + startTime, + endTime, + streamType, + }); + + this.logger(`add segment: ${segmentId} to ${streamId}`); + + if (!this.segmentChangeCallback) { + throw new Error("dispatchStorageUpdatedEvent is not set"); + } + + this.segmentChangeCallback(streamId); + } + + // eslint-disable-next-line @typescript-eslint/require-await + async getSegmentData(_swarmId: string, streamId: string, segmentId: number) { + const segmentStorageId = getStorageItemId(streamId, segmentId); + const dataItem = this.cache.get(segmentStorageId); + + if (dataItem === undefined) return undefined; + + return dataItem.data; + } + + getUsage() { + if (!this.lastRequestedSegment || !this.currentPlayback) { + return { + totalCapacity: this.segmentMemoryStorageLimit, + usedCapacity: this.currentMemoryStorageSize, + }; + } + const playbackPosition = this.currentPlayback.position; + + let calculatedUsedCapacity = 0; + for (const { endTime, data } of this.cache.values()) { + if (playbackPosition > endTime) continue; + + calculatedUsedCapacity += data.byteLength; + } + + return { + totalCapacity: this.segmentMemoryStorageLimit, + usedCapacity: calculatedUsedCapacity / BYTES_PER_MiB, + }; + } + + hasSegment(_swarmId: string, streamId: string, externalId: number) { + const segmentStorageId = getStorageItemId(streamId, externalId); + const segment = this.cache.get(segmentStorageId); + + return segment !== undefined; + } + + getStoredSegmentIds(_swarmId: string, streamId: string) { + const externalIds: number[] = []; + + for (const { segmentId, streamId: streamCacheId } of this.cache.values()) { + if (streamCacheId !== streamId) continue; + externalIds.push(segmentId); + } + + return externalIds; + } + + private clear(isLiveStream: boolean, newSegmentSize: number) { + if ( + !this.currentPlayback || + !this.mainStreamConfig || + !this.secondaryStreamConfig || + !this.coreConfig + ) { + return; + } + + const isMemoryLimitReached = this.isMemoryLimitReached(newSegmentSize); + + if (!isMemoryLimitReached && !isLiveStream) return; + + const affectedStreams = new Set(); + const sortedCache = Array.from(this.cache.values()).sort( + (a, b) => a.startTime - b.startTime, + ); + + for (const segmentData of sortedCache) { + const { streamId, segmentId } = segmentData; + const storageId = getStorageItemId(streamId, segmentId); + + const shouldRemove = this.shouldRemoveSegment( + segmentData, + isLiveStream, + this.currentPlayback.position, + ); + + if (!shouldRemove) continue; + + this.cache.delete(storageId); + affectedStreams.add(streamId); + this.logger(`Removed segment ${segmentId} from stream ${streamId}`); + + if (!this.isMemoryLimitReached(newSegmentSize) && !isLiveStream) break; + } + + this.sendUpdatesToAffectedStreams(affectedStreams); + } + + private isMemoryLimitReached(segmentByteLength: number) { + return ( + this.currentMemoryStorageSize + segmentByteLength / BYTES_PER_MiB > + this.segmentMemoryStorageLimit + ); + } + + setSegmentChangeCallback(callback: (streamId: string) => void) { + this.segmentChangeCallback = callback; + } + + private sendUpdatesToAffectedStreams(affectedStreams: Set) { + if (affectedStreams.size === 0) return; + + affectedStreams.forEach((stream) => { + if (!this.segmentChangeCallback) { + throw new Error("dispatchStorageUpdatedEvent is not set"); + } + + this.segmentChangeCallback(stream); + }); + } + + private shouldRemoveSegment( + segmentData: SegmentDataItem, + isLiveStream: boolean, + currentPlaybackPosition: number, + ): boolean { + const { endTime, streamType } = segmentData; + const highDemandTimeWindow = this.getStreamTimeWindow( + streamType, + "highDemandTimeWindow", + ); + + if (currentPlaybackPosition <= endTime) return false; + + if (isLiveStream) { + return currentPlaybackPosition > highDemandTimeWindow + endTime; + } + + return true; + } + + private setMemoryStorageLimit() { + if (this.coreConfig && this.coreConfig.segmentMemoryStorageLimit) { + this.segmentMemoryStorageLimit = + this.coreConfig.segmentMemoryStorageLimit; + return; + } + + if (isAndroidWebview(this.userAgent) || isIPadOrIPhone(this.userAgent)) { + this.segmentMemoryStorageLimit = 1000; + } else if (isAndroid(this.userAgent)) { + this.segmentMemoryStorageLimit = 2000; + } + } + + private getStreamTimeWindow( + streamType: string, + configKey: "highDemandTimeWindow" | "httpDownloadTimeWindow", + ): number { + const config = + streamType === "main" + ? this.mainStreamConfig + : this.secondaryStreamConfig; + + return config?.[configKey] ?? 0; + } + + public destroy() { + this.cache.clear(); + } +} diff --git a/packages/p2p-media-loader-core/src/segment-storage/utils.ts b/packages/p2p-media-loader-core/src/segment-storage/utils.ts new file mode 100644 index 00000000..d040ace8 --- /dev/null +++ b/packages/p2p-media-loader-core/src/segment-storage/utils.ts @@ -0,0 +1,10 @@ +export const getStorageItemId = (streamId: string, segmentId: number) => + `${streamId}|${segmentId}`; + +export const isAndroid = (userAgent: string) => /Android/i.test(userAgent); + +export const isIPadOrIPhone = (userAgent: string) => + /iPad|iPhone/i.test(userAgent); + +export const isAndroidWebview = (userAgent: string) => + /Android/i.test(userAgent) && !/Chrome|Firefox/i.test(userAgent); diff --git a/packages/p2p-media-loader-core/src/segments-storage.ts b/packages/p2p-media-loader-core/src/segments-storage.ts deleted file mode 100644 index 0fbe433e..00000000 --- a/packages/p2p-media-loader-core/src/segments-storage.ts +++ /dev/null @@ -1,192 +0,0 @@ -import { CommonCoreConfig, SegmentWithStream, Stream } from "./types.js"; -import * as StreamUtils from "./utils/stream.js"; -import debug from "debug"; -import { EventTarget } from "./utils/event-target.js"; - -type StorageConfig = CommonCoreConfig; - -function getStorageItemId(segment: SegmentWithStream) { - const streamId = StreamUtils.getStreamId(segment.stream); - return `${streamId}|${segment.externalId}`; -} - -type StorageItem = { - segment: SegmentWithStream; - data: ArrayBuffer; - lastAccessed: number; -}; - -type StorageEventHandlers = { - [key in `onStorageUpdated-${string}`]: (steam: Stream) => void; -}; - -const DEFAULT_LIVE_CACHED_SEGMENT_EXPIRATION = 1200; - -export class SegmentsMemoryStorage { - private cache = new Map(); - private _isInitialized = false; - private readonly isSegmentLockedPredicates: (( - segment: SegmentWithStream, - ) => boolean)[] = []; - private readonly logger: debug.Debugger; - private readonly eventTarget = new EventTarget(); - - constructor(private readonly storageConfig: StorageConfig) { - this.logger = debug("p2pml-core:segment-memory-storage"); - this.logger.color = "RebeccaPurple"; - } - - // eslint-disable-next-line @typescript-eslint/require-await - async initialize() { - this._isInitialized = true; - this.logger("initialized"); - } - - get isInitialized(): boolean { - return this._isInitialized; - } - - addIsSegmentLockedPredicate( - predicate: (segment: SegmentWithStream) => boolean, - ) { - this.isSegmentLockedPredicates.push(predicate); - } - - private isSegmentLocked(segment: SegmentWithStream): boolean { - return this.isSegmentLockedPredicates.some((p) => p(segment)); - } - - // eslint-disable-next-line @typescript-eslint/require-await - async storeSegment( - segment: SegmentWithStream, - data: ArrayBuffer, - isLiveStream: boolean, - ) { - const id = getStorageItemId(segment); - this.cache.set(id, { - segment, - data, - lastAccessed: performance.now(), - }); - this.logger(`add segment: ${id}`); - this.dispatchStorageUpdatedEvent(segment.stream); - void this.clear(isLiveStream); - } - - // eslint-disable-next-line @typescript-eslint/require-await - async getSegmentData( - segment: SegmentWithStream, - ): Promise { - const itemId = getStorageItemId(segment); - const cacheItem = this.cache.get(itemId); - if (cacheItem === undefined) return undefined; - - cacheItem.lastAccessed = performance.now(); - return cacheItem.data; - } - - hasSegment(segment: SegmentWithStream): boolean { - const id = getStorageItemId(segment); - return this.cache.has(id); - } - - getStoredSegmentExternalIdsOfStream(stream: Stream) { - const streamId = StreamUtils.getStreamId(stream); - const externalIds: number[] = []; - for (const { segment } of this.cache.values()) { - const itemStreamId = StreamUtils.getStreamId(segment.stream); - if (itemStreamId === streamId) externalIds.push(segment.externalId); - } - return externalIds; - } - - // eslint-disable-next-line @typescript-eslint/require-await - private async clear(isLiveStream: boolean): Promise { - const cacheSegmentExpiration = - (this.storageConfig.cachedSegmentExpiration ?? - (isLiveStream ? DEFAULT_LIVE_CACHED_SEGMENT_EXPIRATION : 0)) * 1000; - - if (cacheSegmentExpiration === 0) return false; - - const itemsToDelete: string[] = []; - const remainingItems: [string, StorageItem][] = []; - const streamsOfChangedItems = new Set(); - - // Delete old segments - const now = performance.now(); - - for (const entry of this.cache.entries()) { - const [itemId, item] = entry; - const { lastAccessed, segment } = item; - - if (now - lastAccessed > cacheSegmentExpiration) { - if (!this.isSegmentLocked(segment)) { - itemsToDelete.push(itemId); - streamsOfChangedItems.add(segment.stream); - } - } else { - remainingItems.push(entry); - } - } - - // Delete segments over cached count - if (this.storageConfig.cachedSegmentsCount > 0) { - let countOverhead = - remainingItems.length - this.storageConfig.cachedSegmentsCount; - if (countOverhead > 0) { - remainingItems.sort(([, a], [, b]) => a.lastAccessed - b.lastAccessed); - - for (const [itemId, { segment }] of remainingItems) { - if (!this.isSegmentLocked(segment)) { - itemsToDelete.push(itemId); - streamsOfChangedItems.add(segment.stream); - countOverhead--; - if (countOverhead === 0) break; - } - } - } - } - - if (itemsToDelete.length) { - this.logger(`cleared ${itemsToDelete.length} segments`); - itemsToDelete.forEach((id) => this.cache.delete(id)); - for (const stream of streamsOfChangedItems) { - this.dispatchStorageUpdatedEvent(stream); - } - } - - return itemsToDelete.length > 0; - } - - subscribeOnUpdate( - stream: Stream, - listener: StorageEventHandlers["onStorageUpdated-"], - ) { - const streamId = StreamUtils.getStreamId(stream); - this.eventTarget.addEventListener(`onStorageUpdated-${streamId}`, listener); - } - - unsubscribeFromUpdate( - stream: Stream, - listener: StorageEventHandlers["onStorageUpdated-"], - ) { - const streamId = StreamUtils.getStreamId(stream); - this.eventTarget.removeEventListener( - `onStorageUpdated-${streamId}`, - listener, - ); - } - - private dispatchStorageUpdatedEvent(stream: Stream) { - this.eventTarget.dispatchEvent( - `onStorageUpdated-${StreamUtils.getStreamId(stream)}`, - stream, - ); - } - - // eslint-disable-next-line @typescript-eslint/require-await - public async destroy() { - this.cache.clear(); - this._isInitialized = false; - } -} diff --git a/packages/p2p-media-loader-core/src/types.ts b/packages/p2p-media-loader-core/src/types.ts index 919d9fce..ef2dfb1f 100644 --- a/packages/p2p-media-loader-core/src/types.ts +++ b/packages/p2p-media-loader-core/src/types.ts @@ -1,3 +1,5 @@ +import { SegmentStorage } from "./segment-storage/index.js"; + /** Represents the types of streams available, either primary (main) or secondary. */ export type StreamType = "main" | "secondary"; @@ -113,26 +115,32 @@ export type DynamicCoreConfig = Partial< /** Represents the configuration for the Core functionality that is common to all streams. */ export type CommonCoreConfig = { /** - * Time after which a cached segment expires, in seconds. - * If set to undefined, the cacheSegmentExpiration is disabled for VOD streams, and a default value (20 minutes) is used for live streams. + * Defines the memory storage limit for media segments, in MiB. * * @default * ```typescript - * cachedSegmentExpiration: undefined + * segmentMemoryStorageLimit: undefined * ``` + * + * - When `undefined`, the default limit is determined based on the device type and browser: + * - Desktop: 4000 MiB + * - Android: 2000 MiB + * - iOS: 1000 MiB + * - Android WebView: 1000 MiB + * - iOS WebView: 1000 MiB + * */ - cachedSegmentExpiration?: number; + segmentMemoryStorageLimit: number | undefined; + /** - * Maximum number of segments to store in the cache. - * Has to be less then httpDownloadTimeWindow and p2pDownloadTimeWindow. - * If set to 0, the cache is unlimited. + * Optional custom storage factory for the segments storage. * * @default * ```typescript - * cachedSegmentsCount: 0 + * customSegmentStorageFactory: undefined * ``` */ - cachedSegmentsCount: number; + customSegmentStorageFactory?: (isLive: boolean) => SegmentStorage; }; /** diff --git a/packages/p2p-media-loader-core/src/utils/queue.ts b/packages/p2p-media-loader-core/src/utils/queue.ts index 028c7e89..eb8ffb63 100644 --- a/packages/p2p-media-loader-core/src/utils/queue.ts +++ b/packages/p2p-media-loader-core/src/utils/queue.ts @@ -17,6 +17,7 @@ export function* generateQueue( playback: Readonly, playbackConfig: PlaybackTimeWindowsConfig, currentP2PLoader: P2PLoader, + availablePercentMemory: number, ): Generator { const { runtimeId, stream } = lastRequestedSegment; @@ -38,6 +39,7 @@ export function* generateQueue( playback, playbackConfig, currentP2PLoader, + availablePercentMemory, ); if (isNotActualStatuses(firstStatuses)) { const next = queueSegments.next(); @@ -54,6 +56,7 @@ export function* generateQueue( playback, playbackConfig, currentP2PLoader, + availablePercentMemory, ); if (isNotActualStatuses(secondStatuses)) return; @@ -70,6 +73,7 @@ export function* generateQueue( playback, playbackConfig, currentP2PLoader, + availablePercentMemory, ); if (isNotActualStatuses(statuses)) break; yield { segment, statuses }; diff --git a/packages/p2p-media-loader-core/src/utils/stream.ts b/packages/p2p-media-loader-core/src/utils/stream.ts index 02396e82..232a1d11 100644 --- a/packages/p2p-media-loader-core/src/utils/stream.ts +++ b/packages/p2p-media-loader-core/src/utils/stream.ts @@ -62,30 +62,44 @@ export function getSegmentAvgDuration(stream: StreamWithSegments) { return sumDuration / size; } -export function isSegmentActualInPlayback( - segment: Readonly, - playback: Playback, +function calculateTimeWindows( timeWindowsConfig: PlaybackTimeWindowsConfig, -): boolean { + availableMemoryInPercent: number, +) { const { - isHighDemand = false, - isHttpDownloadable = false, - isP2PDownloadable = false, - } = getSegmentPlaybackStatuses(segment, playback, timeWindowsConfig); - return isHighDemand || isHttpDownloadable || isP2PDownloadable; + highDemandTimeWindow, + httpDownloadTimeWindow, + p2pDownloadTimeWindow, + } = timeWindowsConfig; + + const result = { + highDemandTimeWindow, + httpDownloadTimeWindow, + p2pDownloadTimeWindow, + }; + + if (availableMemoryInPercent <= 5) { + result.httpDownloadTimeWindow = 0; + result.p2pDownloadTimeWindow = 0; + } else if (availableMemoryInPercent <= 10) { + result.p2pDownloadTimeWindow = result.httpDownloadTimeWindow; + } + + return result; } export function getSegmentPlaybackStatuses( segment: SegmentWithStream, playback: Playback, timeWindowsConfig: PlaybackTimeWindowsConfig, - currentP2PLoader?: P2PLoader, + currentP2PLoader: P2PLoader, + availableMemoryPercent: number, ): SegmentPlaybackStatuses { const { highDemandTimeWindow, httpDownloadTimeWindow, p2pDownloadTimeWindow, - } = timeWindowsConfig; + } = calculateTimeWindows(timeWindowsConfig, availableMemoryPercent); return { isHighDemand: isSegmentInTimeWindow( @@ -100,8 +114,7 @@ export function getSegmentPlaybackStatuses( ), isP2PDownloadable: isSegmentInTimeWindow(segment, playback, p2pDownloadTimeWindow) && - (!currentP2PLoader || - currentP2PLoader.isSegmentLoadingOrLoadedBySomeone(segment)), + currentP2PLoader.isSegmentLoadingOrLoadedBySomeone(segment), }; }