Skip to content

Commit

Permalink
refactor: Improve segment storage event handling
Browse files Browse the repository at this point in the history
  • Loading branch information
DimaDemchenko committed Sep 9, 2024
1 parent 213c32a commit 337d264
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 53 deletions.
19 changes: 10 additions & 9 deletions packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,13 @@ export class HybridLoader {
const engineRequest = new EngineRequest(segment, callbacks);

try {
if (
this.segmentStorage.hasSegment(
streamSwarmId,
segment.externalId,
swarmId,
)
) {
const hasSegment = this.segmentStorage.hasSegment(
streamSwarmId,
segment.externalId,
swarmId,
);

if (hasSegment) {
const data = await this.segmentStorage.getSegmentData(
streamSwarmId,
segment.externalId,
Expand All @@ -132,10 +132,11 @@ export class HybridLoader {
if (data) {
const { queueDownloadRatio } = this.generateQueue();
engineRequest.resolve(data, this.getBandwidth(queueDownloadRatio));
return;
}

this.engineRequest = engineRequest;
}

this.engineRequest = engineRequest;
} catch {
engineRequest.reject();
} finally {
Expand Down
20 changes: 15 additions & 5 deletions packages/p2p-media-loader-core/src/p2p/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import * as Utils from "../utils/utils.js";
import { EventTarget } from "../utils/event-target.js";
import { SegmentStorage } from "../segment-storage/index.js";

export type EventTargetMap = {
[key in `onStorageUpdated-${string}`]: () => void;
} & CoreEventMap;

export class P2PLoader {
private readonly trackerClient: P2PTrackerClient;
private isAnnounceMicrotaskCreated = false;
Expand All @@ -22,7 +26,7 @@ export class P2PLoader {
private readonly requests: RequestsContainer,
private readonly segmentStorage: SegmentStorage,
private readonly config: StreamConfig,
private readonly eventTarget: EventTarget<CoreEventMap>,
private readonly eventTarget: EventTarget<EventTargetMap>,
private readonly onSegmentAnnouncement: () => void,
) {
const swarmId = this.config.swarmId ?? this.streamManifestUrl;
Expand All @@ -41,10 +45,13 @@ export class P2PLoader {
this.eventTarget,
);

this.segmentStorage.subscribeOnUpdate(
streamSwarmId,
this.eventTarget.addEventListener(
`onStorageUpdated-${streamSwarmId}`,
this.broadcastAnnouncement,
);
this.segmentStorage.setUpdateEventDispatcher((streamId: string) => {
this.eventTarget.dispatchEvent(`onStorageUpdated-${streamId}`);
});

this.trackerClient.start();
}
Expand Down Expand Up @@ -157,8 +164,11 @@ export class P2PLoader {
};

destroy() {
this.segmentStorage.unsubscribeFromUpdate(
StreamUtils.getStreamId(this.stream),
const swarmId = this.config.swarmId ?? this.streamManifestUrl;
const streamSwarmId = StreamUtils.getStreamSwarmId(swarmId, this.stream);

this.eventTarget.removeEventListener(
`onStorageUpdated-${streamSwarmId}`,
this.broadcastAnnouncement,
);
this.trackerClient.destroy();
Expand Down
14 changes: 3 additions & 11 deletions packages/p2p-media-loader-core/src/segment-storage/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,10 @@ export interface SegmentStorage {
getStoredSegmentIds(streamId: string, swarmId: string): number[];

/**
* Function to subscribe on stream updates
* @param streamId - Stream identifier
* @param listener - Listener
*/
subscribeOnUpdate(streamId: string, listener: () => void): void;

/**
* Function to unsubscribe from stream updates
* @param streamId - Stream identifier
* @param listener - Listener
* Sets event dispatcher for storage update
* @param eventDispatcher - Event dispatcher
*/
unsubscribeFromUpdate(streamId: string, listener: () => void): void;
setUpdateEventDispatcher(eventDispatcher: (streamId: string) => void): void;

/**
* Function to destroy storage
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { CommonCoreConfig, StreamConfig, StreamType } from "../types.js";
import debug from "debug";
import { EventTarget } from "../utils/event-target.js";
import { SegmentStorage } from "./index.js";

type SegmentDataItem = {
Expand All @@ -24,23 +23,19 @@ type LastRequestedSegmentInfo = {
endTime: number;
};

type StorageEventHandlers = {
[key in `onStorageUpdated-${string}`]: () => void;
};

function getStorageItemId(streamId: string, segmentId: number) {
return `${streamId}|${segmentId}`;
}

export class SegmentMemoryStorage implements SegmentStorage {
private cache = new Map<string, SegmentDataItem>();
private readonly logger: debug.Debugger;
private readonly eventTarget = new EventTarget<StorageEventHandlers>();
private storageConfig?: CommonCoreConfig;
private mainStreamConfig?: StreamConfig;
private secondaryStreamConfig?: StreamConfig;
private currentPlayback?: Playback;
private lastRequestedSegment?: LastRequestedSegmentInfo;
private dispatchStorageUpdatedEvent?: (streamId: string) => void;

constructor() {
this.logger = debug("p2pml-core:segment-memory-storage");
Expand Down Expand Up @@ -103,6 +98,11 @@ export class SegmentMemoryStorage implements SegmentStorage {
});

this.logger(`add segment: ${segmentId} to ${streamId}`);

if (!this.dispatchStorageUpdatedEvent) {
throw new Error("dispatchStorageUpdatedEvent is not set");
}

this.dispatchStorageUpdatedEvent(streamId);
void this.clear(isLiveStream);
}
Expand Down Expand Up @@ -186,28 +186,19 @@ export class SegmentMemoryStorage implements SegmentStorage {
}
}

affectedStreams.forEach((stream) =>
this.dispatchStorageUpdatedEvent(stream),
);
affectedStreams.forEach((stream) => {
if (!this.dispatchStorageUpdatedEvent) {
throw new Error("dispatchStorageUpdatedEvent is not set");
}

return affectedStreams.size > 0;
}
this.dispatchStorageUpdatedEvent(stream);
});

subscribeOnUpdate(
streamId: string,
listener: StorageEventHandlers["onStorageUpdated-"],
) {
this.eventTarget.addEventListener(`onStorageUpdated-${streamId}`, listener);
return affectedStreams.size > 0;
}

unsubscribeFromUpdate(
streamId: string,
listener: StorageEventHandlers["onStorageUpdated-"],
) {
this.eventTarget.removeEventListener(
`onStorageUpdated-${streamId}`,
listener,
);
setUpdateEventDispatcher(eventDispatcher: (streamId: string) => void) {
this.dispatchStorageUpdatedEvent = eventDispatcher;
}

private getStorageMaxCacheCount() {
Expand Down Expand Up @@ -241,10 +232,6 @@ export class SegmentMemoryStorage implements SegmentStorage {
: segmentsInTimeWindow;
}

private dispatchStorageUpdatedEvent(streamId: string) {
this.eventTarget.dispatchEvent(`onStorageUpdated-${streamId}`);
}

private getStreamTimeWindow(
streamType: string,
configKey: "highDemandTimeWindow" | "httpDownloadTimeWindow",
Expand Down

0 comments on commit 337d264

Please sign in to comment.