Skip to content

Commit

Permalink
fix: duplicate unique key error and start with later week
Browse files Browse the repository at this point in the history
Signed-off-by: Manuel Ruck <[email protected]>
  • Loading branch information
Manuel Ruck committed Nov 30, 2024
1 parent 58b69c6 commit 7c2804a
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 112 deletions.
14 changes: 14 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,26 @@
"cwd": "${workspaceFolder}/services/cron-jobs/crawler",
"console": "integratedTerminal",
"internalConsoleOptions": "neverOpen"
},
{
"name": "Run import-conference-week-details dev",
"type": "node",
"request": "launch",
"runtimeExecutable": "pnpm",
"runtimeArgs": ["dev"],
"cwd": "${workspaceFolder}/services/cron-jobs/import-conference-week-details",
"console": "integratedTerminal",
"internalConsoleOptions": "neverOpen"
}
],
"compounds": [
{
"name": "Debug crawler dev",
"configurations": ["Run crawler dev", "Attach to Process"]
},
{
"name": "Debug import-conference-week-details dev",
"configurations": ["Run import-conference-week-details dev", "Attach to Process"]
}
]
}
4 changes: 4 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions services/cron-jobs/import-conference-week-details/.env.local
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
DB_URL=mongodb://localhost/bundestagio
CONFERENCE_WEEK_DETAIL_YEAR=2021
CONFERENCE_WEEK_DETAIL_WEEK=45
DEBUG=*
CONFERENCE_WEEK_DETAIL_YEAR=2023
CONFERENCE_WEEK_DETAIL_WEEK=25
DEBUG=
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"main": "build/index.js",
"license": "Apache-2.0",
"scripts": {
"dev": "dotenv -e .env.local -- tsup src/index.ts --watch --onSuccess 'node build/index.js'",
"dev": "tsx --env-file .env --env-file .env.local --watch src/index.ts",
"build": "tsup-node",
"lint": "pnpm lint:ts && pnpm lint:exports",
"lint:ts": "tsc --noEmit",
Expand All @@ -26,6 +26,7 @@
"tsconfig": "workspace:*",
"tsup": "catalog:",
"tsup-config": "workspace:*",
"tsx": "^4.11.0",
"typescript": "^5.4.5"
}
}
240 changes: 132 additions & 108 deletions services/cron-jobs/import-conference-week-details/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,133 +102,157 @@ const getProcedureIds = async (documents: any) => {
return procedures.map((p) => p.procedureId);
};

const updateConferenceWeekDetail = async (dataPackage: any, voteDates: any[], lastProcedureIds: any[]) => {
console.debug(dataPackage);
const ConferenceWeekDetail = {
URL: dataPackage.meta.url,
id: dataPackage.data.id,
previousYear: dataPackage.data.previous.year,
previousWeek: dataPackage.data.previous.week,
thisYear: dataPackage.data.this.year ?? dataPackage.meta.currentYear,
thisWeek: dataPackage.data.this.week ?? dataPackage.meta.currentWeek,
nextYear: dataPackage.data.next.year,
nextWeek: dataPackage.data.next.week,
sessions: await dataPackage.data.sessions.reduce(async (pSession: any, session: any) => {
const resultSession = await pSession;
resultSession.push({
...session,
tops: await session.tops.reduce(async (pTop: any, top: any) => {
// Await for last result
const resultTop = await pTop;
// Write VoteEnd Date
lastProcedureIds.forEach((procedureId) => {
if (voteDates[procedureId].voteDate && voteDates[procedureId].voteDate <= top.time) {
voteDates[procedureId].voteEnd = top.time;
}
});
lastProcedureIds = [];
// Append current result
resultTop.push({
...top,
topic: await Promise.all(
top.topic.map(async (topic: any) => {
// eslint-disable-next-line no-param-reassign
topic.isVote = isVote(topic.lines.join(' '), top.heading, topic.documents, top.status);
topic.procedureIds = await getProcedureIds(topic.documents); // eslint-disable-line no-param-reassign
// Save VoteDates to update them at the end when the correct values are present
topic.procedureIds.forEach((procedureId: any) => {
// Override voteDate only if there is none set or we would override it by a new date
if (!voteDates[procedureId] || !voteDates[procedureId].voteDate || topic.isVote === true) {
voteDates[procedureId] = {
procedureId,
voteDate: topic.isVote ? top.time : null,
voteEnd: null,
documents: topic.documents,
};
}
});
// Remember last procedureIds to save voteEnd Date
lastProcedureIds = lastProcedureIds.concat(topic.procedureIds);
return topic;
}),
),
});
return resultTop;
}, []),
});
return resultSession;
}, []),
};
// Update/Insert with unique index handling
await ConferenceWeekDetailModel.updateOne(
{ id: ConferenceWeekDetail.id },
{ $set: ConferenceWeekDetail },
{ upsert: true },
).catch((error) => {
if (error.code === 11000) {
console.warn('Duplicate key error, updating existing document');
ConferenceWeekDetailModel.updateOne(
{ nextYear: ConferenceWeekDetail.nextYear, nextWeek: ConferenceWeekDetail.nextWeek },
{ $set: ConferenceWeekDetail },
).catch(console.error);
} else {
console.error('Error while updating ConferenceWeekDetail');
console.debug('Error details: ', error);
}
});
};

const updateProcedureVoteDates = async (voteDates: any[]) => {
await Promise.all(
voteDates.map(async (procedureUpdate) => {
await ProcedureModel.updateOne(
{
procedureId: procedureUpdate.procedureId,
// Update only when needed
$or: [
{
$and: [
{ voteDate: { $ne: procedureUpdate.voteDate } },
// Make sure we do not override date from procedureScraper
{ voteDate: { $lt: procedureUpdate.voteDate } },
],
},
{ voteEnd: { $ne: procedureUpdate.voteEnd } },
],
},
{
$set: {
voteDate: procedureUpdate.voteDate,
voteEnd: procedureUpdate.voteEnd,
},
},
);
}),
);
};

