Skip to content

Commit

Permalink
Merge pull request #571 from betagouv/add-import-script
Browse files Browse the repository at this point in the history
feat: add import script
  • Loading branch information
rdubigny authored May 17, 2024
2 parents 22c76d5 + 4e01673 commit 3222377
Show file tree
Hide file tree
Showing 4 changed files with 283 additions and 260 deletions.
56 changes: 25 additions & 31 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
"console-log-level": "^1.4.1",
"cookie-parser": "^1.4.6",
"csrf-sync": "^4.0.3",
"csv": "^5.5.3",
"dotenv": "^16.4.5",
"ejs": "^3.1.10",
"express": "^4.19.2",
Expand Down Expand Up @@ -98,6 +97,7 @@
"chai-as-promised": "^7.1.1",
"concurrently": "^8.2.2",
"copy-and-watch": "^0.1.6",
"csv": "^6.3.9",
"cypress": "^13.8.1",
"cypress-axe": "^1.5.0",
"cypress-mailslurp": "^1.9.0",
Expand Down
257 changes: 257 additions & 0 deletions scripts/import-accounts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
// src https://stackoverflow.com/questions/40994095/pipe-streams-to-edit-csv-file-in-node-js
import fs from "fs";
import { isEmpty, some, toInteger } from "lodash-es";
import {
getInseeAccessToken,
getOrganizationInfo,
} from "../src/connectors/api-sirene";
import { AxiosError } from "axios";
import { create, findByEmail, update } from "../src/repositories/user";
import { logger } from "../src/services/log";
import {
getNumberOfLineInFile,
humanReadableDuration,
isOrganizationInfo,
startDurationMesure,
throttleApiCall,
} from "../src/services/script-helpers";
import {
isEmailValid,
isNameValid,
isSiretValid,
} from "../src/services/security";
import {
linkUserToOrganization,
updateUserOrganizationLink,
upsert,
} from "../src/repositories/organization/setters";

import { parse, stringify, transform } from "csv";
import { findByUserId } from "../src/repositories/organization/getters";

// CSV paths, overridable through environment variables.
const INPUT_FILE = process.env.INPUT_FILE ?? "./input.csv";
const OUTPUT_FILE = process.env.OUTPUT_FILE ?? "./output.csv";

// The throttle rate (in ms) can be passed as the first CLI argument,
// ex: for public insee subscription the script can be run like so:
// npm run update-organization-info 2000
const rateInMsFromArgs = toInteger(process.argv[2]);
// Default is 125ms max, which allows 8 requests to the insee api per second.
// That makes 480 requests per minute, just under the 500 requests/minute quota.
// For 30 000 organizations, this script will run for about 1 hour.
// toInteger yields 0 for a missing/non-numeric argument, hence the 0 check.
const maxInseeCallRateInMs = rateInMsFromArgs === 0 ? 125 : rateInMsFromArgs;

