Skip to content

Commit 698e2d7

Browse files
committed
Handle errors during streaming (#281)
1 parent 0a7d3d7 commit 698e2d7

File tree

1 file changed

+85
-59
lines changed

1 file changed

+85
-59
lines changed

src/buffering.ml

Lines changed: 85 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ struct
88
Empty
99
| Writing
1010
| Full
11+
| Error of exn
1112

1213
let state_to_string = function
1314
| Empty -> "Empty"
1415
| Writing -> "Writing"
1516
| Full -> "Full"
17+
| Error e -> Printf.sprintf "Error(%s)" (Printexc.to_string e)
1618

1719
type t = {
1820
buffer : BufferPool.Buffer.t;
@@ -37,10 +39,15 @@ struct
3739
}
3840

3941
let blit_to_arr dest_arr offset block =
40-
if block.state = Empty then
41-
invalid_arg "blit_to_arr (empty block)";
42-
if block.state = Writing then
43-
invalid_arg "blit_to_arr (partially written block)";
42+
begin match block.state with
43+
| Empty
44+
| Writing
45+
| Error _ ->
46+
invalid_arg
47+
(Printf.sprintf
48+
"blit_to_arr (block state=%s)" (state_to_string block.state));
49+
| Full -> ()
50+
end;
4451
let dest_len = Bigarray.Array1.dim dest_arr in
4552
let src_off = Int64.to_int (Int64.sub offset block.start_pos) in
4653
let src_len = block.size - src_off in
@@ -225,69 +232,87 @@ struct
225232
(block, s)
226233
in
227234
let wait_for_full_block block =
228-
Utils.log_with_header
229-
"Waiting for streaming completion (buffer id=%d, state=%s)\n%!"
230-
block.Block.buffer.BufferPool.Buffer.id
231-
(Block.state_to_string block.Block.state);
232235
while block.Block.state = Block.Writing do
233-
Condition.wait
234-
block.Block.buffer.BufferPool.Buffer.condition
235-
block.Block.buffer.BufferPool.Buffer.mutex;
236236
Utils.log_with_header
237-
"Streaming completed (buffer id=%d, state=%s)\n%!"
237+
"Waiting for streaming completion (buffer id=%d, state=%s)\n%!"
238238
block.Block.buffer.BufferPool.Buffer.id
239239
(Block.state_to_string block.Block.state);
240-
done
240+
Condition.wait
241+
block.Block.buffer.BufferPool.Buffer.condition
242+
block.Block.buffer.BufferPool.Buffer.mutex;
243+
done;
244+
begin match block.Block.state with
245+
| Block.Error e ->
246+
Utils.log_with_header
247+
"Streaming error (buffer id=%d, state=%s)\n%!"
248+
block.Block.buffer.BufferPool.Buffer.id
249+
(Block.state_to_string block.Block.state);
250+
raise e
251+
| _ ->
252+
Utils.log_with_header
253+
"Streaming completed (buffer id=%d, state=%s)\n%!"
254+
block.Block.buffer.BufferPool.Buffer.id
255+
(Block.state_to_string block.Block.state);
256+
end
241257
in
242258
let fill_and_blit block_index src_offset dest_arr =
243259
Utils.with_lock_m buffers.mutex
244260
(get_block_m block_index >>= fun block ->
245-
begin match block.Block.state with
246-
| Block.Empty -> begin
247-
Mutex.unlock buffers.mutex;
248-
(* Switch from global lock to block lock to allow concurrent
249-
* streaming. *)
250-
Utils.with_lock_m block.Block.buffer.BufferPool.Buffer.mutex
251-
(SessionM.return () >>= fun () ->
252-
begin match block.Block.state with
253-
| Block.Empty -> begin
254-
block.Block.state <- Block.Writing;
255-
Utils.try_with_m
256-
(fill_array
257-
block.Block.start_pos
258-
block.Block.sub_array)
259-
(fun e ->
260-
remove_partial_block
261-
(remote_id, block_index) block buffers;
262-
raise e) >>= fun () ->
263-
block.Block.state <- Block.Full;
264-
Utils.log_with_header
265-
"Broadcasting streaming completion \
266-
(buffer id=%d, state=%s)\n%!"
267-
block.Block.buffer.BufferPool.Buffer.id
268-
(Block.state_to_string block.Block.state);
269-
Condition.broadcast
270-
block.Block.buffer.BufferPool.Buffer.condition;
271-
SessionM.return block
272-
end
273-
| Block.Full ->
274-
SessionM.return block
275-
| Block.Writing -> begin
276-
wait_for_full_block block;
277-
SessionM.return block
278-
end
279-
end
280-
) >>= fun block ->
281-
Mutex.lock buffers.mutex;
282-
SessionM.return block
283-
end
284-
| Block.Full -> SessionM.return block
285-
| Block.Writing -> begin
286-
Utils.with_lock block.Block.buffer.BufferPool.Buffer.mutex
287-
(fun () -> wait_for_full_block block);
288-
SessionM.return block
289-
end
290-
end >>= fun block ->
261+
SessionM.return (block, block.Block.state)) >>= fun (block, state) ->
262+
begin match state with
263+
| Block.Empty
264+
| Block.Error _ -> begin
265+
(* Switch from global lock to block lock to allow concurrent
266+
* streaming. *)
267+
Utils.with_lock_m block.Block.buffer.BufferPool.Buffer.mutex
268+
(SessionM.return () >>= fun () ->
269+
begin match block.Block.state with
270+
| Block.Empty
271+
| Block.Error _ -> begin
272+
block.Block.state <- Block.Writing;
273+
Utils.try_with_m
274+
(fill_array
275+
block.Block.start_pos
276+
block.Block.sub_array)
277+
(fun e ->
278+
remove_partial_block
279+
(remote_id, block_index) block buffers;
280+
block.Block.state <- Block.Error e;
281+
Utils.log_with_header
282+
"Broadcasting streaming error \
283+
(buffer id=%d, state=%s)\n%!"
284+
block.Block.buffer.BufferPool.Buffer.id
285+
(Block.state_to_string block.Block.state);
286+
Condition.broadcast
287+
block.Block.buffer.BufferPool.Buffer.condition;
288+
raise e) >>= fun () ->
289+
block.Block.state <- Block.Full;
290+
Utils.log_with_header
291+
"Broadcasting streaming completion \
292+
(buffer id=%d, state=%s)\n%!"
293+
block.Block.buffer.BufferPool.Buffer.id
294+
(Block.state_to_string block.Block.state);
295+
Condition.broadcast
296+
block.Block.buffer.BufferPool.Buffer.condition;
297+
SessionM.return block
298+
end
299+
| Block.Full ->
300+
SessionM.return block
301+
| Block.Writing -> begin
302+
wait_for_full_block block;
303+
SessionM.return block
304+
end
305+
end)
306+
end
307+
| Block.Full -> SessionM.return block
308+
| Block.Writing -> begin
309+
Utils.with_lock block.Block.buffer.BufferPool.Buffer.mutex
310+
(fun () -> wait_for_full_block block);
311+
SessionM.return block
312+
end
313+
end >>= fun block ->
314+
Utils.with_lock_m buffers.mutex
315+
(SessionM.return () >>= fun () ->
291316
block.Block.last_access <- Unix.gettimeofday ();
292317
Utils.with_lock block.Block.buffer.BufferPool.Buffer.mutex
293318
(fun () ->
@@ -430,6 +455,7 @@ struct
430455
let evict_cache buffers =
431456
try
432457
while true do
458+
Utils.log_with_header "evict_cache loop\n%!";
433459
Utils.with_lock buffers.mutex
434460
(fun () -> release_lru_buffer_if_request_blocked buffers);
435461
Utils.with_lock buffers.mutex

0 commit comments

Comments
 (0)