From bdc121de0a05aaa4716269e2a96b3c4ae3385d8e Mon Sep 17 00:00:00 2001 From: Matt DeBoard Date: Thu, 21 Mar 2024 09:27:49 -0500 Subject: [PATCH] Add create & delete commands for R2 event notifications (#5294) * Add command for adding event notification configs * Add command for deleting event-notification cfgs * Add changeset for event notification commands * Expose event-types instead of actions. For beta of this feature, we'll only expose the categories of actions we want to support. * Add test coverage for eventNotificationHeaders fn * Simplify eventNotificationHeaders unit test Co-authored-by: Pete Bacon Darwin * Fix typo in argument description * Update event notif. request body EWC no longer supports including `bucketName` and `queue` in the request body, since these values are already present in the URL. --------- Co-authored-by: Pete Bacon Darwin --- .changeset/breezy-berries-play.md | 7 + packages/wrangler/src/__tests__/r2.test.ts | 160 +++++++++++++++++- .../wrangler/src/__tests__/r2/helpers.test.ts | 24 +++ packages/wrangler/src/r2/helpers.ts | 104 ++++++++++++ packages/wrangler/src/r2/index.ts | 111 +++++++++++- 5 files changed, 400 insertions(+), 6 deletions(-) create mode 100644 .changeset/breezy-berries-play.md create mode 100644 packages/wrangler/src/__tests__/r2/helpers.test.ts diff --git a/.changeset/breezy-berries-play.md b/.changeset/breezy-berries-play.md new file mode 100644 index 000000000000..5fd6f64f6572 --- /dev/null +++ b/.changeset/breezy-berries-play.md @@ -0,0 +1,7 @@ +--- +"wrangler": minor +--- + +Add `event-notification` commands in support of event notifications for Cloudflare R2. + +Included are commands for creating and deleting event notification configurations for individual buckets. diff --git a/packages/wrangler/src/__tests__/r2.test.ts b/packages/wrangler/src/__tests__/r2.test.ts index 6190695c27c7..ec5aca4f6e7c 100644 --- a/packages/wrangler/src/__tests__/r2.test.ts +++ b/packages/wrangler/src/__tests__/r2.test.ts @@ -2,13 +2,19 @@ import * as fs from "node:fs"; import { rest } from "msw"; import prettyBytes from "pretty-bytes"; import { MAX_UPLOAD_SIZE } from "../r2/constants"; +import { actionsForEventCategories } from "../r2/helpers"; import { mockAccountId, mockApiToken } from "./helpers/mock-account-id"; import { mockConsoleMethods } from "./helpers/mock-console"; import { useMockIsTTY } from "./helpers/mock-istty"; import { createFetchResult, msw, mswSuccessR2handlers } from "./helpers/msw"; import { runInTempDir } from "./helpers/run-in-tmp"; import { runWrangler } from "./helpers/run-wrangler"; -import type { R2BucketInfo } from "../r2/helpers"; +import type { + EWCRequestBody, + R2BucketInfo, + R2EventableOperation, + R2EventType, +} from "../r2/helpers"; describe("r2", () => { const std = mockConsoleMethods(); @@ -36,10 +42,11 @@ describe("r2", () => { Manage R2 buckets Commands: - wrangler r2 bucket create Create a new R2 bucket - wrangler r2 bucket list List R2 buckets - wrangler r2 bucket delete Delete an R2 bucket - wrangler r2 bucket sippy Manage Sippy incremental migration on an R2 bucket + wrangler r2 bucket create Create a new R2 bucket + wrangler r2 bucket list List R2 buckets + wrangler r2 bucket delete Delete an R2 bucket + wrangler r2 bucket sippy Manage Sippy incremental migration on an R2 bucket + wrangler r2 bucket event-notification Manage event notifications for an R2 bucket Flags: -j, --experimental-json-config Experimental: Support wrangler.json [boolean] @@ -537,6 +544,149 @@ describe("r2", () => { ); }); }); + + describe("event-notification", () => { + describe("create", () => { + it("follows happy path as expected", async () => { + const eventTypes: R2EventType[] = ["object_create", "object_delete"]; + const actions: R2EventableOperation[] = []; + const bucketName = "my-bucket"; + const queue = "deadbeef-0123-4567-8910-abcdefabcdef"; + + const config: EWCRequestBody = { + rules: [ + { + actions: eventTypes.reduce( + (acc, et) => acc.concat(actionsForEventCategories[et]), + actions + ), + }, + ], + }; + msw.use( + rest.put( + "*/accounts/:accountId/event_notifications/r2/:bucketName/configuration/queues/:queueUUID", + async (request, response, context) => { + const { accountId } = request.params; + expect(accountId).toEqual("some-account-id"); + expect(await request.json()).toEqual({ + ...config, + // We fill in `prefix` & `suffix` with empty strings if not + // provided + rules: [{ ...config.rules[0], prefix: "", suffix: "" }], + }); + expect(request.headers.get("authorization")).toEqual( + "Bearer some-api-token" + ); + return response.once(context.json(createFetchResult({}))); + } + ) + ); + await expect( + runWrangler( + `r2 bucket event-notification create ${bucketName} --queue ${queue} --event-types ${eventTypes.join( + " " + )}` + ) + ).resolves.toBe(undefined); + expect(std.out).toMatchInlineSnapshot(` + "Sending this configuration to \\"my-bucket\\": + {\\"rules\\":[{\\"prefix\\":\\"\\",\\"suffix\\":\\"\\",\\"actions\\":[\\"PutObject\\",\\"CompleteMultipartUpload\\",\\"CopyObject\\",\\"DeleteObject\\",\\"LifecycleDeletion\\"]}]} + Configuration created successfully!" + `); + }); + + it("errors if required options are not provided", async () => { + await expect( + runWrangler( + "r2 bucket event-notification create event-notification-test-001" + ) + ).rejects.toMatchInlineSnapshot( + `[Error: Missing required arguments: event-types, queue]` + ); + expect(std.out).toMatchInlineSnapshot(` + " + wrangler r2 bucket event-notification create + + Create new event notification configuration for an R2 bucket + + Positionals: + bucket The name of the bucket for which notifications will be emitted [string] [required] + + Flags: + -j, --experimental-json-config Experimental: Support wrangler.json [boolean] + -c, --config Path to .toml configuration file [string] + -e, --env Environment to use for operations and .env files [string] + -h, --help Show help [boolean] + -v, --version Show version number [boolean] + + Options: + --event-types, --event-type Specify the kinds of object events to emit notifications for. ex. '--event-types object_create object_delete' [array] [required] [choices: \\"object_create\\", \\"object_delete\\"] + --prefix only actions on objects with this prefix will emit notifications [string] + --suffix only actions on objects with this suffix will emit notifications [string] + --queue The ID of the queue to which event notifications will be sent. ex '--queue deadbeef-0123-4567-8910-abcdefgabcde' [string] [required]" + `); + }); + }); + + describe("delete", () => { + it("follows happy path as expected", async () => { + const bucketName = "my-bucket"; + const queue = "deadbeef-0123-4567-8910-abcdefabcdef"; + msw.use( + rest.delete( + "*/accounts/:accountId/event_notifications/r2/:bucketName/configuration/queues/:queueUUID", + async (request, response, context) => { + const { accountId } = request.params; + expect(accountId).toEqual("some-account-id"); + expect(request.headers.get("authorization")).toEqual( + "Bearer some-api-token" + ); + return response.once(context.json(createFetchResult({}))); + } + ) + ); + await expect( + runWrangler( + `r2 bucket event-notification delete ${bucketName} --queue ${queue}` + ) + ).resolves.toBe(undefined); + expect(std.out).toMatchInlineSnapshot(` + "Disabling event notifications for \\"my-bucket\\" to queue deadbeef-0123-4567-8910-abcdefabcdef... + Configuration deleted successfully!" + `); + }); + + it("errors if required options are not provided", async () => { + await expect( + runWrangler( + "r2 bucket event-notification delete event-notification-test-001" + ) + ).rejects.toMatchInlineSnapshot( + `[Error: Missing required argument: queue]` + ); + expect(std.out).toMatchInlineSnapshot(` + " + wrangler r2 bucket event-notification delete + + Delete event notification configuration for an R2 bucket and queue + + Positionals: + bucket The name of the bucket for which notifications will be emitted [string] [required] + + Flags: + -j, --experimental-json-config Experimental: Support wrangler.json [boolean] + -c, --config Path to .toml configuration file [string] + -e, --env Environment to use for operations and .env files [string] + -h, --help Show help [boolean] + -v, --version Show version number [boolean] + + Options: + --queue The ID of the queue that is configured to receive notifications. ex '--queue deadbeef-0123-4567-8910-abcdefgabcde' [string] [required]" + `); + }); + }); + }); }); describe("r2 object", () => { diff --git a/packages/wrangler/src/__tests__/r2/helpers.test.ts b/packages/wrangler/src/__tests__/r2/helpers.test.ts new file mode 100644 index 000000000000..982b9aed115a --- /dev/null +++ b/packages/wrangler/src/__tests__/r2/helpers.test.ts @@ -0,0 +1,24 @@ +import { eventNotificationHeaders } from "../../r2/helpers"; +import type { ApiCredentials } from "../../user"; + +describe("event notifications", () => { + test("auth email eventNotificationHeaders", () => { + const creds: ApiCredentials = { + authEmail: "test@example.com", + authKey: "some-big-secret", + }; + const result = eventNotificationHeaders(creds); + expect(result).toMatchObject({ + "X-Auth-Key": creds.authKey, + "X-Auth-Email": creds.authEmail, + }); + }); + + test("API token eventNotificationHeaders", () => { + const creds: ApiCredentials = { apiToken: "some-api-token" }; + const result = eventNotificationHeaders(creds); + expect(result).toMatchObject({ + Authorization: `Bearer ${creds.apiToken}`, + }); + }); +}); diff --git a/packages/wrangler/src/r2/helpers.ts b/packages/wrangler/src/r2/helpers.ts index 4dba22ed215f..0182f7cb4db8 100644 --- a/packages/wrangler/src/r2/helpers.ts +++ b/packages/wrangler/src/r2/helpers.ts @@ -4,6 +4,8 @@ import { fetchR2Objects } from "../cfetch/internal"; import { getLocalPersistencePath } from "../dev/get-local-persistence-path"; import { buildPersistOptions } from "../dev/miniflare"; import { UserError } from "../errors"; +import { logger } from "../logger"; +import type { ApiCredentials } from "../user"; import type { R2Bucket } from "@cloudflare/workers-types/experimental"; import type { ReplaceWorkersTypes } from "miniflare"; import type { Readable } from "node:stream"; @@ -317,3 +319,105 @@ export async function putR2Sippy( { method: "PUT", body: JSON.stringify(params), headers } ); } + +export const R2EventableOperations = [ + "PutObject", + "DeleteObject", + "CompleteMultipartUpload", + "AbortMultipartUpload", + "CopyObject", + "LifecycleDeletion", +] as const; +export type R2EventableOperation = typeof R2EventableOperations[number]; + +export const actionsForEventCategories: Record< + "object_create" | "object_delete", + R2EventableOperation[] +> = { + object_create: ["PutObject", "CompleteMultipartUpload", "CopyObject"], + object_delete: ["DeleteObject", "LifecycleDeletion"], +}; + +export type R2EventType = keyof typeof actionsForEventCategories; +// This type captures the shape of the data expected by EWC API. +export type EWCRequestBody = { + // `jurisdiction` is included here for completeness, but until Queues + // supports jurisdictions, then this command will not send anything to do + // with jurisdictions. + jurisdiction?: string; + rules: Array<{ + prefix?: string; + suffix?: string; + actions: R2EventableOperation[]; + }>; +}; + +export function eventNotificationHeaders( + apiCredentials: ApiCredentials +): HeadersInit { + const headers: HeadersInit = { + "Content-Type": "application/json", + }; + + if ("apiToken" in apiCredentials) { + headers["Authorization"] = `Bearer ${apiCredentials.apiToken}`; + } else { + headers["X-Auth-Key"] = apiCredentials.authKey; + headers["X-Auth-Email"] = apiCredentials.authEmail; + } + return headers; +} +/** Construct & transmit notification configuration to EWC. + * + * On success, receive HTTP 200 response with a body like: + * { event_notification_detail_id: string } + * + * Possible status codes on failure: + * - 400 Bad Request - Either: + * - Uploaded configuration is invalid + * - Communication with either R2-gateway-worker or queue-broker-worker fails + * - 409 Conflict - A configuration between the bucket and queue already exists + * */ +export async function putEventNotificationConfig( + apiCredentials: ApiCredentials, + accountId: string, + bucketName: string, + queueUUID: string, + eventTypes: R2EventType[], + prefix?: string, + suffix?: string +): Promise<{ event_notification_detail_id: string }> { + const headers = eventNotificationHeaders(apiCredentials); + let actions: R2EventableOperation[] = []; + + for (const et of eventTypes) { + actions = actions.concat(actionsForEventCategories[et]); + } + + const body: EWCRequestBody = { + rules: [{ prefix, suffix, actions }], + }; + logger.log( + `Sending this configuration to "${bucketName}":\n${JSON.stringify(body)}` + ); + return await fetchResult<{ event_notification_detail_id: string }>( + `/accounts/${accountId}/event_notifications/r2/${bucketName}/configuration/queues/${queueUUID}`, + { method: "PUT", body: JSON.stringify(body), headers } + ); +} + +export async function deleteEventNotificationConfig( + apiCredentials: ApiCredentials, + accountId: string, + bucketName: string, + queueUUID: string +): Promise { + const headers = eventNotificationHeaders(apiCredentials); + logger.log( + `Disabling event notifications for "${bucketName}" to queue ${queueUUID}...` + ); + return await fetchResult( + `/accounts/${accountId}/event_notifications/r2/${bucketName}/configuration/queues/${queueUUID}`, + { method: "DELETE", headers } + ); +} diff --git a/packages/wrangler/src/r2/index.ts b/packages/wrangler/src/r2/index.ts index c788c8904ac3..d658f359e127 100644 --- a/packages/wrangler/src/r2/index.ts +++ b/packages/wrangler/src/r2/index.ts @@ -9,20 +9,24 @@ import { FatalError, UserError } from "../errors"; import { CommandLineArgsError, printWranglerBanner } from "../index"; import { logger } from "../logger"; import * as metrics from "../metrics"; -import { requireAuth } from "../user"; +import { requireApiToken, requireAuth } from "../user"; import { MAX_UPLOAD_SIZE } from "./constants"; import { + actionsForEventCategories, bucketAndKeyFromObjectPath, createR2Bucket, + deleteEventNotificationConfig, deleteR2Bucket, deleteR2Object, getR2Object, listR2Buckets, + putEventNotificationConfig, putR2Object, usingLocalBucket, } from "./helpers"; import * as Sippy from "./sippy"; import type { CommonYargsArgv } from "../yargs-types"; +import type { R2EventType } from "./helpers"; import type { R2PutOptions } from "@cloudflare/workers-types/experimental"; const CHUNK_SIZE = 1024; @@ -539,6 +543,111 @@ export function r2(r2Yargs: CommonYargsArgv) { ); } ); + + r2BucketYargs.command( + "event-notification", + "Manage event notifications for an R2 bucket", + (r2EvNotifyYargs) => { + return r2EvNotifyYargs + .command( + "create ", + "Create new event notification configuration for an R2 bucket", + (yargs) => { + return yargs + .positional("bucket", { + describe: + "The name of the bucket for which notifications will be emitted", + type: "string", + demandOption: true, + }) + .option("event-types", { + describe: + "Specify the kinds of object events to emit notifications for. ex. '--event-types object_create object_delete'", + alias: "event-type", + choices: Object.keys(actionsForEventCategories), + demandOption: true, + requiresArg: true, + type: "array", + }) + .option("prefix", { + describe: + "only actions on objects with this prefix will emit notifications", + requiresArg: false, + type: "string", + }) + .option("suffix", { + describe: + "only actions on objects with this suffix will emit notifications", + type: "string", + }) + .option("queue", { + describe: + "The ID of the queue to which event notifications will be sent. ex '--queue deadbeef-0123-4567-8910-abcdefgabcde'", + demandOption: true, + requiresArg: true, + type: "string", + }); + }, + async (args) => { + await printWranglerBanner(); + const config = readConfig(args.config, args); + const accountId = await requireAuth(config); + const apiCreds = requireApiToken(); + const { + bucket, + queue, + eventTypes, + prefix = "", + suffix = "", + } = args; + await putEventNotificationConfig( + apiCreds, + accountId, + `${bucket}`, + `${queue}`, + eventTypes as R2EventType[], + `${prefix}`, + `${suffix}` + ); + logger.log("Configuration created successfully!"); + } + ) + .command( + "delete ", + "Delete event notification configuration for an R2 bucket and queue", + (yargs) => { + return yargs + .positional("bucket", { + describe: + "The name of the bucket for which notifications will be emitted", + type: "string", + demandOption: true, + }) + .option("queue", { + describe: + "The ID of the queue that is configured to receive notifications. ex '--queue deadbeef-0123-4567-8910-abcdefgabcde'", + demandOption: true, + requiresArg: true, + type: "string", + }); + }, + async (args) => { + await printWranglerBanner(); + const config = readConfig(args.config, args); + const accountId = await requireAuth(config); + const apiCreds = requireApiToken(); + const { bucket, queue } = args; + await deleteEventNotificationConfig( + apiCreds, + accountId, + `${bucket}`, + `${queue}` + ); + logger.log("Configuration deleted successfully!"); + } + ); + } + ); return r2BucketYargs; }); }