-
Notifications
You must be signed in to change notification settings - Fork 0
/
pool.ml
249 lines (226 loc) · 7.41 KB
/
pool.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
(*
Generic pool of reusable connections.
See mli file.
*)
open Printf
open Lwt
module type Connection = sig
type conn
val create_connection : unit -> conn Lwt.t
val close_connection : conn -> unit Lwt.t
val is_reusable : conn -> bool Lwt.t
end
module type S = sig
type connection_pool
type conn
val create_pool : capacity:int -> max_live_conn:int -> connection_pool
val with_connection : connection_pool -> (conn -> 'a Lwt.t) -> 'a Lwt.t
end
module Make (C : Connection) = struct
type conn = C.conn
type connection_pool = {
(* Queue of reusable connections *)
queue : (conn * bool ref) Queue.t;
(* the boolean indicates whether the connection is live,
consistently with the live connection counter
(and not necessarily with the actual state of the connection
because of possibility of exceptions and double-closing). *)
queue_capacity : int;
(* System for limiting the number of simultaneous connections *)
max_live_conn : int;
(* maximum number of live (= open) connections *)
live_conn : int ref;
(* counter of live connections *)
conn_possible : unit Lwt_condition.t;
(* used to wake up a thread when a new connection can be obtained
either by reusing one from the queue or by creating a new one. *)
}
let create_pool ~capacity ~max_live_conn =
if capacity <= 0 then
invalid_arg "Pool.Make().create_pool: capacity must be positive"
else if capacity > max_live_conn then
invalid_arg "Pool.Make().create_pool: \
pool capacity may not be greater than the maximum number \
of simultaneous connections"
else {
queue = Queue.create ();
queue_capacity = capacity;
max_live_conn;
live_conn = ref 0;
conn_possible = Lwt_condition.create ();
}
let create_connection p =
incr p.live_conn;
assert (!(p.live_conn) <= p.max_live_conn);
(catch
C.create_connection
(fun e ->
decr p.live_conn;
assert (!(p.live_conn) >= 0);
Trax.raise __LOC__ e
)
) >>= fun conn ->
return (conn, ref true)
let rec get_connection p =
try return (Queue.take p.queue)
with Queue.Empty ->
if !(p.live_conn) >= p.max_live_conn then
Lwt_condition.wait p.conn_possible >>= fun () ->
get_connection p
else
create_connection p
let close_connection p (connection, is_live) =
Lwt.finalize
(fun () -> C.close_connection connection)
(fun () ->
if !is_live then (
(* avoid double counting *)
decr p.live_conn;
is_live := false;
);
assert (!(p.live_conn) >= 0);
if !(p.live_conn) < p.max_live_conn then (
(* Wake up one of the waiting threads *)
Lwt_condition.signal p.conn_possible ()
);
return ()
)
let recycle_connection p connection =
Queue.add connection p.queue;
Lwt_condition.signal p.conn_possible ()
let with_connection p f =
get_connection p >>= fun ((conn, is_live) as connection) ->
let save_or_close_connection () =
C.is_reusable conn >>= function
| false -> close_connection p connection
| true ->
if Queue.length p.queue < p.queue_capacity then (
recycle_connection p connection;
return ()
)
else
close_connection p connection
in
catch
(fun () ->
f conn >>= fun result ->
save_or_close_connection () >>= fun () ->
return result
)
(fun e ->
save_or_close_connection () >>= fun () ->
Trax.raise __LOC__ e
)
end
module Simple = Make (struct
type conn = unit
let create_connection () = return ()
let close_connection () = return ()
let is_reusable () = return true
end)
let create_throttler n =
let pool = Simple.create_pool ~capacity:n ~max_live_conn:n in
fun f -> Simple.with_connection pool f
module Test = struct
let test_connection_pool () =
let module Connection =
struct
type conn = int ref * bool ref
let create_connection () = return (ref 0, ref true)
let close_connection _ = return ()
let is_reusable (_, reusable) = return !reusable
end
in
let module Connection_pool = Make(Connection) in
let pool = Connection_pool.create_pool ~capacity:3 ~max_live_conn:5 in
let t =
let use_connection () =
Connection_pool.with_connection pool (fun (count, _) ->
incr count;
Lwt_unix.sleep 0.25 >>= fun () ->
return count
)
in
let ignore_count = use_connection () >>= fun _ -> return () in
join [ignore_count; ignore_count; ignore_count] >>= fun () ->
(* That should hit the connection capacity, so now we're reusing a
connection *)
use_connection () >>= fun count ->
assert (!count = 2);
let break_connection () =
Connection_pool.with_connection pool (fun (_, reusable) ->
reusable := false;
Lwt_unix.sleep 0.25 >>= fun () ->
return ()
)
in
join [break_connection (); break_connection (); break_connection ()]
>>= fun () ->
(* All those non-reusable connections should be cleared out, so if we
use one now, it'll be a new connection *)
use_connection () >>= fun count ->
return (!count = 1)
in
Lwt_main.run t
(* Goals of this test:
- make sure it terminates
- make sure a close_connection that raises an exception doesn't
bring the connection counter to a negative value
*)
let test_broken_connection_pools () =
let module Connection =
struct
type conn = unit
let create_connection () = Lwt_unix.sleep 0.01
let close_connection () = Lwt_unix.sleep 0.01 >>= fun () -> raise Exit
let is_reusable () = Lwt_unix.sleep 0.01 >>= fun () -> return true
end
in
let module Connection_pool = Make(Connection) in
let pool = Connection_pool.create_pool ~capacity:1 ~max_live_conn:3 in
let job =
(* In debug mode (Log.level := `Debug), this prints something like
+++--++--++--++--++----
*)
Lwt_list.iter_p (fun () ->
catch
(fun () ->
Connection_pool.with_connection pool (fun () ->
(*if !Log.level = `Debug then printf "+%!";*)
Lwt_unix.sleep 0.05 >>= fun () ->
(*if !Log.level = `Debug then printf "-%!";*)
return ()
)
)
(fun e ->
match Trax.unwrap e with
| Exit -> return ()
| e -> Trax.raise __LOC__ e
)
) [ (); (); (); (); (); (); (); (); (); (); (); ]
in
Lwt_main.run job;
(* Check that we don't hang when max_live_conn = capacity *)
let pool2 = Connection_pool.create_pool ~capacity:1 ~max_live_conn:1 in
let job2 =
Lwt_list.iter_p (fun () ->
catch
(fun () ->
Connection_pool.with_connection pool2 (fun () ->
Lwt_unix.sleep 0.05
)
)
(fun e ->
match Trax.unwrap e with
| Exit -> return ()
| _ -> Trax.raise __LOC__ e)
) [ (); (); (); (); ]
in
Lwt_main.run job2;
true
let tests = [
"test_connection_pool", test_connection_pool;
"test_broken_connection_pools", test_broken_connection_pools;
]
end
let tests = Test.tests