Skip to content

Commit

Permalink
feat: Add bulk page (Staging only) (#474)
Browse files Browse the repository at this point in the history
* Use generated types

* WIP BULK

* Fix timezone

* use smtp

* feat: Add bulk page (Staging only)
  • Loading branch information
amaury1093 authored Dec 16, 2023
1 parent fca04a0 commit f4da24f
Show file tree
Hide file tree
Showing 6 changed files with 412 additions and 1 deletion.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"axios": "^1.6.2",
"cors": "^2.8.5",
"date-fns": "^2.30.0",
"encoding": "^0.1.13",
"mailgun-js": "^0.22.0",
"markdown-pdf": "^11.0.0",
"mustache": "^4.2.0",
Expand Down
160 changes: 160 additions & 0 deletions src/app/api/v1/bulk/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import { NextRequest } from "next/server";
import amqplib from "amqplib";
import { supabaseAdmin } from "@/util/supabaseServer";
import { sentryException } from "@/util/sentry";
import { getWebappURL } from "@/util/helpers";
import { Tables } from "@/supabase/database.types";

interface BulkPayload {
input_type: "array";
input: string[];
}

export const POST = async (req: NextRequest): Promise<Response> => {
// TODO Remove this once we allow Bulk.
if (process.env.VERCEL_ENV === "production") {
return Response.json(
{ error: "Not available in production" },
{ status: 403 }
);
}

try {
const user = await getUser(req);

const payload: BulkPayload = await req.json();

// Add to Supabase
const res1 = await supabaseAdmin
.from("bulk_jobs")
.insert({
user_id: user.id,
payload,
})
.select("*");
if (res1.error) {
throw res1.error;
}
const bulkJob = res1.data[0];
const res2 = await supabaseAdmin
.from("bulk_emails")
.insert(
payload.input.map((email) => ({
bulk_job_id: bulkJob.id,
email,
}))
)
.select("*");
if (res2.error) {
throw res2.error;
}

const conn = await amqplib
.connect(process.env.RCH_AMQP_ADDR || "amqp://localhost")
.catch((err) => {
const message = `Error connecting to RabbitMQ: ${
(err as AggregateError).errors
? (err as AggregateError).errors
.map((e) => e.message)
.join(", ")
: err.message
}`;

throw new Error(message);
});

const ch1 = await conn.createChannel().catch((err) => {
throw new Error(`Error creating RabbitMQ channel: ${err.message}`);
});
const queueName = `check_email.Smtp`; // TODO
await ch1.assertQueue(queueName, {
maxPriority: 5,
});

res2.data.forEach(({ email, id }) => {
ch1.sendToQueue(
queueName,
Buffer.from(
JSON.stringify({
input: {
to_email: email,
},
webhook: {
url: `${getWebappURL()}/api/v1/bulk/webhook`,
extra: {
bulkEmailId: id,
userId: user.id,
endpoint: "/v1/bulk",
},
},
})
),
{
contentType: "application/json",
priority: 1,
}
);
});

await ch1.close();
await conn.close();

return Response.json({ message: "Hello world!", res: res1 });
} catch (err) {
if (isEarlyResponse(err)) {
return err.response;
}

sentryException(err as Error);
return Response.json(
{
error: (err as Error).message,
},
{
status: 500,
}
);
}
};

async function getUser(req: NextRequest): Promise<Tables<"users">> {
const token = req.headers.get("Authorization");

if (typeof token !== "string") {
throw new Error("Expected API token in the Authorization header.");
}

const { data, error } = await supabaseAdmin
.from<Tables<"users">>("users")
.select("*")
.eq("api_token", token);
if (error) {
throw error;
}
if (!data?.length) {
throw {
response: newEarlyResponse(
Response.json(
{ error: "Invalid API token." },
{
status: 401,
}
)
),
};
}

return data[0];
}

type EarlyResponse = {
response: Response;
};

function newEarlyResponse(response: Response): EarlyResponse {
return { response };
}

function isEarlyResponse(err: unknown): err is EarlyResponse {
return (err as EarlyResponse).response !== undefined;
}
57 changes: 57 additions & 0 deletions src/app/api/v1/bulk/webhook/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { CheckEmailOutput } from "@reacherhq/api";
import { supabaseAdmin } from "@/util/supabaseServer";
import { NextRequest } from "next/server";
import { removeSensitiveData } from "@/util/api";
import { Tables } from "@/supabase/database.types";

export interface WebhookExtra {
bulkEmailId: string;
userId: string;
endpoint: string;
}

export interface WebhookPayload {
output: CheckEmailOutput;
extra: WebhookExtra;
}

export const POST = async (req: NextRequest): Promise<Response> => {
if (req.headers.get("x-reacher-secret") !== process.env.RCH_HEADER_SECRET) {
return Response.json({ error: "Invalid header secret" });
}

const body: WebhookPayload = await req.json();
const { output, extra } = body;

// Add to supabase calls
const res1 = await supabaseAdmin
.from<Tables<"calls">>("calls")
.insert({
endpoint: extra.endpoint,
user_id: extra.userId,
backend: output.debug?.server_name,
domain: output.syntax.domain,
duration: Math.round(
(output.debug?.duration.secs || 0) * 1000 +
(output.debug?.duration.nanos || 0) / 1000000
),
is_reachable: output.is_reachable,
verif_method: output.debug?.smtp?.verif_method?.type,
result: removeSensitiveData(output),
})
.select("*");
if (res1.error) {
return Response.json(res1.error, res1);
}

// Update bulk_emails table
const res2 = await supabaseAdmin
.from("bulk_emails")
.update({ call_id: res1.data[0].id })
.eq("id", extra.bulkEmailId);
if (res2.error) {
return Response.json(res2.error, res2);
}

return Response.json({ message: "ok" }, { status: 200 });
};
145 changes: 145 additions & 0 deletions src/pages/bulk.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import { Button, Page, Spacer, Table, Text, Textarea } from "@geist-ui/react";
import { CheckEmailOutput } from "@reacherhq/api/lib";
import React, { useEffect, useState } from "react";

import { postData } from "@/util/helpers";
import { sentryException } from "@/util/sentry";
import { useUser } from "@/util/useUser";
import { Tables } from "@/supabase/database.types";
import { supabase } from "@/util/supabaseClient";
import { Nav } from "@/components";

function alertError(email: string, e: string) {
alert(
`An unexpected error happened. Can you email [email protected] with this message (or a screenshot)?
Email: ${email}
Error: ${e}`
);
}

interface BulkProps {
onVerified?(result: CheckEmailOutput): Promise<void>;
}

interface BulkJobWithEmails extends Tables<"bulk_jobs"> {
bulk_emails: Tables<"bulk_emails">[];
}

export default function Bulk({ onVerified }: BulkProps): React.ReactElement {
const { user, userDetails } = useUser();
const [emails, setEmails] = useState("");
const [loading, setLoading] = useState(false);

const [bulkJobs, setBulkJobs] = useState<BulkJobWithEmails[]>([]);

useEffect(() => {
// This is a temporary redirect to the dashboard while I still work
// on the bulk page.
if (window.location.hostname == "app.reacher.email") {
window.location.href = "https://app.reacher.email/dashboard";
}
}, []);

useEffect(() => {
setInterval(async () => {
console.log("FETCHING BULK JOBS...");
const res = await supabase
.from<BulkJobWithEmails>("bulk_jobs")
.select(`*,bulk_emails(*)`);
if (res.error) {
sentryException(res.error);
return;
}

setBulkJobs(res.data);
}, 3000);
}, []);

function handleVerify() {
if (!emails) {
return;
}

if (!userDetails) {
alertError(
"n/a",
`userDetails is undefined for user ${user?.id || "undefined"}`
);
return;
}

setLoading(true);
console.log("[/dashboard] Verifying email", emails);
postData<CheckEmailOutput>({
url: `/api/v1/bulk`,
token: userDetails?.api_token,
data: {
input_type: "array",
input: emails.split("\n"),
},
})
.then((r) => {
setLoading(false);
return onVerified && onVerified(r);
})
.catch((err: Error) => {
sentryException(err);
alertError(emails, err.message);
setLoading(false);
});
}

return (
<>
<Nav />
<Page>
<Text h3>BULK Work in Progress Page</Text>

<div className="text-center">
<Textarea
autoFocus
disabled={loading}
onChange={(e) => {
setEmails(e.target.value);
}}
placeholder="[email protected]"
value={emails}
></Textarea>

<Spacer />

<Button
disabled={loading}
loading={loading}
onClick={handleVerify}
type="success"
>
Bulk Verify
</Button>
</div>

<Spacer />

<Table data={bulkJobs}>
<Table.Column prop="id" label="ID" />
</Table>

<div>
ALLJOBS:
{bulkJobs.map((job) => (
<div key={job.id}>
{job.id} -{" "}
{
job.bulk_emails.filter(
({ call_id }) => !!call_id
).length
}
/{job.bulk_emails.length}
</div>
))}
</div>
</Page>
</>
);
}
Loading

0 comments on commit f4da24f

Please sign in to comment.