-
Notifications
You must be signed in to change notification settings - Fork 1
/
xs.nu
173 lines (146 loc) · 5.01 KB
/
xs.nu
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# Exported shortcuts: "h. get" and "h. post" expand to `h. request get` and `h. request post`.
# NOTE(review): `h. request` is not defined in this file - presumably provided by another module; confirm.
export alias "h. get" = h. request get
export alias "h. post" = h. request post
# `?` yields the pipeline input when it is not empty; otherwise it yields nothing.
alias ? = if ($in | is-not-empty) { $in }
# `??` extends `?` with an `else { return }` branch, so an empty input returns from the enclosing command.
alias ?? = ? else { return }
# Run `next` with the pipeline input when that input is not empty;
# otherwise run the `--else` closure if one was supplied.
#
# Parameters:
#   next: closure   - receives the (non-empty) pipeline input as its own input.
#   --else: closure - optional fallback for empty input (null, "", [], {}).
#
# Returns the output of whichever closure ran, or nothing when the input is
# empty and no `--else` closure was given.
def and-then [
    next: closure
    --else: closure
] {
    # Capture the input once so both branches see the same value and the
    # closures receive it explicitly rather than via implicit forwarding.
    let input = $in
    if ($input | is-not-empty) {
        $input | do $next
    } else if $else != null {
        # `--else` is a flag and may be omitted; only call it when present.
        $input | do $else
    }
}
# Resolve the store location handed to every `xs` invocation in this module.
#
# Returns the value of the XSPWD environment variable, falling back to
# "./store" when XSPWD is unset or null.
export def store-addr [] {
    let configured = $env.XSPWD?
    $configured | default "./store"
}
# Run `xs cat` against the store and parse its output.
#
# Parameters:
#   options: record - optional keys: follow, tail, last_id, limit, pulse.
#
# A truthy `follow` / `tail` becomes a bare switch; a non-null `last_id`,
# `limit` or `pulse` becomes a `--flag value` pair. Every output line of
# `xs cat` is parsed as JSON, one value per line.
def _cat [options: record] {
    mut args = []
    if ($options.follow? | default false) { $args ++= ["--follow"] }
    if ($options.tail? | default false) { $args ++= ["--tail"] }
    if $options.last_id? != null { $args ++= ["--last-id" $options.last_id] }
    if $options.limit? != null { $args ++= ["--limit" $options.limit] }
    if $options.pulse? != null { $args ++= ["--pulse" $options.pulse] }
    xs cat (store-addr) ...$args | lines | each {|line| $line | from json }
}
# Read events from the store.
#
# Public wrapper: collects the flags into an options record and hands it to
# `_cat`, which performs the actual `xs cat` call and JSON parsing.
export def .cat [
    --follow (-f)           # long poll for new events
    --pulse (-p): int       # specifies the interval (in milliseconds) to receive a synthetic "xs.pulse" event
    --tail (-t)             # begin long polling after the end of the stream
    --last-id (-l): string  # forwarded to `xs cat` as --last-id
    --limit: int            # forwarded to `xs cat` as --limit
] {
    let options = {
        follow: $follow
        pulse: $pulse
        tail: $tail
        last_id: $last_id
        limit: $limit
    }
    _cat $options
}
# Normalise a hash argument to a plain hash string.
#
# Parameters:
#   hash?: any - a hash string, or a record that may carry a `hash` field.
#
# Returns the string itself, the record's `hash` field (null when the field
# is missing), or null for any other kind of value.
def read_hash [hash?: any] {
    let kind = ($hash | describe -d | get type)
    if $kind == "string" {
        $hash
    } else if $kind == "record" {
        $hash.hash?
    } else {
        null
    }
}
# .step: Process events from a topic, using a subtopic as a cursor for the last processed event.
#
# Parameters:
# handler: closure - Processes each event. Return null to continue, non-null to stop.
# proto_topic: string - Main topic to process events from. Cursor stored in "{proto_topic}.last-id".
# --follow (-f) - Optional. Long poll for new events.
#
# Behavior:
# - Processes events from proto_topic, using "{proto_topic}.last-id" as a cursor.
# - Calls handler for each event, skips events matching the cursor subtopic.
# - Updates cursor after processing each event.
# - Continues until handler returns non-null or no more events (unless --follow is set).
#
# Returns:
# - {in: $frame, out: $res} if handler returns non-null, where $frame is the input event
# and $res is the handler's output. Returns null if all events processed or none available.
#
# Note: something doesn't feel right about using the stream to track last
# processed id, which requires us to skip our own emissions
export def .step [
handler: closure
proto_topic: string
--follow (-f) # long poll for new events
] {
# Topic under which the cursor (last processed id) is stored.
let topic = $"($proto_topic).last-id"
# Seed the `_cat` options with the last_id found in the meta of the cursor
# topic's head; start from an empty record when `.head` yields nothing.
mut prev = .head $topic | and-then { {
last_id: ($in.meta.last_id)
} } --else { {} }
loop {
# Ask for at most one frame after the cursor; `try { first }` leaves
# $frame null when nothing came back.
let frame = _cat ($prev | insert follow $follow | insert limit 1) | try { first }
if $frame == null { return }
# Advance the in-memory cursor before deciding what to do with the frame.
$prev.last_id = $frame.id
# Frames on the cursor topic are our own bookkeeping; skip them.
if $frame.topic == $topic {
continue
}
# The handler receives the frame as its pipeline input.
let res = $frame | do $handler
if $res != null {
# NOTE(review): this returns before the cursor is appended below, so the
# frame is not recorded as processed - confirm this is intended.
return {in: $frame out: $res}
}
# Persist the cursor by appending a frame whose meta carries last_id.
.append $topic --meta {last_id: $prev.last_id}
}
}
# Fetch content from the store by hash via `xs cas`.
#
# The hash may be given as the positional argument or as pipeline input;
# the argument wins when both are present. Either form may be a hash string
# or a record carrying a `hash` field (see `read_hash`).
#
# Returns nothing when no usable hash can be resolved.
export def .cas [hash?: any] {
    let piped = $in
    let source = if $hash == null { $piped } else { $hash }
    let resolved = read_hash $source
    if $resolved != null {
        xs cas (store-addr) $resolved
    }
}
# Look up `id` with `xs get` against the store and parse the output as JSON.
export def .get [id: string] {
xs get (store-addr) $id | from json
}
# Run `xs head` for `topic` against the store and parse the output as JSON.
# NOTE(review): presumably this is the most recent frame on the topic - confirm against the xs CLI.
export def .head [topic: string] {
xs head (store-addr) $topic | from json
}
# Append an event to the stream
#
# Runs `xs append` against the store and parses its output as JSON.
# `--meta` is serialised to raw (single-line) JSON before being passed on;
# flags that were not supplied are dropped from the argument list.
# NOTE(review): presumably the event content is supplied as pipeline input
# to `xs append` - confirm against callers.
export def .append [
topic: string # The topic to append the event to
--meta: record # Optional metadata to include with the event
--ttl: string # Optional Time-To-Live for the event. Can be a duration in milliseconds, "forever", "temporary", or "ephemeral"
] {
# Each optional flag yields either null or a [flag value] pair; `compact`
# removes the nulls and `flatten` merges the pairs into one argument list.
xs append (store-addr) $topic ...([
(if $meta != null { ["--meta" ($meta | to json -r)] })
(if $ttl != null { ["--ttl" $ttl] })
] | compact | flatten) | from json
}
# Remove the item identified by `id` by running `xs remove` against the store.
export def .remove [id: string] {
xs remove (store-addr) $id
}
# Short alias for `.remove`.
export alias .rm = .remove
# Send a script to `xs pipe` for the given id.
#
# Pipeline input: the script, either a string or a closure. A closure is
# converted to its source text with `view source`.
#
# Parameters:
#   id: string - passed to `xs pipe` as the target id.
#
# Errors: raises "script should either be a string or closure" (labelled at
# the input's span) for any other input type.
export def .pipe [id: string] {
    # Capture the pipeline input exactly once, then derive everything else
    # (including the span used for error reporting) from the captured value.
    let script = $in
    let sp = (metadata $script).span
    let content = match ($script | describe -d | get type) {
        "string" => $script
        "closure" => { view source $script }
        _ => {
            # `error make` raises, so no explicit `return` is needed here.
            error make {
                msg: "script should either be a string or closure"
                label: {
                    text: "script input"
                    span: $sp
                }
            }
        }
    }
    $content | xs pipe (store-addr) $id
}
# Show the status of running tasks (TBD).
# Placeholder: for now this simply calls `.cat` with no flags.
export def .tasks [] {
.cat
}
# Self-test for `read_hash`: checks that a hash string and a record with a
# `hash` field both resolve to the hash, and that null or a record without
# a `hash` field resolve to null.
export def .test [] {
    use std assert
    let digest = "sha256-k//MXqRXKqeE+7S7SkKSbpU3dWrxwzh/iR6v683XTyE="
    let cases = [
        {input: $digest, expected: $digest}
        {input: {hash: $digest}, expected: $digest}
        {input: null, expected: null}
        {input: {goo: 123}, expected: null}
    ]
    for case in $cases {
        assert equal (read_hash $case.input) $case.expected
    }
}