forked from hplush/slowreader
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrefresh.ts
140 lines (126 loc) · 3.93 KB
/
refresh.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import { atom, computed, map } from 'nanostores'
import { createDownloadTask, type DownloadTask } from './download.ts'
import {
changeFeed,
type FeedValue,
getFeed,
getFeedLatestPosts,
loadFeeds
} from './feed.ts'
import { type FilterChecker, loadFilters, prepareFilters } from './filter.ts'
import { createQueue, type Queue, retryOnError } from './lib/queue.ts'
import { increaseKey, readonlyExport } from './lib/stores.ts'
import { addPost, type OriginPost, processOriginPost } from './post.ts'
// Private writable flag: set to true when refreshPosts() starts and back to
// false when it finishes or when stopRefreshing() is called.
let $isRefreshing = atom(false)
/**
 * Public handle for the refresh flag, wrapped with readonlyExport()
 * (presumably a read-only view of the store — confirm in ./lib/stores.ts).
 */
export const isRefreshing = readonlyExport($isRefreshing)
/**
 * Initial values of the refresh counters. refreshPosts() copies this object
 * into the statistics store at the start of every refresh.
 */
export const DEFAULT_REFRESH_STATISTICS = {
// Increased by the error callback passed to retryOnError() for a page load.
errors: 0,
// Number of stored posts whose reading mode was 'fast'.
foundFast: 0,
// Number of stored posts with any other (non-'delete') reading mode.
foundSlow: 0,
// True from the start of a refresh until the list of feeds has been loaded.
initializing: false,
// Feeds whose page load ended with the 'error' result.
missedFeeds: 0,
// Feeds that were finished, whether completely, after an error or an abort.
processedFeeds: 0,
// Number of feeds returned by loadFeeds() for this refresh.
totalFeeds: 0
}
/** Shape of the refresh counters, derived from the default values above. */
export type RefreshStatistics = typeof DEFAULT_REFRESH_STATISTICS
// Private writable store with the counters of the current (or latest) refresh.
// It starts as a copy, so DEFAULT_REFRESH_STATISTICS itself is never mutated
// through this store.
let $stats = map({ ...DEFAULT_REFRESH_STATISTICS })
/**
 * Public handle for the refresh counters, wrapped with readonlyExport()
 * (presumably a read-only view of the store — confirm in ./lib/stores.ts).
 */
export const refreshStatistics = readonlyExport($stats)
/**
 * Share of feeds already processed, as a number from 0 to 1 rounded down to
 * two decimal places. It is 0 while the feed list is still being loaded and
 * when there are no feeds at all (which also avoids a division by zero).
 */
export const refreshProgress = computed(
  $stats,
  ({ initializing, processedFeeds, totalFeeds }) => {
    if (initializing || totalFeeds === 0) return 0
    let ratio = processedFeeds / totalFeeds
    // Round down so the value cannot reach 1 before every feed is processed.
    return Math.floor(ratio * 100) / 100
  }
)
// Download task of the most recently started refresh. refreshPosts() replaces
// it on every run and stopRefreshing() aborts all of its downloads.
let task: DownloadTask
// Queue of feeds of the most recently started refresh. It is assigned only
// after refreshPosts() has loaded the feeds, so it is still unassigned while
// the first refresh is loading them.
let queue: Queue<{ feed: FeedValue }>
/**
 * Tell whether a post from the feed source was already seen by an earlier
 * refresh.
 *
 * When both the post and the feed have a truthy publication time, the post is
 * known if it is not newer than the feed's last recorded one. Otherwise the
 * post is known only if its origin ID is the feed's last recorded origin ID.
 *
 * @param feed Feed with the `lastPublishedAt` / `lastOriginId` bookmark.
 * @param origin Post as it came from the feed source.
 * @returns `true` if the post should be treated as already added.
 */
function wasAlreadyAdded(feed: FeedValue, origin: OriginPost): boolean {
  let publishedAt = origin.publishedAt
  let lastPublishedAt = feed.lastPublishedAt
  if (!publishedAt || !lastPublishedAt) {
    // No usable pair of dates: fall back to comparing IDs.
    return origin.originId === feed.lastOriginId
  }
  return publishedAt <= lastPublishedAt
}
/**
 * Process one feed: load its latest posts page by page and store every post
 * that was not seen before, stopping at the first already known post.
 *
 * @param feed Feed to refresh.
 * @param downloads Download task used for this feed's requests.
 */
