-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.ts
133 lines (105 loc) · 5.06 KB
/
main.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import { connectNats, jwtDecode, jwtValidate, MySqlClient, JSONCodec, connectRedis } from "./deps.ts";
import type { MySqlConnection } from "./deps.ts";
import * as env from "./env.ts";
import type { MetaSpacePosts, MicroserviceMessage, NewPostInfo, ApiWrapper, MatatakiPostInfo } from "./types.ts";
// Long-lived clients shared by the whole module: NATS delivers sync requests,
// Redis holds the per-user sync state that this worker writes.
const natsClient = await connectNats({ servers: env.NATS_SERVER });
const redis = await connectRedis({
  hostname: env.REDIS_HOST,
  port: env.REDIS_PORT,
  password: env.REDIS_PASSWORD,
});
const natsCodec = JSONCodec<MicroserviceMessage<number>>();
const userIdSubscription = natsClient.subscribe("cms.post.sync.matataki");
console.log('monitoring started');

/**
 * Opens a fresh MySQL client, runs `doJob` for the given user through
 * `client.transaction`, and always closes the client afterwards.
 *
 * On any failure the user's sync state key in Redis is set to "error" and the
 * original error is rethrown to the caller.
 *
 * @param userId The user id carried by the incoming NATS message.
 */
async function runSyncJob(userId: number): Promise<void> {
  let client: MySqlClient | undefined;
  try {
    client = await new MySqlClient().connect({
      hostname: env.DB_HOST,
      username: env.DB_USER,
      password: env.DB_PASSWORD,
      db: env.DB_DATABASE,
    });
    await client.transaction((connection: MySqlConnection) => doJob(connection, userId));
  } catch (e) {
    await redis.set(`cms:post:sync_state:matataki:${userId}`, "error");
    throw e;
  } finally {
    // `client` stays undefined when connect() itself failed.
    if (client) {
      await client.close();
    }
  }
}

// Main loop: messages are handled strictly one at a time because each
// iteration awaits the whole job. A failing message is logged and does not
// stop the loop.
for await (const message of userIdSubscription) {
  console.log(`message:`, message);
  try {
    const { data: userId } = natsCodec.decode(message.data);
    await runSyncJob(userId);
  } catch (e) {
    console.error(new Date(), "Unhandled exception:", e);
  }
}
/** Row shape used when reading `access_token_entity`; the query in `doJob` selects only `accessToken`. */
type AccessTokenRecord = { userId: number, accessToken: string }

/**
 * Runs one Matataki sync job for a user: looks up the user's active Matataki
 * access token, extracts the Matataki user id from the token payload, collects
 * the posts that are newer than the last recorded sync time and stores them.
 *
 * @param connection Connection on which all queries of this job are issued.
 * @param userId Id of the user whose posts are synchronised.
 * @throws Error when the user has no active Matataki access token, or when the
 *         token payload carries no `id`. Errors from the token helpers, the
 *         database and the Matataki API propagate unchanged.
 */
