Write cache large blobs in chunks to avoid write file limits in Node (#…
JakeLane authored Nov 6, 2023
1 parent eeb3ba3 commit c54ed43
Showing 7 changed files with 103 additions and 22 deletions.
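
In short, setLargeBlob now splits any blob larger than WRITE_LIMIT_CHUNK across files named key-0, key-1, …, and getLargeBlob reads consecutive indices back and concatenates them. A minimal standalone sketch of that pattern (the helper names and error handling are illustrative, not Parcel APIs, and contents is assumed to be a Buffer):

import fs from 'fs/promises';
import path from 'path';

// Mirrors packages/core/cache/src/constants.js: a single fs.writeFile call
// cannot write much more than 2 GiB, so blobs are split at this boundary.
const WRITE_LIMIT_CHUNK = 2 * 1024 ** 3;

async function writeLargeBlob(dir, key, contents) {
  const chunks = Math.max(1, Math.ceil(contents.length / WRITE_LIMIT_CHUNK));
  await Promise.all(
    Array.from({length: chunks}, (_, i) =>
      fs.writeFile(
        path.join(dir, `${key}-${i}`),
        contents.subarray(i * WRITE_LIMIT_CHUNK, (i + 1) * WRITE_LIMIT_CHUNK),
      ),
    ),
  );
}

async function readLargeBlob(dir, key) {
  const buffers = [];
  // Read key-0, key-1, … until the next index is missing.
  for (let i = 0; ; i++) {
    try {
      buffers.push(await fs.readFile(path.join(dir, `${key}-${i}`)));
    } catch {
      break; // no more chunks (or the blob was never written)
    }
  }
  return Buffer.concat(buffers);
}
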
44 changes: 40 additions & 4 deletions packages/core/cache/src/FSCache.js
@@ -13,6 +13,8 @@ import {serialize, deserialize, registerSerializableClass} from '@parcel/core';
// flowlint-next-line untyped-import:off
import packageJson from '../package.json';

import {WRITE_LIMIT_CHUNK} from './constants';

const pipeline: (Readable, Writable) => Promise<void> = promisify(
stream.pipeline,
);
@@ -81,16 +83,50 @@ export class FSCache implements Cache {
}
}

#getFilePath(key: string, index: number): string {
return path.join(this.dir, `${key}-${index}`);
}

hasLargeBlob(key: string): Promise<boolean> {
return this.fs.exists(this._getCachePath(`${key}-large`));
return this.fs.exists(this.#getFilePath(key, 0));
}

getLargeBlob(key: string): Promise<Buffer> {
return this.fs.readFile(this._getCachePath(`${key}-large`));
async getLargeBlob(key: string): Promise<Buffer> {
const buffers: Promise<Buffer>[] = [];
for (let i = 0; await this.fs.exists(this.#getFilePath(key, i)); i += 1) {
const file: Promise<Buffer> = this.fs.readFile(this.#getFilePath(key, i));

buffers.push(file);
}

return Buffer.concat(await Promise.all(buffers));
}

async setLargeBlob(key: string, contents: Buffer | string): Promise<void> {
await this.fs.writeFile(this._getCachePath(`${key}-large`), contents);
const chunks = Math.ceil(contents.length / WRITE_LIMIT_CHUNK);

if (chunks === 1) {
// If there's one chunk, don't slice the content
await this.fs.writeFile(this.#getFilePath(key, 0), contents);
return;
}

const writePromises: Promise<void>[] = [];
for (let i = 0; i < chunks; i += 1) {
writePromises.push(
this.fs.writeFile(
this.#getFilePath(key, i),
typeof contents === 'string'
? contents.slice(i * WRITE_LIMIT_CHUNK, (i + 1) * WRITE_LIMIT_CHUNK)
: contents.subarray(
i * WRITE_LIMIT_CHUNK,
(i + 1) * WRITE_LIMIT_CHUNK,
),
),
);
}

await Promise.all(writePromises);
}

async get<T>(key: string): Promise<?T> {
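
A hedged usage sketch of the chunked API (assuming FSCache is still constructed with a FileSystem and a cache directory and still exposes ensure(), as in earlier releases; none of that is shown in this diff):

import {NodeFS} from '@parcel/fs';
import {FSCache} from '@parcel/cache';

async function example() {
  const cache = new FSCache(new NodeFS(), '/tmp/parcel-cache');
  await cache.ensure();

  // Anything longer than WRITE_LIMIT_CHUNK is persisted as my-key-0, my-key-1, …
  const contents = Buffer.from('imagine several GiB of serialized graph here');
  await cache.setLargeBlob('my-key', contents);

  // getLargeBlob re-joins the chunk files transparently.
  return cache.getLargeBlob('my-key');
}
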
43 changes: 39 additions & 4 deletions packages/core/cache/src/LMDBCache.js
@@ -12,6 +12,7 @@ import {NodeFS} from '@parcel/fs';
import packageJson from '../package.json';
// $FlowFixMe
import lmdb from 'lmdb';
import {WRITE_LIMIT_CHUNK} from './constants';

const pipeline: (Readable, Writable) => Promise<void> = promisify(
stream.pipeline,
@@ -91,16 +92,50 @@ export class LMDBCache implements Cache {
return Promise.resolve(this.store.get(key));
}

#getFilePath(key: string, index: number): string {
return path.join(this.dir, `${key}-${index}`);
}

hasLargeBlob(key: string): Promise<boolean> {
return this.fs.exists(path.join(this.dir, key));
return this.fs.exists(this.#getFilePath(key, 0));
}

getLargeBlob(key: string): Promise<Buffer> {
return this.fs.readFile(path.join(this.dir, key));
async getLargeBlob(key: string): Promise<Buffer> {
const buffers: Promise<Buffer>[] = [];
for (let i = 0; await this.fs.exists(this.#getFilePath(key, i)); i += 1) {
const file: Promise<Buffer> = this.fs.readFile(this.#getFilePath(key, i));

buffers.push(file);
}

return Buffer.concat(await Promise.all(buffers));
}

async setLargeBlob(key: string, contents: Buffer | string): Promise<void> {
await this.fs.writeFile(path.join(this.dir, key), contents);
const chunks = Math.ceil(contents.length / WRITE_LIMIT_CHUNK);

if (chunks === 1) {
// If there's one chunk, don't slice the content
await this.fs.writeFile(this.#getFilePath(key, 0), contents);
return;
}

const writePromises: Promise<void>[] = [];
for (let i = 0; i < chunks; i += 1) {
writePromises.push(
this.fs.writeFile(
this.#getFilePath(key, i),
typeof contents === 'string'
? contents.slice(i * WRITE_LIMIT_CHUNK, (i + 1) * WRITE_LIMIT_CHUNK)
: contents.subarray(
i * WRITE_LIMIT_CHUNK,
(i + 1) * WRITE_LIMIT_CHUNK,
),
),
);
}

await Promise.all(writePromises);
}

refresh(): void {
4 changes: 4 additions & 0 deletions packages/core/cache/src/constants.js
@@ -0,0 +1,4 @@
// @flow strict-local

// Node has a file size limit of 2 GB
export const WRITE_LIMIT_CHUNK = 2 * 1024 ** 3;
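
For scale: 2 * 1024 ** 3 = 2,147,483,648 bytes, i.e. 2 GiB. As an illustrative example, a 5 GiB blob would be written as Math.ceil(5 / 2) = 3 files: key-0 and key-1 at 2 GiB each, plus key-2 holding the remaining 1 GiB.
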
4 changes: 2 additions & 2 deletions packages/dev/bundle-stats-cli/src/cli.js
@@ -18,9 +18,9 @@ import {getBundleStats} from '@parcel/reporter-bundle-stats/src/BundleStatsReporter';
import {PackagedBundle as PackagedBundleClass} from '@parcel/core/src/public/Bundle';
import type {commander$Command} from 'commander';

function run({cacheDir, outDir}) {
async function run({cacheDir, outDir}) {
// 1. load bundle graph and info via parcel~query
let {bundleGraph, bundleInfo} = loadGraphs(cacheDir);
let {bundleGraph, bundleInfo} = await loadGraphs(cacheDir);

if (bundleGraph == null) {
console.error('Bundle Graph could not be found');
1 change: 1 addition & 0 deletions packages/dev/query/package.json
@@ -7,6 +7,7 @@
"dependencies": {
"@parcel/core": "2.10.2",
"@parcel/graph": "3.0.2",
"@parcel/cache": "2.10.2",
"nullthrows": "^1.1.1",
"table": "^6.8.1",
"v8-compile-cache": "^2.0.0"
7 changes: 4 additions & 3 deletions packages/dev/query/src/cli.js
@@ -20,7 +20,7 @@ import {Priority} from '@parcel/core/src/types';

import {loadGraphs} from './index.js';

export function run(input: string[]) {
export async function run(input: string[]) {
let args = input;
let cacheDir = path.join(process.cwd(), '.parcel-cache');
if (args[0] === '--cache') {
@@ -37,8 +37,9 @@ export function run(input: string[]) {
}

console.log('Loading graphs...');
let {assetGraph, bundleGraph, bundleInfo, requestTracker} =
loadGraphs(cacheDir);
let {assetGraph, bundleGraph, bundleInfo, requestTracker} = await loadGraphs(
cacheDir,
);

if (requestTracker == null) {
console.error('Request Graph could not be found');
22 changes: 13 additions & 9 deletions packages/dev/query/src/index.js
@@ -8,6 +8,7 @@ import path from 'path';
import v8 from 'v8';
import nullthrows from 'nullthrows';
import invariant from 'assert';
import {LMDBCache} from '@parcel/cache/src/LMDBCache';

const {
AssetGraph,
@@ -19,12 +20,12 @@
},
} = require('./deep-imports.js');

export function loadGraphs(cacheDir: string): {|
export async function loadGraphs(cacheDir: string): Promise<{|
assetGraph: ?AssetGraph,
bundleGraph: ?BundleGraph,
requestTracker: ?RequestTracker,
bundleInfo: ?Map<ContentKey, PackagedBundleInfo>,
|} {
|}> {
function filesBySizeAndModifiedTime() {
let files = fs.readdirSync(cacheDir).map(f => {
let stat = fs.statSync(path.join(cacheDir, f));
@@ -38,11 +39,14 @@
}

let requestTracker;
const cache = new LMDBCache(cacheDir);
for (let f of filesBySizeAndModifiedTime()) {
// if (bundleGraph && assetGraph && requestTracker) break;
if (path.extname(f) !== '') continue;
// Empty filename or not the first chunk
if (path.extname(f) !== '' && !f.endsWith('-0')) continue;
try {
let obj = v8.deserialize(fs.readFileSync(f));
let obj = v8.deserialize(
await cache.getLargeBlob(path.basename(f).slice(0, -'-0'.length)),
);
/* if (obj.assetGraph != null && obj.assetGraph.value.hash != null) {
assetGraph = AssetGraph.deserialize(obj.assetGraph.value);
} else if (obj.bundleGraph != null) {
@@ -90,7 +94,7 @@
);
if (bundleGraphRequestNode != null) {
bundleGraph = BundleGraph.deserialize(
loadLargeBlobRequestRequestSync(cacheDir, bundleGraphRequestNode)
(await loadLargeBlobRequestRequest(cache, bundleGraphRequestNode))
.bundleGraph.value,
);

@@ -99,7 +103,7 @@
).find(n => n.type === 'request' && n.value.type === 'asset_graph_request');
if (assetGraphRequest != null) {
assetGraph = AssetGraph.deserialize(
loadLargeBlobRequestRequestSync(cacheDir, assetGraphRequest).assetGraph
(await loadLargeBlobRequestRequest(cache, assetGraphRequest)).assetGraph
.value,
);
}
@@ -120,9 +124,9 @@
return {assetGraph, bundleGraph, requestTracker, bundleInfo};
}

function loadLargeBlobRequestRequestSync(cacheDir, node) {
async function loadLargeBlobRequestRequest(cache, node) {
invariant(node.type === 'request');
return v8.deserialize(
fs.readFileSync(path.join(cacheDir, nullthrows(node.value.resultCacheKey))),
await cache.getLargeBlob(nullthrows(node.value.resultCacheKey)),
);
}
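
Because a graph can now span several chunk files, parcel-query no longer reads a single cache file with fs.readFileSync; it goes through LMDBCache's getLargeBlob, which re-joins the chunks. A simplified sketch of how blob roots can be discovered in a cache directory (hypothetical helper; stricter than the actual filter above, which also keeps any extensionless file):

import fs from 'fs';

// Only the first chunk of each blob (suffix "-0") is treated as an entry point;
// getLargeBlob(key) then reads "-1", "-2", … and concatenates them.
function largeBlobKeys(cacheDir) {
  return fs
    .readdirSync(cacheDir)
    .filter(f => f.endsWith('-0'))
    .map(f => f.slice(0, -'-0'.length));
}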
