-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathhttp_srv.mjs
151 lines (123 loc) · 4.39 KB
/
http_srv.mjs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/*
This module has tools relevant for HTTP servers, using only standard web APIs
which are environment-independent and should work in service workers, Deno, and
with polyfills also in Node. For tools relevant for HTTP clients, see
`http.mjs`. For Deno-specific tools, see `http_deno.mjs`.
*/
import * as l from './lang.mjs'
import * as h from './http.mjs'
/*
Workaround for the insane DOM stream API which seems to
provide NO WAY to make a reader-writer pair.
*/
export class WritableReadableStream extends ReadableStream {
  /*
  Creates a readable stream and retains its controller on the instance, which
  lets the owner push chunks into the stream via `.write`.

  Takes an optional abort signal, passed through `l.optInst` with
  `AbortSignal`. If the signal is already aborted, the constructor throws
  `.AbortError`. Otherwise the stream registers itself as an `abort` listener,
  and on abort it errors itself with `.AbortError` and deinitializes.
  */
  constructor(sig) {
    let ctr
    // The code below relies on `start` being invoked synchronously by `super`:
    // this is the only place where the controller can be captured.
    super({start: val => {ctr = val}})
    this.ctr = reqStreamController(ctr)
    this.sig = l.optInst(sig, AbortSignal)
    this.throwIfAborted()
    sig?.addEventListener(`abort`, this, {once: true})
  }

  /*
  Would prefer `this.sig?.throwIfAborted()`, but at the time of writing,
  it has very little browser support.
  */
  throwIfAborted() {if (this.sig?.aborted) throw new this.AbortError()}

  /*
  Event listener interface. The instance itself is registered as the listener
  object, so the event target calls this method. Only `abort` events are
  handled; any other event type is ignored.
  */
  handleEvent(event) {
    if (event.type === `abort`) {
      event.target.removeEventListener(event.type, this)
      this.error(new this.AbortError())
      this.deinit()
    }
  }

  // Enqueues a chunk for the reader. Returns whatever the controller returns.
  write(val) {return this.ctr.enqueue(val)}

  // Puts the stream into the errored state with the given value.
  error(val) {return this.ctr.error(val)}

  /*
  Note: `.cancel` is intended for readers and is not allowed to close a locked
  stream; `.close` is intended for writers and does close a locked stream used
  by a reader. Controller `.close` is synchronous and void. WHATWG streams have
  non-idempotent close, throwing on repeated calls. We suppress their
  exceptions because we prefer idempotent, repeatable close.
  */
  close() {try {this.ctr.close()} catch {}}

  /*
  Full teardown: unsubscribes from the abort signal, then closes the stream.
  Safe to call repeatedly.

  The unsubscription matters when the stream is torn down for any reason other
  than the abort itself. Without it, the signal keeps referencing the stream
  until the signal fires or is collected, which leaks closed streams when one
  long-lived signal is shared between many of them.
  */
  deinit() {
    this.sig?.removeEventListener(`abort`, this)
    return this.close()
  }

  // Error class used for aborts. Subclasses may override.
  get AbortError() {return h.AbortError}
}
/*
Short for "broadcaster". Maintains a set of writable/readable streams and allows
broadcasting to them. Uses the standard stream APIs, which are compatible with
Deno and browsers.
*/
export class Broad extends Set {
  /*
  Creates a stream of class `.Stream` with the given signal, registers it in
  this set, and returns it.

  The registered stream is not removed immediately when its signal is aborted,
  only when writing to it after abort. We may consider fixing this later, but
  it involves more code and more states which are tricky to get right. At the
  time of writing, Deno doesn't immediately abort request signals nor does it
  immediately invoke stream cancel, so it wouldn't even work for Deno servers.

  Relevant Deno issues:

    https://github.com/denoland/deno/issues/10829
    https://github.com/denoland/deno/issues/10854
  */
  make(sig) {
    const stream = new this.Stream(sig)
    this.add(stream)
    return stream
  }