async function doJob(connection: MySqlConnection, userId: number) {
  console.log(new Date(), `Job started (userId: ${userId})`);

  const tokenRows = await connection.query("SELECT accessToken FROM access_token_entity WHERE userId = ? AND platform = 'matataki' AND active = 1;", [userId]) as Array<AccessTokenRecord>;
  // Without this guard an empty result set fails with an opaque destructuring
  // TypeError that says nothing about which user is affected.
  const accessToken = tokenRows[0]?.accessToken;
  if (!accessToken) {
    throw new Error(`No active Matataki access token found for user ${userId}`);
  }

  // NOTE(review): only decode + validate are called here; whether the token
  // signature is verified depends on these helpers — confirm in deps.ts.
  const { payload } = jwtValidate(jwtDecode(accessToken));
  const matatakiId = payload.id as number;
  if (matatakiId == null) {
    // Otherwise the Matataki API would be queried with "uid=undefined".
    throw new Error(`Matataki access token of user ${userId} has no "id" in its payload`);
  }

  const newPosts = await getNewPostsOfUser(connection, userId, matatakiId);
  await saveNewPosts(connection, userId, newPosts);

  console.log(new Date(), "Job completed");
}
/**
 * Collects the Matataki posts of a user that were created after the last
 * recorded sync time, and records the newest creation time seen.
 *
 * For every entry of `latestMetadata` whose `createdAt` is later than the
 * stored timestamp, the matching post is looked up in `posts`, its tags are
 * fetched, and a `NewPostInfo` is produced. The stored timestamp is rewritten
 * via `recordLatestTime` even when no new post was found.
 *
 * @param connection Connection used to read and write the sync timestamp.
 * @param ucenterId User id stored on each returned post.
 * @param matatakiId Matataki user id used for the API call and the timestamp row.
 * @returns The new posts, in the iteration order of `latestMetadata`.
 * @throws Error when a metadata entry refers to a post id that is absent from
 *         `posts`; any other error is logged with the user id and rethrown.
 */
async function getNewPostsOfUser(connection: MySqlConnection, ucenterId: number, matatakiId: number) {
  try {
    let latestTimestamp = await getLatestTimestamp(connection, matatakiId);
    const { posts, latestMetadata } = await getMetaSpacePosts(matatakiId);

    // Index posts by id once instead of scanning the list for every metadata
    // entry. The first occurrence wins, matching what Array.prototype.find
    // would have returned.
    const postsById = new Map<number, (typeof posts)[number]>();
    for (const post of posts) {
      if (!postsById.has(post.id)) {
        postsById.set(post.id, post);
      }
    }

    const newPosts = new Array<NewPostInfo>();
    for (const [rawPostId, { createdAt, metadataHash }] of Object.entries(latestMetadata)) {
      const postId = Number(rawPostId);
      const timestamp = new Date(createdAt);
      if (timestamp <= latestTimestamp) {
        continue; // already covered by an earlier sync
      }

      const postInfo = postsById.get(postId);
      if (!postInfo) {
        // Fail with context instead of a TypeError on `postInfo.title`.
        throw new Error(`Matataki post ${postId} has metadata but is missing from the post list`);
      }

      // Tags are fetched one post at a time, only for posts that are new.
      const tags = await getTags(postId);
      newPosts.push({
        ucenterId,
        id: postId,
        hash: metadataHash,
        timestamp: createdAt,
        title: postInfo.title,
        cover: postInfo.cover,
        tags,
      });

      // Advancing the watermark inside the loop means later entries are
      // compared against the newest time seen so far, not only the stored one.
      if (latestTimestamp < timestamp) {
        latestTimestamp = timestamp;
      }
    }

    await recordLatestTime(connection, matatakiId, latestTimestamp);
    return newPosts;
  } catch (error) {
    console.error(new Date(), `Unexpected error to ucenter id ${ucenterId}:`, error);
    throw error;
  }
}
/**
 * Reads the last recorded sync time for a Matataki user.
 *
 * @param connection Connection used for the lookup.
 * @param matatakiId Matataki user id; matched against the `userId` column.
 * @returns The `latestTime` of the first returned row, or the Unix epoch
 *          (`new Date(0)`) when there is no row or the value is null/undefined.
 */
async function getLatestTimestamp(connection: MySqlConnection, matatakiId: number): Promise<Date> {
  const queryResult = await connection.query(
    "SELECT latestTime FROM matataki_sync_entity WHERE userId = ?;",
    [matatakiId],
  );
  const syncRows = queryResult as Array<{ latestTime: Date }>;
  const storedTime = syncRows[0]?.latestTime;
  return storedTime ?? new Date(0);
}
/**
 * Fetches the meta-space post listing of a Matataki user from the Matataki API.
 *
 * @param userId Matataki user id, sent as the `uid` query parameter.
 * @returns The parsed JSON body, typed as `MetaSpacePosts`. The shape is
 *          asserted, not validated at runtime.
 * @throws Error when the API answers with a non-2xx status; network and JSON
 *         parsing errors propagate unchanged.
 */
