Implement new partitioning logic and update checkpointing (#976)
logancyang authored Dec 27, 2024
1 parent 1840242 commit 94b3829
Showing 2 changed files with 121 additions and 38 deletions.
147 changes: 112 additions & 35 deletions src/search/chunkedStorage.ts
@@ -160,37 +160,62 @@ export class ChunkedStorage {
),
};

const metadataPath = this.getMetadataPath();
await this.ensureDirectoryExists(metadataPath);
await this.app.vault.adapter.write(metadataPath, JSON.stringify(metadata));
await this.saveMetadata(metadata);
// Create global data object (excluding partitioned fields)
const globalData = {
...rawData,
docs: { docs: {}, count: 0 },
index: {
...(rawData as any).index,
vectorIndexes: undefined,
},
};

// Save partitions
for (const [partitionIndex, docs] of partitions.entries()) {
// Create partition-specific data
const partitionData = {
...rawData,
documents: docs,
index: {
vectorIndexes: {
embedding: {
size: (rawData as any).index.vectorIndexes.embedding.size,
vectors: Object.fromEntries(
Object.entries((rawData as any).index.vectorIndexes.embedding.vectors).filter(
([id]) => docs.some((doc) => doc.id === id)
)
),
},
},
},
docs: {
docs: Object.fromEntries(docs.map((doc, index) => [(index + 1).toString(), doc])),
count: docs.length,
},
};

// For first partition, include global data
const finalPartitionData =
partitionIndex === 0
? {
...globalData,
docs: partitionData.docs,
index: {
...globalData.index,
vectorIndexes: partitionData.index.vectorIndexes,
},
}
: partitionData;

const chunkPath = this.getChunkPath(partitionIndex);
await this.ensureDirectoryExists(chunkPath);
await this.app.vault.adapter.write(chunkPath, JSON.stringify(partitionData));
await this.app.vault.adapter.write(chunkPath, JSON.stringify(finalPartitionData));

if (getSettings().debug) {
console.log(`Saved partition ${partitionIndex + 1}/${numPartitions}`);
}
}

let savedTotal = 0;
for (let i = 0; i < numPartitions; i++) {
const chunkPath = this.getChunkPath(i);
const chunkData = JSON.parse(await this.app.vault.adapter.read(chunkPath));
savedTotal += chunkData.documents.length;
}

if (getSettings().debug) {
if (savedTotal !== rawDocs.length) {
console.error(
`Document count mismatch during save! Original: ${rawDocs.length}, Saved: ${savedTotal}`
);
}
console.log("Saved all partitions");
}
} catch (error) {
console.error(`Error saving database:`, error);
@@ -221,17 +246,8 @@ export class ChunkedStorage {
return newDb;
}

// Try loading chunked format
const metadataPath = this.getMetadataPath();
if (!(await this.app.vault.adapter.exists(metadataPath))) {
throw new CustomError("No existing database found");
}

const metadata: ChunkMetadata = JSON.parse(await this.app.vault.adapter.read(metadataPath));
if (!metadata?.schema) {
throw new CustomError("Invalid metadata file: missing schema");
}

// Load metadata
const metadata = await this.loadMetadata();
const newDb = await create({
schema: metadata.schema,
components: {
@@ -242,19 +258,58 @@
},
});

// Load and merge all partitions
let mergedData = null;
const allChunks = [];

// First, load all chunks
for (let i = 0; i < metadata.numPartitions; i++) {
const chunkPath = this.getChunkPath(i);
if (await this.app.vault.adapter.exists(chunkPath)) {
const chunkData = JSON.parse(await this.app.vault.adapter.read(chunkPath));
if (chunkData) {
await load(newDb, chunkData);
if (getSettings().debug) {
console.log(`Loaded partition ${i + 1}/${metadata.numPartitions}`);
}
allChunks.push(chunkData);

// First chunk contains global data
if (i === 0) {
mergedData = chunkData;
}
}
}

if (!mergedData) {
throw new CustomError("No data found in chunks");
}

// Create new docs object based on internalDocumentIDStore order
const orderedDocs: Record<string, any> = {};
let nextDocId = 1;

for (const internalId of mergedData.internalDocumentIDStore.internalIdToId) {
// Find document in any chunk
const doc = allChunks
.flatMap((chunk) => Object.values(chunk.docs.docs))
.find((doc: any) => (doc as any).id === internalId);

if (doc) {
orderedDocs[nextDocId.toString()] = doc;
nextDocId++;
} else if (getSettings().debug) {
console.warn(`Document ${internalId} not found in any chunk`);
}
}

// Replace docs with ordered version
mergedData.docs.docs = orderedDocs;
mergedData.docs.count = Object.keys(orderedDocs).length;

// Merge vectors from all chunks
mergedData.index.vectorIndexes.embedding.vectors = Object.assign(
{},
...allChunks.map((chunk) => chunk.index?.vectorIndexes?.embedding?.vectors || {})
);

// Load merged data into database
await load(newDb, mergedData);
return newDb;
} catch (error) {
console.error(`Error loading database:`, error);
@@ -298,4 +353,26 @@
(await this.app.vault.adapter.exists(legacyPath))
);
}

// Helper method to load metadata
private async loadMetadata(): Promise<ChunkMetadata> {
const metadataPath = this.getMetadataPath();
if (!(await this.app.vault.adapter.exists(metadataPath))) {
throw new CustomError("No existing database found");
}

const metadata: ChunkMetadata = JSON.parse(await this.app.vault.adapter.read(metadataPath));
if (!metadata?.schema) {
throw new CustomError("Invalid metadata file: missing schema");
}

return metadata;
}

// Helper method to save metadata
private async saveMetadata(metadata: ChunkMetadata): Promise<void> {
const metadataPath = this.getMetadataPath();
await this.ensureDirectoryExists(metadataPath);
await this.app.vault.adapter.write(metadataPath, JSON.stringify(metadata));
}
}
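
For context, the save/load round-trip above boils down to: global fields (schema, internalDocumentIDStore, the non-vector index) ride along in partition 0, each chunk file carries only its own slice of docs and embedding vectors, and the loader reassembles one object whose doc keys follow internalIdToId order before handing it to load(). Below is a minimal standalone sketch of that split/merge shape — the Doc and Chunk types and both function names are hypothetical simplifications, not the plugin's API, and the real code also persists the full index, schema, and metadata:

// Simplified sketch of the chunking round-trip; real chunks also carry
// the full search index, schema, and global metadata.
interface Doc { id: string; [key: string]: unknown; }
interface Chunk {
  docs: { docs: Record<string, Doc>; count: number };
  vectors: Record<string, number[]>;
}

// Split documents (and their vectors) into `numPartitions` chunk objects.
function splitIntoChunks(
  docs: Doc[],
  vectors: Record<string, number[]>,
  numPartitions: number
): Chunk[] {
  const perChunk = Math.ceil(docs.length / numPartitions);
  return Array.from({ length: numPartitions }, (_, p) => {
    const slice = docs.slice(p * perChunk, (p + 1) * perChunk);
    return {
      docs: {
        docs: Object.fromEntries(slice.map((d, i) => [(i + 1).toString(), d])),
        count: slice.length,
      },
      // Keep only the vectors whose ids belong to this slice.
      vectors: Object.fromEntries(
        Object.entries(vectors).filter(([id]) => slice.some((d) => d.id === id))
      ),
    };
  });
}

// Merge chunks back, re-keying docs in the order given by `internalIdToId`.
function mergeChunks(chunks: Chunk[], internalIdToId: string[]) {
  const all = chunks.flatMap((c) => Object.values(c.docs.docs));
  const orderedDocs: Record<string, Doc> = {};
  let nextDocId = 1;
  for (const id of internalIdToId) {
    const doc = all.find((d) => d.id === id);
    if (doc) orderedDocs[(nextDocId++).toString()] = doc;
  }
  return {
    docs: { docs: orderedDocs, count: Object.keys(orderedDocs).length },
    vectors: Object.assign({}, ...chunks.map((c) => c.vectors)),
  };
}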
12 changes: 9 additions & 3 deletions src/search/indexOperations.ts
@@ -9,8 +9,8 @@ import { App, Notice, TFile } from "obsidian";
import { DBOperations } from "./dbOperations";
import { extractAppIgnoreSettings, getFilePathsForQA } from "./searchUtils";

const EMBEDDING_BATCH_SIZE = 50;
const CHECKPOINT_INTERVAL = 4 * EMBEDDING_BATCH_SIZE;
const EMBEDDING_BATCH_SIZE = 64;
const CHECKPOINT_INTERVAL = 8 * EMBEDDING_BATCH_SIZE;

export interface IndexingState {
isIndexingPaused: boolean;
@@ -121,7 +121,13 @@ export class IndexOperations {
this.state.indexedCount = this.state.processedFiles.size;
this.updateIndexingNoticeMessage();

if (this.state.indexedCount % CHECKPOINT_INTERVAL === 0) {
// Check whether this batch crossed a checkpoint boundary
const previousCheckpoint = Math.floor(
(this.state.indexedCount - batch.length) / CHECKPOINT_INTERVAL
);
const currentCheckpoint = Math.floor(this.state.indexedCount / CHECKPOINT_INTERVAL);

if (currentCheckpoint > previousCheckpoint) {
await this.dbOps.saveDB();
console.log("Copilot index checkpoint save completed.");
}
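
The old check, indexedCount % CHECKPOINT_INTERVAL === 0, only fires when a batch lands exactly on a multiple of the interval; since indexedCount advances a whole batch at a time (and can advance irregularly when files are skipped or fail), it can step over a multiple without ever equaling it. The floor-division comparison instead fires whenever a boundary is crossed. A minimal self-contained sketch of the same check, with hypothetical counts:

const CHECKPOINT_INTERVAL = 512; // 8 * EMBEDDING_BATCH_SIZE in this commit

// Returns true when the count crossed a checkpoint boundary in this batch.
function crossedCheckpoint(indexedCount: number, batchSize: number): boolean {
  const previousCheckpoint = Math.floor((indexedCount - batchSize) / CHECKPOINT_INTERVAL);
  const currentCheckpoint = Math.floor(indexedCount / CHECKPOINT_INTERVAL);
  return currentCheckpoint > previousCheckpoint;
}

// Example: a batch of 30 moves the count from 500 to 530, crossing 512.
console.log(crossedCheckpoint(530, 30)); // true  (checkpoint 0 -> 1)
console.log(crossedCheckpoint(500, 30)); // false (checkpoint 0 -> 0)
console.log(530 % CHECKPOINT_INTERVAL === 0); // false — the old test would miss this save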