async function refreshFeed(
  feed: FeedValue,
  downloads: DownloadTask
): Promise<void> {
  let feedStore = getFeed(feed.id)
  let pages = getFeedLatestPosts(feed, downloads)
  // Filters are loaded lazily, only once the first page has arrived.
  let filters: FilterChecker | undefined
  // Newest unseen post of this run; becomes the feed's new bookmark.
  let firstNew: OriginPost | undefined

  // Save the bookmark (unless the feed was deleted meanwhile) and count the
  // feed as processed. Called on every exit path of this function.
  async function end(): Promise<void> {
    if (firstNew && !feedStore.deleted) {
      await changeFeed(feed.id, {
        lastOriginId: firstNew.originId,
        lastPublishedAt: firstNew.publishedAt
      })
    }
    increaseKey($stats, 'processedFeeds')
  }

  while (pages.get().hasNext) {
    let loaded = await retryOnError(
      () => pages.next(),
      () => {
        increaseKey($stats, 'errors')
      }
    )
    if (loaded === 'error') {
      increaseKey($stats, 'missedFeeds')
      await end()
      return
    }
    if (loaded === 'abort') {
      await end()
      return
    }

    let posts = loaded
    if (posts[0]) {
      if (posts[0].publishedAt) {
        // Newest first. Sort a copy: the array comes from the posts loader
        // and Array#sort would reorder it in place.
        posts = [...posts].sort((a, b) => {
          return (b.publishedAt ?? 0) - (a.publishedAt ?? 0)
        })
      }
      if (!firstNew && !wasAlreadyAdded(feed, posts[0]!)) {
        firstNew = posts[0]
      }
    }
    if (!filters) {
      filters = prepareFilters(await loadFilters({ feedId: feed.id }))
    }
    for (let origin of posts) {
      if (feedStore.deleted || wasAlreadyAdded(feed, origin)) {
        await end()
        return
      }
      // A matching filter overrides the feed's default reading mode.
      let reading = filters(origin) ?? feed.reading
      if (reading !== 'delete') {
        await addPost(processOriginPost(origin, feed.id, reading))
        if (reading === 'fast') {
          increaseKey($stats, 'foundFast')
        } else {
          increaseKey($stats, 'foundSlow')
        }
      }
    }
  }
  await end()
}

/**
 * Check all feeds for new posts and store the ones that are not deleted by
 * filters. Progress is reported through the refresh statistics store.
 *
 * Does nothing if a refresh is already running. Up to 4 feeds are processed
 * in parallel.
 *
 * The refreshing flag is always released when this run ends, including when
 * loading or processing throws (the error is still re-thrown to the caller).
 * A run that was stopped and then superseded by a newer refresh leaves the
 * shared flag to the newer run.
 */
export async function refreshPosts(): Promise<void> {
  if ($isRefreshing.get()) return
  $isRefreshing.set(true)
  $stats.set({ ...DEFAULT_REFRESH_STATISTICS, initializing: true })

  // Keep a local handle: the module-level `task` is replaced by the next
  // refresh, so it also identifies whether this run is still the latest one.
  let currentTask = createDownloadTask()
  task = currentTask

  try {
    let feeds = await loadFeeds()
    if (task !== currentTask || !$isRefreshing.get()) {
      // stopRefreshing() was called while the feeds were loading.
      return
    }
    $stats.set({
      ...$stats.get(),
      initializing: false,
      totalFeeds: feeds.length
    })

    queue = createQueue(feeds.map(feed => ({ payload: feed, type: 'feed' })))
    await queue.start(4, {
      async feed(feed) {
        await refreshFeed(feed, currentTask)
      }
    })
  } finally {
    if (task === currentTask) {
      if ($stats.get().initializing) {
        // Stopped or failed before the feeds were loaded.
        $stats.set({ ...$stats.get(), initializing: false })
      }
      $isRefreshing.set(false)
    }
  }
}
/**
 * Stop the running refresh: clear the refreshing flag, stop the feed queue
 * and abort all downloads of the current task.
 *
 * Does nothing if no refresh is running.
 */
export function stopRefreshing(): void {
  if (!$isRefreshing.get()) return
  $isRefreshing.set(false)
  // `queue` is assigned only after refreshPosts() has loaded the feeds, so it
  // is still undefined when the very first refresh is stopped that early.
  queue?.stop()
  task.abortAll()
}