  /*
  Converts the input to bytes once, then writes the same bytes to every
  registered stream. Returns a promise that settles when all writes are done.
  Invalid input causes a synchronous exception from `.toBytes`.
  */
  write(val) {
    const bytes = this.toBytes(val)
    return Promise.all(Array.from(this, cli => this.writeTo(cli, bytes)))
  }

  // Broadcasts the input encoded via `h.jsonEncode`.
  writeJson(val) {return this.write(h.jsonEncode(val))}

  /*
  Broadcasts text in the "event stream" message format.
  The input should be a single line without newlines.

  References:

    https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
    https://developer.mozilla.org/en-US/docs/Web/API/EventSource
  */
  writeEvent(val) {return this.write(eventSourceLine(val))}

  // Same as `.writeEvent`, but the payload is first encoded via `h.jsonEncode`.
  writeEventJson(val) {return this.writeEvent(h.jsonEncode(val))}

  /*
  Writes to one stream. If the write fails, the stream is unregistered and
  deinitialized, and the error is handed to `.onWriteErr`, which decides
  whether it's ignored or rethrown.
  */
  async writeTo(cli, val) {
    try {await cli.write(val)}
    catch (err) {
      this.delete(cli)
      cli.deinit()
      this.onWriteErr(err)
    }
  }

  /*
  Error policy for failed writes. Errors recognized by `h.isErrAbort` or
  `isStreamWriteErr` are ignored. Everything else is rethrown.
  */
  onWriteErr(err) {
    if (!h.isErrAbort(err) && !isStreamWriteErr(err)) throw err
  }

  /*
  Even though the stream API supports strings, encoding to bytes seems to be
  required in Deno. Unclear if required in other environments. Using strings
  produces cryptic and unhelpful errors.

  Accepts `Uint8Array` (returned as-is) and strings (encoded via
  `TextEncoder`). Anything else causes an exception made by `l.errConv`.

  Perf note: `new TextEncoder` is not nearly as expensive as actual encoding.
  Unclear if we should cache an instance.
  */
  toBytes(val) {
    if (l.isInst(val, Uint8Array)) return val
    if (!l.isStr(val)) throw l.errConv(val, `bytes`)
    return new TextEncoder().encode(val)
  }

  // Unregisters and deinitializes every registered stream.
  deinit() {
    this.forEach(cli => {
      this.delete(cli)
      cli.deinit()
    })
  }

  // Class used by `.make`. Subclasses may override.
  get Stream() {return WritableReadableStream}
}
/*
Duck-type check for a stream controller: the value must have the methods
`enqueue` and `close`, as determined by `l.hasMeth`.
*/
function isStreamController(val) {
  const canEnqueue = l.hasMeth(val, `enqueue`)
  return canEnqueue && l.hasMeth(val, `close`)
}
/*
Asserting counterpart of `isStreamController`: passes the value to `l.req`
along with that predicate, and returns the result.
*/
function reqStreamController(val) {
  const out = l.req(val, isStreamController)
  return out
}
/*
Encodes a string as one message in the "event stream" (server-sent events)
format. The input must be a string, as enforced by `l.reqStr`.

Each line of the input becomes its own `data:` field, and the message is
terminated by a blank line. Single-line input produces `data: <input>\n\n`.

Splitting by lines matters because line terminators delimit fields in this
format, and a blank line ends the message. Embedding multiline text verbatim
would drop every line after the first, or cut the message short.
*/
function eventSourceLine(val) {
  const lines = l.reqStr(val).split(/\r\n|\r|\n/)
  return lines.map(line => `data: ` + line).join(`\n`) + `\n\n`
}
/*
True if the value is a `TypeError` (per `l.isInst`) whose message contains the
phrase checked below. Used by `Broad` to recognize write failures that should
be ignored rather than rethrown.
*/
export function isStreamWriteErr(val) {
  const phrase = `stream controller cannot close or enqueue`
  const isTypeErr = l.isInst(val, TypeError)
  return isTypeErr && val.message.includes(phrase)
}