From f4da24f3f0806a09dd6c33a451435680698a9793 Mon Sep 17 00:00:00 2001 From: Amaury <1293565+amaury1729@users.noreply.github.com> Date: Sat, 16 Dec 2023 17:16:27 +0100 Subject: [PATCH] feat: Add bulk page (Staging only) (#474) * Use generated types * WIP BULK * Fix timezone * use smtp * feat: Add bulk page (Staging only) --- package.json | 1 + src/app/api/v1/bulk/route.ts | 160 ++++++++++++++++++++ src/app/api/v1/bulk/webhook/route.ts | 57 +++++++ src/pages/bulk.tsx | 145 ++++++++++++++++++ supabase/migrations/20231212135226_bulk.sql | 34 +++++ yarn.lock | 16 +- 6 files changed, 412 insertions(+), 1 deletion(-) create mode 100644 src/app/api/v1/bulk/route.ts create mode 100644 src/app/api/v1/bulk/webhook/route.ts create mode 100644 src/pages/bulk.tsx create mode 100644 supabase/migrations/20231212135226_bulk.sql diff --git a/package.json b/package.json index 756cd546..eaa9e5fe 100644 --- a/package.json +++ b/package.json @@ -37,6 +37,7 @@ "axios": "^1.6.2", "cors": "^2.8.5", "date-fns": "^2.30.0", + "encoding": "^0.1.13", "mailgun-js": "^0.22.0", "markdown-pdf": "^11.0.0", "mustache": "^4.2.0", diff --git a/src/app/api/v1/bulk/route.ts b/src/app/api/v1/bulk/route.ts new file mode 100644 index 00000000..3c382cd7 --- /dev/null +++ b/src/app/api/v1/bulk/route.ts @@ -0,0 +1,160 @@ +import { NextRequest } from "next/server"; +import amqplib from "amqplib"; +import { supabaseAdmin } from "@/util/supabaseServer"; +import { sentryException } from "@/util/sentry"; +import { getWebappURL } from "@/util/helpers"; +import { Tables } from "@/supabase/database.types"; + +interface BulkPayload { + input_type: "array"; + input: string[]; +} + +export const POST = async (req: NextRequest): Promise => { + // TODO Remove this once we allow Bulk. + if (process.env.VERCEL_ENV === "production") { + return Response.json( + { error: "Not available in production" }, + { status: 403 } + ); + } + + try { + const user = await getUser(req); + + const payload: BulkPayload = await req.json(); + + // Add to Supabase + const res1 = await supabaseAdmin + .from("bulk_jobs") + .insert({ + user_id: user.id, + payload, + }) + .select("*"); + if (res1.error) { + throw res1.error; + } + const bulkJob = res1.data[0]; + const res2 = await supabaseAdmin + .from("bulk_emails") + .insert( + payload.input.map((email) => ({ + bulk_job_id: bulkJob.id, + email, + })) + ) + .select("*"); + if (res2.error) { + throw res2.error; + } + + const conn = await amqplib + .connect(process.env.RCH_AMQP_ADDR || "amqp://localhost") + .catch((err) => { + const message = `Error connecting to RabbitMQ: ${ + (err as AggregateError).errors + ? (err as AggregateError).errors + .map((e) => e.message) + .join(", ") + : err.message + }`; + + throw new Error(message); + }); + + const ch1 = await conn.createChannel().catch((err) => { + throw new Error(`Error creating RabbitMQ channel: ${err.message}`); + }); + const queueName = `check_email.Smtp`; // TODO + await ch1.assertQueue(queueName, { + maxPriority: 5, + }); + + res2.data.forEach(({ email, id }) => { + ch1.sendToQueue( + queueName, + Buffer.from( + JSON.stringify({ + input: { + to_email: email, + }, + webhook: { + url: `${getWebappURL()}/api/v1/bulk/webhook`, + extra: { + bulkEmailId: id, + userId: user.id, + endpoint: "/v1/bulk", + }, + }, + }) + ), + { + contentType: "application/json", + priority: 1, + } + ); + }); + + await ch1.close(); + await conn.close(); + + return Response.json({ message: "Hello world!", res: res1 }); + } catch (err) { + if (isEarlyResponse(err)) { + return err.response; + } + + sentryException(err as Error); + return Response.json( + { + error: (err as Error).message, + }, + { + status: 500, + } + ); + } +}; + +async function getUser(req: NextRequest): Promise> { + const token = req.headers.get("Authorization"); + + if (typeof token !== "string") { + throw new Error("Expected API token in the Authorization header."); + } + + const { data, error } = await supabaseAdmin + .from>("users") + .select("*") + .eq("api_token", token); + if (error) { + throw error; + } + if (!data?.length) { + throw { + response: newEarlyResponse( + Response.json( + { error: "Invalid API token." }, + { + status: 401, + } + ) + ), + }; + } + + return data[0]; +} + +type EarlyResponse = { + response: Response; +}; + +function newEarlyResponse(response: Response): EarlyResponse { + return { response }; +} + +function isEarlyResponse(err: unknown): err is EarlyResponse { + return (err as EarlyResponse).response !== undefined; +} diff --git a/src/app/api/v1/bulk/webhook/route.ts b/src/app/api/v1/bulk/webhook/route.ts new file mode 100644 index 00000000..c90a4812 --- /dev/null +++ b/src/app/api/v1/bulk/webhook/route.ts @@ -0,0 +1,57 @@ +import { CheckEmailOutput } from "@reacherhq/api"; +import { supabaseAdmin } from "@/util/supabaseServer"; +import { NextRequest } from "next/server"; +import { removeSensitiveData } from "@/util/api"; +import { Tables } from "@/supabase/database.types"; + +export interface WebhookExtra { + bulkEmailId: string; + userId: string; + endpoint: string; +} + +export interface WebhookPayload { + output: CheckEmailOutput; + extra: WebhookExtra; +} + +export const POST = async (req: NextRequest): Promise => { + if (req.headers.get("x-reacher-secret") !== process.env.RCH_HEADER_SECRET) { + return Response.json({ error: "Invalid header secret" }); + } + + const body: WebhookPayload = await req.json(); + const { output, extra } = body; + + // Add to supabase calls + const res1 = await supabaseAdmin + .from>("calls") + .insert({ + endpoint: extra.endpoint, + user_id: extra.userId, + backend: output.debug?.server_name, + domain: output.syntax.domain, + duration: Math.round( + (output.debug?.duration.secs || 0) * 1000 + + (output.debug?.duration.nanos || 0) / 1000000 + ), + is_reachable: output.is_reachable, + verif_method: output.debug?.smtp?.verif_method?.type, + result: removeSensitiveData(output), + }) + .select("*"); + if (res1.error) { + return Response.json(res1.error, res1); + } + + // Update bulk_emails table + const res2 = await supabaseAdmin + .from("bulk_emails") + .update({ call_id: res1.data[0].id }) + .eq("id", extra.bulkEmailId); + if (res2.error) { + return Response.json(res2.error, res2); + } + + return Response.json({ message: "ok" }, { status: 200 }); +}; diff --git a/src/pages/bulk.tsx b/src/pages/bulk.tsx new file mode 100644 index 00000000..0160ba67 --- /dev/null +++ b/src/pages/bulk.tsx @@ -0,0 +1,145 @@ +import { Button, Page, Spacer, Table, Text, Textarea } from "@geist-ui/react"; +import { CheckEmailOutput } from "@reacherhq/api/lib"; +import React, { useEffect, useState } from "react"; + +import { postData } from "@/util/helpers"; +import { sentryException } from "@/util/sentry"; +import { useUser } from "@/util/useUser"; +import { Tables } from "@/supabase/database.types"; +import { supabase } from "@/util/supabaseClient"; +import { Nav } from "@/components"; + +function alertError(email: string, e: string) { + alert( + `An unexpected error happened. Can you email amaury@reacher.email with this message (or a screenshot)? + +Email: ${email} +Error: ${e}` + ); +} + +interface BulkProps { + onVerified?(result: CheckEmailOutput): Promise; +} + +interface BulkJobWithEmails extends Tables<"bulk_jobs"> { + bulk_emails: Tables<"bulk_emails">[]; +} + +export default function Bulk({ onVerified }: BulkProps): React.ReactElement { + const { user, userDetails } = useUser(); + const [emails, setEmails] = useState(""); + const [loading, setLoading] = useState(false); + + const [bulkJobs, setBulkJobs] = useState([]); + + useEffect(() => { + // This is a temporary redirect to the dashboard while I still work + // on the bulk page. + if (window.location.hostname == "app.reacher.email") { + window.location.href = "https://app.reacher.email/dashboard"; + } + }, []); + + useEffect(() => { + setInterval(async () => { + console.log("FETCHING BULK JOBS..."); + const res = await supabase + .from("bulk_jobs") + .select(`*,bulk_emails(*)`); + if (res.error) { + sentryException(res.error); + return; + } + + setBulkJobs(res.data); + }, 3000); + }, []); + + function handleVerify() { + if (!emails) { + return; + } + + if (!userDetails) { + alertError( + "n/a", + `userDetails is undefined for user ${user?.id || "undefined"}` + ); + return; + } + + setLoading(true); + console.log("[/dashboard] Verifying email", emails); + postData({ + url: `/api/v1/bulk`, + token: userDetails?.api_token, + data: { + input_type: "array", + input: emails.split("\n"), + }, + }) + .then((r) => { + setLoading(false); + return onVerified && onVerified(r); + }) + .catch((err: Error) => { + sentryException(err); + alertError(emails, err.message); + setLoading(false); + }); + } + + return ( + <> +