Skip to content

Commit

Permalink
Apply new DFS logic to content fragment + refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
flvndvd committed Apr 4, 2024
1 parent a86e7f9 commit dfd21f3
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 88 deletions.
6 changes: 4 additions & 2 deletions front/lib/api/assistant/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import { agentConfigurationWasUpdatedBy } from "@app/lib/api/assistant/recent_au
import { agentUserListStatus } from "@app/lib/api/assistant/user_relation";
import { compareAgentsForSort } from "@app/lib/assistant";
import type { Authenticator } from "@app/lib/auth";
import { getFileContentType } from "@app/lib/dfs";
import { getPublicUploadBucket } from "@app/lib/dfs";
import {
AgentConfiguration,
AgentDataSourceConfiguration,
Expand Down Expand Up @@ -761,7 +761,9 @@ async function isSelfHostedImageWithValidContentType(pictureUrl: string) {
return false;
}

const contentType = await getFileContentType("PUBLIC_UPLOAD", filename);
const contentType = await getPublicUploadBucket().getFileContentType(
filename
);
if (!contentType) {
return false;
}
Expand Down
5 changes: 4 additions & 1 deletion front/lib/dfs/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ const config = {
getServiceAccount: (): string => {
return EnvironmentConfig.getEnvVariable("SERVICE_ACCOUNT");
},
getPublicUploadBucket: (): string => {
getGcsPublicUploadBucket: (): string => {
return EnvironmentConfig.getEnvVariable("DUST_UPLOAD_BUCKET");
},
getGcsPrivateUploadsBucket: (): string => {
return EnvironmentConfig.getEnvVariable("DUST_PRIVATE_UPLOADS_BUCKET");
},
};

export default config;
106 changes: 73 additions & 33 deletions front/lib/dfs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,51 +2,91 @@ import type { Bucket } from "@google-cloud/storage";
import { Storage } from "@google-cloud/storage";
import type formidable from "formidable";
import fs from "fs";
import { pipeline } from "stream/promises";

import config from "@app/lib/dfs/config";

type SupportedBucketKeyType = "PUBLIC_UPLOAD";
type BucketKeyType = "PRIVATE_UPLOAD" | "PUBLIC_UPLOAD";

const storage = new Storage({
keyFilename: config.getServiceAccount(),
});

const bucketKeysToBucket: Record<SupportedBucketKeyType, Bucket> = {
PUBLIC_UPLOAD: storage.bucket(config.getPublicUploadBucket()),
const bucketKeysToBucket: Record<BucketKeyType, Bucket> = {
PRIVATE_UPLOAD: storage.bucket(config.getGcsPrivateUploadsBucket()),
PUBLIC_UPLOAD: storage.bucket(config.getGcsPublicUploadBucket()),
};

export async function uploadToBucket(
bucketKey: SupportedBucketKeyType,
file: formidable.File
) {
const bucket = bucketKeysToBucket[bucketKey];

const gcsFile = bucket.file(file.newFilename);
const fileStream = fs.createReadStream(file.filepath);

return new Promise((resolve, reject) =>
fileStream
.pipe(
gcsFile.createWriteStream({
metadata: {
contentType: file.mimetype,
},
})
)
.on("error", reject)
.on("finish", () => resolve(gcsFile))
);
}
class DFS {
private readonly bucket: Bucket;

constructor(bucketKey: BucketKeyType) {
this.bucket = bucketKeysToBucket[bucketKey];
}

/**
* Upload functions.
*/

async uploadFileToBucket(file: formidable.File, destPath: string) {
const gcsFile = this.file(destPath);
const fileStream = fs.createReadStream(file.filepath);

await pipeline(
fileStream,
gcsFile.createWriteStream({
metadata: {
contentType: file.mimetype,
},
})
);
}

async uploadRawContentToBucket({
content,
contentType,
filePath,
}: {
content: string;
contentType: string;
filePath: string;
}) {
const gcsFile = this.file(filePath);

await gcsFile.save(content, {
contentType,
});
}

export async function getFileContentType(
bucketKey: SupportedBucketKeyType,
filename: string
): Promise<string | null> {
const bucket = bucketKeysToBucket[bucketKey];
/**
* Download functions.
*/

const gcsFile = bucket.file(filename);
async fetchFileContent(filePath: string) {
const gcsFile = this.file(filePath);

const [metadata] = await gcsFile.getMetadata();
const [content] = await gcsFile.download();

return metadata.contentType;
return content.toString();
}

async getFileContentType(filename: string): Promise<string | null> {
const gcsFile = this.file(filename);

const [metadata] = await gcsFile.getMetadata();

return metadata.contentType;
}

file(filename: string) {
return this.bucket.file(filename);
}

get name() {
return this.bucket.name;
}
}

export const getPrivateUploadBucket = () => new DFS("PRIVATE_UPLOAD");

export const getPublicUploadBucket = () => new DFS("PUBLIC_UPLOAD");
27 changes: 8 additions & 19 deletions front/lib/resources/content_fragment_resource.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import type { ContentFragmentType, ModelId, Result } from "@dust-tt/types";
import { Err, Ok } from "@dust-tt/types";
import { Storage } from "@google-cloud/storage";
import type {
Attributes,
CreationAttributes,
Expand All @@ -9,12 +8,14 @@ import type {
} from "sequelize";

import appConfig from "@app/lib/api/config";
import { getPrivateUploadBucket } from "@app/lib/dfs";
import { Message } from "@app/lib/models";
import { BaseResource } from "@app/lib/resources/base_resource";
import { gcsConfig } from "@app/lib/resources/storage/config";
import { ContentFragmentModel } from "@app/lib/resources/storage/models/content_fragment";
import type { ReadonlyAttributesType } from "@app/lib/resources/storage/types";

const privateUploadGcs = getPrivateUploadBucket();

// Attributes are marked as read-only to reflect the stateless nature of our Resource.
// This design will be moved up to BaseResource once we transition away from Sequelize.
// eslint-disable-next-line @typescript-eslint/no-empty-interface
Expand Down Expand Up @@ -143,7 +144,7 @@ export function fileAttachmentLocation({
const filePath = `content_fragments/w/${workspaceId}/assistant/conversations/${conversationId}/content_fragment/${messageId}/${contentFormat}`;
return {
filePath,
internalUrl: `https://storage.googleapis.com/${gcsConfig.getGcsPrivateUploadsBucket()}/${filePath}`,
internalUrl: `https://storage.googleapis.com/${privateUploadGcs.name}/${filePath}`,
downloadUrl: `${appConfig.getAppUrl()}/api/w/${workspaceId}/assistant/conversations/${conversationId}/messages/${messageId}/raw_content_fragment`,
};
}
Expand All @@ -169,15 +170,11 @@ export async function storeContentFragmentText({
messageId,
contentFormat: "text",
});
const storage = new Storage({
keyFilename: appConfig.getServiceAccount(),
});

const bucket = storage.bucket(gcsConfig.getGcsPrivateUploadsBucket());
const gcsFile = bucket.file(filePath);

await gcsFile.save(content, {
await privateUploadGcs.uploadRawContentToBucket({
content,
contentType: "text/plain",
filePath,
});

return Buffer.byteLength(content);
Expand All @@ -192,20 +189,12 @@ export async function getContentFragmentText({
conversationId: string;
messageId: string;
}): Promise<string> {
const storage = new Storage({
keyFilename: appConfig.getServiceAccount(),
});

const { filePath } = fileAttachmentLocation({
workspaceId,
conversationId,
messageId,
contentFormat: "text",
});

const bucket = storage.bucket(gcsConfig.getGcsPrivateUploadsBucket());
const gcsFile = bucket.file(filePath);

const [content] = await gcsFile.download();
return content.toString();
return privateUploadGcs.fetchFileContent(filePath);
}
6 changes: 0 additions & 6 deletions front/lib/resources/storage/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,3 @@ export const dbConfig = {
return EnvironmentConfig.getEnvVariable("FRONT_DATABASE_URI");
},
};

export const gcsConfig = {
getGcsPrivateUploadsBucket: (): string => {
return EnvironmentConfig.getEnvVariable("DUST_PRIVATE_UPLOADS_BUCKET");
},
};
10 changes: 5 additions & 5 deletions front/pages/api/w/[wId]/assistant/agent_configurations/avatar.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import { IncomingForm } from "formidable";
import type { NextApiRequest, NextApiResponse } from "next";

import { uploadToBucket } from "@app/lib/dfs";
import { getPublicUploadBucket } from "@app/lib/dfs";
import { withLogging } from "@app/logger/withlogging";

const { DUST_UPLOAD_BUCKET = "" } = process.env;

export const config = {
api: {
bodyParser: false, // Disabling Next.js's body parser as formidable has its own
},
};

const publicUploadGcs = getPublicUploadBucket();

async function handler(
req: NextApiRequest,
res: NextApiResponse
Expand Down Expand Up @@ -41,9 +41,9 @@ async function handler(

const [file] = maybeFiles;

await uploadToBucket("PUBLIC_UPLOAD", file);
await publicUploadGcs.uploadFileToBucket(file, file.newFilename);

const fileUrl = `https://storage.googleapis.com/${DUST_UPLOAD_BUCKET}/${file.newFilename}`;
const fileUrl = `https://storage.googleapis.com/${publicUploadGcs.name}/${file.newFilename}`;

res.status(200).json({ fileUrl });
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
import type { WithAPIErrorReponse } from "@dust-tt/types";
import { isContentFragmentType } from "@dust-tt/types";
import { Storage } from "@google-cloud/storage";
import { IncomingForm } from "formidable";
import fs from "fs";
import type { NextApiRequest, NextApiResponse } from "next";
import { pipeline } from "stream/promises";

import { getConversation } from "@app/lib/api/assistant/conversation";
import appConfig from "@app/lib/api/config";
import { Authenticator, getSession } from "@app/lib/auth";
import { getPrivateUploadBucket } from "@app/lib/dfs";
import { fileAttachmentLocation } from "@app/lib/resources/content_fragment_resource";
import { gcsConfig } from "@app/lib/resources/storage/config";
import { apiError, withLogging } from "@app/logger/withlogging";

export const config = {
Expand All @@ -19,6 +15,8 @@ export const config = {
},
};

const privateUploadGcs = getPrivateUploadBucket();

async function handler(
req: NextApiRequest,
res: NextApiResponse<WithAPIErrorReponse<{ sourceUrl: string }>>
Expand Down Expand Up @@ -112,16 +110,10 @@ async function handler(
contentFormat: "raw",
});

const storage = new Storage({
keyFilename: appConfig.getServiceAccount(),
});
const bucket = storage.bucket(gcsConfig.getGcsPrivateUploadsBucket());
const gcsFile = bucket.file(filePath);

switch (req.method) {
case "GET":
// redirect to a signed URL
const [url] = await gcsFile.getSignedUrl({
const [url] = await privateUploadGcs.file(filePath).getSignedUrl({
version: "v4",
action: "read",
// since we redirect, the use is immediate so expiry can be short
Expand Down Expand Up @@ -150,16 +142,7 @@ async function handler(

const [file] = maybeFiles;

const fileStream = fs.createReadStream(file.filepath);

await pipeline(
fileStream,
gcsFile.createWriteStream({
metadata: {
contentType: file.mimetype,
},
})
);
await privateUploadGcs.uploadFileToBucket(file, filePath);

res.status(200).json({ sourceUrl: downloadUrl });
return;
Expand Down

0 comments on commit dfd21f3

Please sign in to comment.