// Main entry point: streams INPUT_FILE row by row, creates/updates users and
// their organization links, and writes an audit row per imported account to
// OUTPUT_FILE. Calls to the Insee API are throttled to maxInseeCallRateInMs.
(async () => {
  // The Insee token is fetched once and reused for every getOrganizationInfo call.
  const access_token = await getInseeAccessToken();

  const readStream = fs.createReadStream(INPUT_FILE); // readStream is a read-only stream with raw text content of the CSV file
  const writeStream = fs.createWriteStream(OUTPUT_FILE); // writeStream is a write-only stream to write on the disk

  const inputCsvStream = parse({
    columns: true,
    trim: true,
    cast: false,
    delimiter: ",",
  }); // csv Stream is a read and write stream : it reads raw text in CSV and output untransformed records
  const outputCsvStream = stringify({
    quoted_empty: false,
    quoted_string: false,
    header: true,
  });

  // Shape of one row of INPUT_FILE. `siret` may contain several
  // comma-separated siret values.
  type InputCsvData = {
    last_name: string;
    first_name: string;
    email: string;
    sub: string;
    siret: string;
  };
  // Shape of one row written to OUTPUT_FILE (only for successful imports).
  type OutputCsvData = {
    email: string;
    inclusionconnect_sub: string;
    moncomptepro_sub: string;
  };

  const input_file_lines = await getNumberOfLineInFile(INPUT_FILE);
  let i = 1; // 1-based index of the row currently being processed
  let rejected_invalid_email_address_count = 0;
  let rejected_invalid_siret_count = 0;
  let rejected_invalid_names_count = 0;
  let rejected_invalid_sub_count = 0;
  let unexpected_error_count = 0;
  let success_count = 0;

  // 50ms is an estimated additional delay from insee API
  const estimatedExecutionTimeInMilliseconds =
    (maxInseeCallRateInMs + 50) * input_file_lines;

  logger.info("");
  logger.info(
    "\x1b[33m",
    `Estimated execution time is ${humanReadableDuration(
      estimatedExecutionTimeInMilliseconds,
    )}`,
    "\x1b[0m",
  );
  logger.info("");

  const transformStream = transform(
    async function (
      data: InputCsvData,
      done: (err: null | Error, data?: OutputCsvData) => void,
    ) {
      const start = startDurationMesure();
      try {
        const { first_name, last_name, sub, email, siret: rawSirets } = data;
        logger.info(`${i}: processing ${email}...`);
        // 0. params validation
        // Calling done(null) without data drops the row from OUTPUT_FILE.
        if (!isEmailValid(email)) {
          i++;
          rejected_invalid_email_address_count++;
          logger.error(`invalid email address ${email}`);
          return done(null);
        }
        if (!isNameValid(first_name) || !isNameValid(last_name)) {
          i++;
          rejected_invalid_names_count++;
          logger.error(`invalid name ${first_name} ${last_name}`);
          return done(null);
        }
        // An InclusionConnect sub is expected to be a 36-character UUID.
        // BUG FIX: the original condition was `isEmpty(sub) && sub.length === 36`,
        // which is a contradiction (an empty string never has length 36), so
        // invalid subs were never rejected. Reject when empty OR wrong length.
        if (isEmpty(sub) || sub.length !== 36) {
          i++;
          rejected_invalid_sub_count++;
          logger.error(`invalid sub ${sub}`);
          return done(null);
        }

        // 1. add user if it does not exist
        // NOTE(review): the user is created before the siret validation below,
        // so rows later rejected for an invalid siret still leave a user
        // behind — confirm this is intended.
        let user = await findByEmail(email);
        if (isEmpty(user)) {
          user = await create({ email });
          await update(user.id, {
            given_name: first_name,
            family_name: last_name,
            needs_inclusionconnect_welcome_page: true,
            needs_inclusionconnect_onboarding_help: true,
          });
        }

        const sirets: string[] = rawSirets
          .split(",")
          .map((s: string) => s.trim());
        // split() always yields at least one element (an empty column gives
        // [""], rejected here by isSiretValid), so no length check is needed.
        if (sirets.some((s) => !isSiretValid(s))) {
          i++;
          rejected_invalid_siret_count++;
          logger.error(`invalid siret ${rawSirets}`);
          return done(null);
        }

        for (const siret of sirets) {
          // 2. get organizationInfo
          try {
            const organizationInfo = await getOrganizationInfo(
              siret,
              access_token,
            );
            if (!isOrganizationInfo(organizationInfo)) {
              throw Error("empty organization info");
            }
            if (!organizationInfo.estActive) {
              throw Error("organization not active");
            }

            // 3. update organizationInfo
            const organization = await upsert({
              siret: organizationInfo.siret,
              organizationInfo,
            });

            // 4. create the user-organization link (skipped when it already exists)
            const usersOrganizations = await findByUserId(user.id);
            if (!some(usersOrganizations, ["id", organization.id])) {
              await linkUserToOrganization({
                organization_id: organization.id,
                user_id: user.id,
                verification_type: "imported_from_inclusion_connect",
              });
              await updateUserOrganizationLink(organization.id, user.id, {
                authentication_by_peers_type: "deactivated_by_import",
              });
            }
          } catch (error) {
            // A failure on one siret does not abort the row: remaining sirets
            // are still processed and the row still counts as a success.
            logger.error(`unexpected error for siret: ${siret}`);
            logger.error(error);
          }
          // Wait for the remainder of the rate window before the next Insee call.
          await throttleApiCall(start, maxInseeCallRateInMs);
        }
        i++;
        success_count++;
        return done(null, {
          email,
          inclusionconnect_sub: sub,
          moncomptepro_sub: user.id,
        });
      } catch (error) {
        logger.error("unexpected error");
        logger.error(
          "\x1b[31m",
          error instanceof AxiosError && !isEmpty(error.response)
            ? error.response.data
            : error,
          "\x1b[0m",
        );
        logger.error("");

        // Still honor the rate limit even on failure, since an Insee call may
        // already have been made for this row.
        await throttleApiCall(start, maxInseeCallRateInMs);
        i++;
        unexpected_error_count++;
        return done(null);
      }
    },
    { parallel: 1 }, // avoid messing with line orders
  ).on("end", () => {
    // Final report: color-coded counters for each outcome category.
    logger.info(`Import done! Import logs are recorded in ${OUTPUT_FILE}.`);
    logger.info("");
    logger.info(
      "success_count: \x1b[32m",
      success_count,
      "\x1b[0m",
    );
    logger.info(
      "rejected_invalid_email_address_count: \x1b[33m",
      rejected_invalid_email_address_count,
      "\x1b[0m",
    );
    logger.info(
      "rejected_invalid_siret_count: \x1b[33m",
      rejected_invalid_siret_count,
      "\x1b[0m",
    );
    logger.info(
      "rejected_invalid_names_count: \x1b[33m",
      rejected_invalid_names_count,
      "\x1b[0m",
    );
    logger.info(
      "rejected_invalid_sub_count: \x1b[33m",
      rejected_invalid_sub_count,
      "\x1b[0m",
    );
    logger.info(
      "unexpected_error_count: \x1b[31m",
      unexpected_error_count,
      "\x1b[0m",
    );
    logger.info(
      "total: \x1b[1m",
      i - 1,
      "\x1b[21m",
    );
  });

  logger.info(`Importing data from ${INPUT_FILE}`);

  // file -> CSV parser -> per-row transform -> CSV serializer -> file
  readStream
    .pipe(inputCsvStream)
    .pipe(transformStream)
    .pipe(outputCsvStream)
    .pipe(writeStream);
})();
Loading

0 comments on commit 3222377

Please sign in to comment.