From 09b49fafde3e00f51d327da54f57d6a46abcc438 Mon Sep 17 00:00:00 2001 From: DimaDemchenko Date: Tue, 24 Sep 2024 10:45:49 +0300 Subject: [PATCH] feat: Implemented custom indexedDbStorage --- .../indexed-db-storage.ts | 241 ++++++++++++++++++ .../indexed-db-wrapper.ts | 95 +++++++ 2 files changed, 336 insertions(+) create mode 100644 packages/p2p-media-loader-demo/src/custom-segment-storage-example/indexed-db-storage.ts create mode 100644 packages/p2p-media-loader-demo/src/custom-segment-storage-example/indexed-db-wrapper.ts diff --git a/packages/p2p-media-loader-demo/src/custom-segment-storage-example/indexed-db-storage.ts b/packages/p2p-media-loader-demo/src/custom-segment-storage-example/indexed-db-storage.ts new file mode 100644 index 00000000..4a69cb42 --- /dev/null +++ b/packages/p2p-media-loader-demo/src/custom-segment-storage-example/indexed-db-storage.ts @@ -0,0 +1,241 @@ +import { + CommonCoreConfig, + SegmentStorage, + StreamConfig, + StreamType, +} from "p2p-media-loader-core"; +import { IndexedDbWrapper } from "./indexed-db-wrapper"; + +type SegmentDataItem = { + storageId: string; + data: ArrayBuffer; +}; + +type Playback = { + position: number; + rate: number; +}; + +type LastRequestedSegmentInfo = { + streamId: string; + segmentId: number; + startTime: number; + endTime: number; + swarmId: string; + streamType: StreamType; + isLiveStream: boolean; +}; + +type SegmentInfoItem = { + storageId: string; + dataLength: number; + streamId: string; + segmentId: number; + streamType: string; + startTime: number; + endTime: number; + swarmId: string; +}; + +function getStorageItemId(streamId: string, segmentId: number) { + return `${streamId}|${segmentId}`; +} + +const INFO_ITEMS_STORE_NAME = "segmentInfo"; +const DATA_ITEMS_STORE_NAME = "segmentData"; +const DB_NAME = "p2p-media-loader"; +const DB_VERSION = 1; +const BYTES_PER_MB = 1048576; + +export class IndexedDbStorage implements SegmentStorage { + private segmentsMemoryStorageLimit = 4000; // 4 GB + private currentMemoryStorageSize = 0; // current memory storage size in MB + + private storageConfig?: CommonCoreConfig; + private mainStreamConfig?: StreamConfig; + private secondaryStreamConfig?: StreamConfig; + private cache = new Map(); + + private currentPlayback?: Playback; // current playback position and rate + private lastRequestedSegment?: LastRequestedSegmentInfo; // details about the last requested segment by the player + private db: IndexedDbWrapper; + + private dispatchStorageUpdatedEvent?: (streamId: string) => void; + + constructor() { + this.db = new IndexedDbWrapper( + DB_NAME, + DB_VERSION, + INFO_ITEMS_STORE_NAME, + DATA_ITEMS_STORE_NAME, + ); + } + + onPlaybackUpdated(position: number, rate: number): void { + this.currentPlayback = { position, rate }; + } + + onSegmentRequested( + streamId: string, + segmentId: number, + startTime: number, + endTime: number, + swarmId: string, + streamType: StreamType, + isLiveStream: boolean, + ): void { + this.lastRequestedSegment = { + streamId, + segmentId, + startTime, + endTime, + swarmId, + streamType, + isLiveStream, + }; + } + + async initialize( + storageConfig: CommonCoreConfig, + mainStreamConfig: StreamConfig, + secondaryStreamConfig: StreamConfig, + ) { + this.storageConfig = storageConfig; + this.mainStreamConfig = mainStreamConfig; + this.secondaryStreamConfig = secondaryStreamConfig; + + try { + // await this.db.deleteDatabase(); + await this.db.openDatabase(); + await this.loadCacheMap(); + } catch (error) { + // eslint-disable-next-line no-console + console.error("Failed to initialize custom segment storage:", error); + throw error; + } + } + + async storeSegment( + streamId: string, + segmentId: number, + data: ArrayBuffer, + startTime: number, + endTime: number, + swarmId: string, + streamType: StreamType, + _isLiveStream: boolean, + ): Promise { + const storageId = getStorageItemId(streamId, segmentId); + const segmentDataItem = { + storageId, + data, + }; + const segmentInfoItem = { + storageId, + dataLength: data.byteLength, + streamId, + segmentId, + streamType, + startTime, + endTime, + swarmId, + }; + + try { + /* + * await this.clear(); + * Implement your own logic to remove old segments and manage the memory storage size + */ + + await Promise.all([ + this.db.put(DATA_ITEMS_STORE_NAME, segmentDataItem), + this.db.put(INFO_ITEMS_STORE_NAME, segmentInfoItem), + ]); + + this.cache.set(storageId, segmentInfoItem); + this.increaseMemoryStorageSize(data.byteLength); + + if (this.dispatchStorageUpdatedEvent) { + this.dispatchStorageUpdatedEvent(streamId); + } + } catch (error) { + // eslint-disable-next-line no-console + console.error(`Failed to store segment ${segmentId}:`, error); + throw error; + // Optionally, implement retry logic or other error recovery mechanisms + } + } + + async getSegmentData( + streamId: string, + segmentId: number, + ): Promise { + const segmentStorageId = getStorageItemId(streamId, segmentId); + try { + const result = await this.db.get( + DATA_ITEMS_STORE_NAME, + segmentStorageId, + ); + + return result?.data; + } catch (error) { + // eslint-disable-next-line no-console + console.error( + `Error retrieving segment data for ${segmentStorageId}:`, + error, + ); + return undefined; + } + } + + getUsage() { + /* + * Implement your own logic to calculate the memory used by the segments stored in memory. + */ + return { + totalCapacity: this.segmentsMemoryStorageLimit, + usedCapacity: this.currentMemoryStorageSize, + }; + } + + hasSegment(streamId: string, segmentId: number): boolean { + const storageId = getStorageItemId(streamId, segmentId); + return this.cache.has(storageId); + } + + getStoredSegmentIds(streamId: string): number[] { + const storedSegments: number[] = []; + + for (const segment of this.cache.values()) { + if (segment.streamId === streamId) { + storedSegments.push(segment.segmentId); + } + } + + return storedSegments; + } + + destroy() { + this.db.closeDatabase(); + this.cache.clear(); + } + + setUpdateEventDispatcher(eventDispatcher: (streamId: string) => void) { + this.dispatchStorageUpdatedEvent = eventDispatcher; + } + + private async loadCacheMap() { + const result = await this.db.getAll(INFO_ITEMS_STORE_NAME); + + result.forEach((item) => { + const storageId = getStorageItemId(item.streamId, item.segmentId); + this.cache.set(storageId, item); + + this.increaseMemoryStorageSize(item.dataLength); + }); + } + + private increaseMemoryStorageSize(dataLength: number) { + this.currentMemoryStorageSize += dataLength / BYTES_PER_MB; + } +} diff --git a/packages/p2p-media-loader-demo/src/custom-segment-storage-example/indexed-db-wrapper.ts b/packages/p2p-media-loader-demo/src/custom-segment-storage-example/indexed-db-wrapper.ts new file mode 100644 index 00000000..c384e3af --- /dev/null +++ b/packages/p2p-media-loader-demo/src/custom-segment-storage-example/indexed-db-wrapper.ts @@ -0,0 +1,95 @@ +export class IndexedDbWrapper { + private db: IDBDatabase | null = null; + + constructor( + private readonly dbName: string, + private readonly dbVersion: number, + private readonly infoItemsStoreName: string, + private readonly dataItemsStoreName: string, + ) {} + + async openDatabase(): Promise { + return new Promise((resolve, reject) => { + const request = indexedDB.open(this.dbName, this.dbVersion); + + request.onerror = () => reject(new Error("Failed to open database.")); + request.onsuccess = () => { + this.db = request.result; + resolve(); + }; + request.onupgradeneeded = (event) => { + this.db = (event.target as IDBOpenDBRequest).result; + this.createObjectStores(this.db); + }; + }); + } + + private createObjectStores(db: IDBDatabase): void { + if (!db.objectStoreNames.contains(this.dataItemsStoreName)) { + db.createObjectStore(this.dataItemsStoreName, { keyPath: "storageId" }); + } + if (!db.objectStoreNames.contains(this.infoItemsStoreName)) { + db.createObjectStore(this.infoItemsStoreName, { keyPath: "storageId" }); + } + } + + async getAll(storeName: string): Promise { + return this.performTransaction(storeName, "readonly", (store) => + store.getAll(), + ); + } + + async put(storeName: string, item: T): Promise { + await this.performTransaction(storeName, "readwrite", (store) => + store.put(item), + ); + } + + async get(storeName: string, key: IDBValidKey): Promise { + return this.performTransaction(storeName, "readonly", (store) => + store.get(key), + ); + } + + async delete(storeName: string, key: IDBValidKey): Promise { + await this.performTransaction(storeName, "readwrite", (store) => + store.delete(key), + ); + } + + private async performTransaction( + storeName: string, + mode: IDBTransactionMode, + operation: (store: IDBObjectStore) => IDBRequest, + ): Promise { + return new Promise((resolve, reject) => { + if (!this.db) throw new Error("Database not initialized"); + + const transaction = this.db.transaction(storeName, mode); + const store = transaction.objectStore(storeName); + const request = operation(store); + + request.onerror = () => reject(new Error("IndexedDB operation failed")); + + request.onsuccess = () => { + const result = request.result as T; + resolve(result); + }; + }); + } + + closeDatabase(): void { + if (!this.db) return; + this.db.close(); + this.db = null; + } + + async deleteDatabase(): Promise { + this.closeDatabase(); + return new Promise((resolve, reject) => { + const request = indexedDB.deleteDatabase(this.dbName); + request.onsuccess = () => resolve(); + request.onerror = () => reject(new Error("Failed to delete database.")); + }); + } +}