diff --git a/packages/p2p-media-loader-core/src/hybrid-loader.ts b/packages/p2p-media-loader-core/src/hybrid-loader.ts index eaeecba7..7d862a34 100644 --- a/packages/p2p-media-loader-core/src/hybrid-loader.ts +++ b/packages/p2p-media-loader-core/src/hybrid-loader.ts @@ -117,13 +117,13 @@ export class HybridLoader { const engineRequest = new EngineRequest(segment, callbacks); try { - if ( - this.segmentStorage.hasSegment( - streamSwarmId, - segment.externalId, - swarmId, - ) - ) { + const hasSegment = this.segmentStorage.hasSegment( + streamSwarmId, + segment.externalId, + swarmId, + ); + + if (hasSegment) { const data = await this.segmentStorage.getSegmentData( streamSwarmId, segment.externalId, @@ -132,10 +132,11 @@ export class HybridLoader { if (data) { const { queueDownloadRatio } = this.generateQueue(); engineRequest.resolve(data, this.getBandwidth(queueDownloadRatio)); + return; } - - this.engineRequest = engineRequest; } + + this.engineRequest = engineRequest; } catch { engineRequest.reject(); } finally { diff --git a/packages/p2p-media-loader-core/src/p2p/loader.ts b/packages/p2p-media-loader-core/src/p2p/loader.ts index b2bff904..7a459700 100644 --- a/packages/p2p-media-loader-core/src/p2p/loader.ts +++ b/packages/p2p-media-loader-core/src/p2p/loader.ts @@ -12,6 +12,10 @@ import * as Utils from "../utils/utils.js"; import { EventTarget } from "../utils/event-target.js"; import { SegmentStorage } from "../segment-storage/index.js"; +export type EventTargetMap = { + [key in `onStorageUpdated-${string}`]: () => void; +} & CoreEventMap; + export class P2PLoader { private readonly trackerClient: P2PTrackerClient; private isAnnounceMicrotaskCreated = false; @@ -22,7 +26,7 @@ export class P2PLoader { private readonly requests: RequestsContainer, private readonly segmentStorage: SegmentStorage, private readonly config: StreamConfig, - private readonly eventTarget: EventTarget, + private readonly eventTarget: EventTarget, private readonly onSegmentAnnouncement: () => void, ) { const swarmId = this.config.swarmId ?? this.streamManifestUrl; @@ -41,10 +45,13 @@ export class P2PLoader { this.eventTarget, ); - this.segmentStorage.subscribeOnUpdate( - streamSwarmId, + this.eventTarget.addEventListener( + `onStorageUpdated-${streamSwarmId}`, this.broadcastAnnouncement, ); + this.segmentStorage.setUpdateEventDispatcher((streamId: string) => { + this.eventTarget.dispatchEvent(`onStorageUpdated-${streamId}`); + }); this.trackerClient.start(); } @@ -157,8 +164,11 @@ export class P2PLoader { }; destroy() { - this.segmentStorage.unsubscribeFromUpdate( - StreamUtils.getStreamId(this.stream), + const swarmId = this.config.swarmId ?? this.streamManifestUrl; + const streamSwarmId = StreamUtils.getStreamSwarmId(swarmId, this.stream); + + this.eventTarget.removeEventListener( + `onStorageUpdated-${streamSwarmId}`, this.broadcastAnnouncement, ); this.trackerClient.destroy(); diff --git a/packages/p2p-media-loader-core/src/segment-storage/index.ts b/packages/p2p-media-loader-core/src/segment-storage/index.ts index 19a2e15c..bf82fe3b 100644 --- a/packages/p2p-media-loader-core/src/segment-storage/index.ts +++ b/packages/p2p-media-loader-core/src/segment-storage/index.ts @@ -90,18 +90,10 @@ export interface SegmentStorage { getStoredSegmentIds(streamId: string, swarmId: string): number[]; /** - * Function to subscribe on stream updates - * @param streamId - Stream identifier - * @param listener - Listener - */ - subscribeOnUpdate(streamId: string, listener: () => void): void; - - /** - * Function to unsubscribe from stream updates - * @param streamId - Stream identifier - * @param listener - Listener + * Sets event dispatcher for storage update + * @param eventDispatcher - Event dispatcher */ - unsubscribeFromUpdate(streamId: string, listener: () => void): void; + setUpdateEventDispatcher(eventDispatcher: (streamId: string) => void): void; /** * Function to destroy storage diff --git a/packages/p2p-media-loader-core/src/segment-storage/segment-memory-storage.ts b/packages/p2p-media-loader-core/src/segment-storage/segment-memory-storage.ts index 0f41c51d..250df6ce 100644 --- a/packages/p2p-media-loader-core/src/segment-storage/segment-memory-storage.ts +++ b/packages/p2p-media-loader-core/src/segment-storage/segment-memory-storage.ts @@ -1,6 +1,5 @@ import { CommonCoreConfig, StreamConfig, StreamType } from "../types.js"; import debug from "debug"; -import { EventTarget } from "../utils/event-target.js"; import { SegmentStorage } from "./index.js"; type SegmentDataItem = { @@ -24,10 +23,6 @@ type LastRequestedSegmentInfo = { endTime: number; }; -type StorageEventHandlers = { - [key in `onStorageUpdated-${string}`]: () => void; -}; - function getStorageItemId(streamId: string, segmentId: number) { return `${streamId}|${segmentId}`; } @@ -35,12 +30,12 @@ function getStorageItemId(streamId: string, segmentId: number) { export class SegmentMemoryStorage implements SegmentStorage { private cache = new Map(); private readonly logger: debug.Debugger; - private readonly eventTarget = new EventTarget(); private storageConfig?: CommonCoreConfig; private mainStreamConfig?: StreamConfig; private secondaryStreamConfig?: StreamConfig; private currentPlayback?: Playback; private lastRequestedSegment?: LastRequestedSegmentInfo; + private dispatchStorageUpdatedEvent?: (streamId: string) => void; constructor() { this.logger = debug("p2pml-core:segment-memory-storage"); @@ -103,6 +98,11 @@ export class SegmentMemoryStorage implements SegmentStorage { }); this.logger(`add segment: ${segmentId} to ${streamId}`); + + if (!this.dispatchStorageUpdatedEvent) { + throw new Error("dispatchStorageUpdatedEvent is not set"); + } + this.dispatchStorageUpdatedEvent(streamId); void this.clear(isLiveStream); } @@ -186,28 +186,19 @@ export class SegmentMemoryStorage implements SegmentStorage { } } - affectedStreams.forEach((stream) => - this.dispatchStorageUpdatedEvent(stream), - ); + affectedStreams.forEach((stream) => { + if (!this.dispatchStorageUpdatedEvent) { + throw new Error("dispatchStorageUpdatedEvent is not set"); + } - return affectedStreams.size > 0; - } + this.dispatchStorageUpdatedEvent(stream); + }); - subscribeOnUpdate( - streamId: string, - listener: StorageEventHandlers["onStorageUpdated-"], - ) { - this.eventTarget.addEventListener(`onStorageUpdated-${streamId}`, listener); + return affectedStreams.size > 0; } - unsubscribeFromUpdate( - streamId: string, - listener: StorageEventHandlers["onStorageUpdated-"], - ) { - this.eventTarget.removeEventListener( - `onStorageUpdated-${streamId}`, - listener, - ); + setUpdateEventDispatcher(eventDispatcher: (streamId: string) => void) { + this.dispatchStorageUpdatedEvent = eventDispatcher; } private getStorageMaxCacheCount() { @@ -241,10 +232,6 @@ export class SegmentMemoryStorage implements SegmentStorage { : segmentsInTimeWindow; } - private dispatchStorageUpdatedEvent(streamId: string) { - this.eventTarget.dispatchEvent(`onStorageUpdated-${streamId}`); - } - private getStreamTimeWindow( streamType: string, configKey: "highDemandTimeWindow" | "httpDownloadTimeWindow",