const start = async () => {
const startDate = new Date();
const cron = await getCron({ name: CRON_NAME });
let lastData: ConferenceWeeCronJobkData | undefined;
await setCronStart({ name: CRON_NAME, startDate });

try {
const startData =
cron.data?.lastYear && cron.lastSuccessStartDate?.getDay() === new Date().getDay()
? {
year: cron.data.lastYear,
week: cron.data.lastWeek,
}
: {
year: process.env.CONFERENCE_WEEK_DETAIL_YEAR ? Number(process.env.CONFERENCE_WEEK_DETAIL_YEAR) : 2022,
week: process.env.CONFERENCE_WEEK_DETAIL_WEEK ? Number(process.env.CONFERENCE_WEEK_DETAIL_WEEK) : 2,
};
const startData = getStartData(cron);
let voteDates: any[] = [];
let lastProcedureIds: any[] = [];
const lastProcedureIds: any[] = [];

await Scraper.scrape(new ConferenceWeekDetailScraper(startData), async (dataPackage: any) => {
// Construct Database object

lastData = {
lastYear: dataPackage.data.previous.year,
lastWeek: dataPackage.data.previous.week,
};

const ConferenceWeekDetail = {
URL: dataPackage.meta.url,
id: dataPackage.data.id,
previousYear: dataPackage.data.previous.year,
previousWeek: dataPackage.data.previous.week,
thisYear: dataPackage.data.this.year ?? dataPackage.meta.currentYear,
thisWeek: dataPackage.data.this.week ?? dataPackage.meta.currentWeek,
nextYear: dataPackage.data.next.year,
nextWeek: dataPackage.data.next.week,
sessions: await dataPackage.data.sessions.reduce(async (pSession: any, session: any) => {
const resultSession = await pSession;
resultSession.push({
...session,
tops: await session.tops.reduce(async (pTop: any, top: any) => {
// Await for last result
const resultTop = await pTop;
// Write VoteEnd Date
lastProcedureIds.forEach((procedureId) => {
if (voteDates[procedureId].voteDate && voteDates[procedureId].voteDate <= top.time) {
voteDates[procedureId].voteEnd = top.time;
}
});
lastProcedureIds = [];
// Append current result
resultTop.push({
...top,
topic: await Promise.all(
top.topic.map(async (topic: any) => {
// eslint-disable-next-line no-param-reassign
topic.isVote = isVote(topic.lines.join(' '), top.heading, topic.documents, top.status);
topic.procedureIds = await getProcedureIds(topic.documents); // eslint-disable-line no-param-reassign
// Save VoteDates to update them at the end when the correct values are present
topic.procedureIds.forEach((procedureId: any) => {
// Override voteDate only if there is none set or we would override it by a new date
if (!voteDates[procedureId] || !voteDates[procedureId].voteDate || topic.isVote === true) {
voteDates[procedureId] = {
procedureId,
voteDate: topic.isVote ? top.time : null,
voteEnd: null,
documents: topic.documents,
};
}
});
// Remember last procedureIds to save voteEnd Date
lastProcedureIds = lastProcedureIds.concat(topic.procedureIds);
return topic;
}),
),
});
return resultTop;
}, []),
});
return resultSession;
}, []),
};
// Update/Insert
await ConferenceWeekDetailModel.updateOne(
{ id: ConferenceWeekDetail.id },
{ $set: ConferenceWeekDetail },
{ upsert: true },
).catch(console.error);
await updateConferenceWeekDetail(dataPackage, voteDates, lastProcedureIds);
});

voteDates = voteDates.filter((voteDate) => !!voteDate);
// Update Procedure VoteDates
await Promise.all(
voteDates.map(async (procedureUpdate) => {
await ProcedureModel.updateOne(
{
procedureId: procedureUpdate.procedureId,
// Update only when needed
$or: [
{
$and: [
{ voteDate: { $ne: procedureUpdate.voteDate } },
// Make sure we do not override date from procedureScraper
{ voteDate: { $lt: procedureUpdate.voteDate } },
],
},
{ voteEnd: { $ne: procedureUpdate.voteEnd } },
],
},
{
$set: {
voteDate: procedureUpdate.voteDate,
voteEnd: procedureUpdate.voteEnd,
},
},
);
}),
);
await updateProcedureVoteDates(voteDates);

await setCronSuccess({
name: CRON_NAME,
successStartDate: startDate,
data: lastData,
});
} catch (error) {
await setCronError({ name: CRON_NAME, error: JSON.stringify(error) });
throw error;

console.error('ERROR');
console.debug('Error details: ', error);
// throw error;
}
await setCronSuccess({
name: CRON_NAME,
successStartDate: startDate,
data: lastData,
});
};

const getStartData = (cron: any) => {
return cron.data?.lastYear && cron.lastSuccessStartDate?.getDay() === new Date().getDay()
? {
year: cron.data.lastYear,
week: cron.data.lastWeek,
}
: {
year: process.env.CONFERENCE_WEEK_DETAIL_YEAR ? Number(process.env.CONFERENCE_WEEK_DETAIL_YEAR) : 2023,
week: process.env.CONFERENCE_WEEK_DETAIL_WEEK ? Number(process.env.CONFERENCE_WEEK_DETAIL_WEEK) : 25,
};
};

(async () => {
Expand Down

0 comments on commit 7c2804a

Please sign in to comment.