async function getMetaSpacePosts(userId: number) {
  const url = `${env.MATATAKI_API_PREFIX}/migration/meta-space/posts?uid=${userId}`;
  const response = await fetch(url);
  if (!response.ok) {
    // An error body must not be parsed as if it were a post listing. Cancel
    // the unread body so the underlying stream is released before throwing.
    await response.body?.cancel();
    throw new Error(`Matataki meta-space posts request failed for uid ${userId}: HTTP ${response.status} ${response.statusText}`);
  }
  return await response.json() as MetaSpacePosts;
}
/**
 * Inserts each new post into `post_entity` (platform "matataki", state
 * "pending") and then publishes the number of saved posts as the user's sync
 * state in Redis.
 *
 * @param connection Connection on which the inserts are executed, one by one.
 * @param userId User id used in the Redis sync-state key.
 * @param newPosts Posts to store; an empty list still writes `0` to Redis.
 */
async function saveNewPosts(connection: MySqlConnection, userId: number, newPosts: Array<NewPostInfo>) {
  const insertSql = "INSERT INTO post_entity(`userId`, `title`, `cover`, `platform`, `source`, `state`, `tags`, `createdAt`) VALUES(?, ?, ?, ?, ?, ?, ?, ?);";

  for (const post of newPosts) {
    // Posts without a cover are stored with an empty cover string.
    let coverUrl = "";
    if (post.cover) {
      coverUrl = `${env.MATATAKI_COVER_PREFIX}${post.cover}`;
    }

    const rowValues = [
      post.ucenterId,
      post.title,
      coverUrl,
      "matataki",
      post.hash,
      "pending",
      post.tags.join(","),
      new Date(post.timestamp),
    ];
    await connection.query(insertSql, rowValues);
  }

  const savedCount = newPosts.length;
  await redis.set(`cms:post:sync_state:matataki:${userId}`, savedCount);
  console.log(new Date(), `Saved ${savedCount} posts`);
}
/**
 * Stores the newest synced post time for a Matataki user, replacing any
 * existing row for that user.
 *
 * The target columns are named explicitly so the statement does not depend on
 * the physical column order of `matataki_sync_entity`.
 *
 * @param connection Connection on which the statement is executed.
 * @param matatakiId Matataki user id, written to the `userId` column.
 * @param timestamp Time written to the `latestTime` column.
 */
async function recordLatestTime(connection: MySqlConnection, matatakiId: number, timestamp: Date) {
  await connection.query(
    "REPLACE INTO matataki_sync_entity(`userId`, `latestTime`) VALUES(?, ?);",
    [matatakiId, timestamp],
  );
}
/**
 * Fetches a Matataki post and returns the names of its tags.
 *
 * @param postId Matataki post id, used in the `/p/{postId}` API path.
 * @returns The `name` of every entry in the post's `tags` list, in API order.
 * @throws Error when the API answers with a non-2xx status or the response
 *         carries no tag list; network and JSON parsing errors propagate
 *         unchanged.
 */
async function getTags(postId: number) {
  const response = await fetch(`${env.MATATAKI_API_PREFIX}/p/${postId}`);
  if (!response.ok) {
    // Release the unread body before failing; an error page has no tags.
    await response.body?.cancel();
    throw new Error(`Matataki post request failed for post ${postId}: HTTP ${response.status} ${response.statusText}`);
  }

  const { data } = await response.json() as ApiWrapper<MatatakiPostInfo>;
  if (!data || !Array.isArray(data.tags)) {
    // Fail with context instead of a TypeError on `data.tags.map`.
    throw new Error(`Matataki post ${postId} response contains no tag list`);
  }
  return data.tags.map(({ name }) => name);
}