Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CP-50475: parallelize device ops during VM lifecycle ops #6057

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 124 additions & 112 deletions ocaml/xenopsd/lib/xenops_server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ type atomic =
| VM_rename of (Vm.id * Vm.id * rename_when)
| VM_import_metadata of (Vm.id * Metadata.t)
| Parallel of Vm.id * string * atomic list
| Serial of Vm.id * string * atomic list
| Best_effort of atomic
[@@deriving rpcty]

Expand Down Expand Up @@ -271,6 +272,9 @@ let rec name_of_atomic = function
| Parallel (_, _, atomics) ->
Printf.sprintf "Parallel (%s)"
(String.concat " | " (List.map name_of_atomic atomics))
| Serial (_, _, atomics) ->
Printf.sprintf "Serial (%s)"
(String.concat " & " (List.map name_of_atomic atomics))
| Best_effort atomic ->
Printf.sprintf "Best_effort (%s)" (name_of_atomic atomic)

Expand Down Expand Up @@ -1550,6 +1554,23 @@ let dequarantine_ops vgpus =
fun vgpu -> PCI_dequarantine vgpu.physical_pci_address
)

(* Avoid generating list-based atoms with 1 or no actions in them *)
let collect_into apply = function [] -> [] | [op] -> [op] | lst -> apply lst

let parallel name ~id =
collect_into (fun ls -> [Parallel (id, Printf.sprintf "%s VM=%s" name id, ls)])

let serial name ~id =
collect_into (fun ls -> [Serial (id, Printf.sprintf "%s VM=%s" name id, ls)])

let parallel_concat name ~id lst = parallel name ~id (List.concat lst)

let serial_concat name ~id lst = serial name ~id (List.concat lst)

let parallel_map name ~id lst f = parallel name ~id (List.concat_map f lst)

let map_or_empty f x = Option.value ~default:[] (Option.map f x)

let rec atomics_of_operation = function
| VM_start (id, force) ->
let vbds_rw, vbds_ro = VBD_DB.vbds id |> vbd_plug_sets in
Expand All @@ -1561,6 +1582,23 @@ let rec atomics_of_operation = function
List.partition (is_nvidia_sriov vgpus) pcis
in
let no_sharept = List.exists is_no_sharept vgpus in
let plug_vbds typ vbds =
let pf = Printf.sprintf in
let name_multi = pf "VBDs.activate_epoch_and_plug %s" typ in
let name_one = pf "VBD.activate_epoch_and_plug %s" typ in
parallel_map name_multi ~id vbds (fun vbd ->
serial_concat name_one ~id
[
[VBD_set_active (vbd.Vbd.id, true)]
; map_or_empty
(fun x ->
[VBD_epoch_begin (vbd.Vbd.id, x, vbd.Vbd.persistent)]
)
vbd.Vbd.backend
; [VBD_plug vbd.Vbd.id]
]
)
in
[
dequarantine_ops vgpus
; [
Expand All @@ -1569,50 +1607,35 @@ let rec atomics_of_operation = function
; VM_create (id, None, None, no_sharept)
; VM_build (id, force)
]
; List.map
(fun vbd -> VBD_set_active (vbd.Vbd.id, true))
(vbds_rw @ vbds_ro)
(* keeping behaviour of vbd_plug_order: rw vbds must be plugged before
ro vbds, see vbd_plug_sets *)
; List.map
(fun (ty, vbds) ->
Parallel
( id
, Printf.sprintf "VBD.epoch_begin %s vm=%s" ty id
, List.filter_map
(fun vbd ->
Option.map
(fun x ->
VBD_epoch_begin (vbd.Vbd.id, x, vbd.Vbd.persistent)
)
vbd.Vbd.backend
)
vbds
; parallel_concat "Devices.plug (no qemu)" ~id
[
(* rw vbds must be plugged before ro vbds, see vbd_plug_sets *)
serial_concat "VBDs.acticate_epoch_and_plug RW+RO" ~id
[plug_vbds "RW" vbds_rw; plug_vbds "RO" vbds_ro]
; List.concat_map
(fun vif ->
serial "VIF.activate_and_plug" ~id
[VIF_set_active (vif.Vif.id, true); VIF_plug vif.Vif.id]
)
)
[("RW", vbds_rw); ("RO", vbds_ro)]
; [
(* rw vbds must be plugged before ro vbds, see vbd_plug_sets *)
Parallel
( id
, Printf.sprintf "VBD.plug RW vm=%s" id
, List.map (fun vbd -> VBD_plug vbd.Vbd.id) vbds_rw
)
; Parallel
( id
, Printf.sprintf "VBD.plug RO vm=%s" id
, List.map (fun vbd -> VBD_plug vbd.Vbd.id) vbds_ro
)
]
; List.map (fun vif -> VIF_set_active (vif.Vif.id, true)) vifs
; List.map (fun vif -> VIF_plug vif.Vif.id) vifs
; List.map (fun vgpu -> VGPU_set_active (vgpu.Vgpu.id, true)) vgpus
; List.map (fun pci -> PCI_plug (pci.Pci.id, false)) pcis_sriov
vifs
; serial_concat "VGPUs.activate & PCI.plug (SRIOV)" ~id
[
parallel_map "VGPUs.activate" ~id vgpus (fun vgpu ->
[VGPU_set_active (vgpu.Vgpu.id, true)]
)
; parallel_map "PCIs.plug (SRIOV)" ~id pcis_sriov (fun pci ->
[PCI_plug (pci.Pci.id, false)]
)
]
]
; [VM_create_device_model (id, false)]
(* PCI and USB devices are hot-plugged into HVM guests via QEMU, so the
following operations occur after creating the device models *)
; List.map (fun pci -> PCI_plug (pci.Pci.id, true)) pcis_other
; List.map (fun vusb -> VUSB_plug vusb.Vusb.id) vusbs
; parallel_concat "Devices.plug (qemu)" ~id
[
List.map (fun pci -> PCI_plug (pci.Pci.id, true)) pcis_other
; List.map (fun vusb -> VUSB_plug vusb.Vusb.id) vusbs
]
(* At this point the domain is considered survivable. *)
; [VM_set_domain_action_request (id, None)]
]
Expand All @@ -1623,65 +1646,62 @@ let rec atomics_of_operation = function
let pcis = PCI_DB.pcis id in
let vusbs = VUSB_DB.vusbs id in
[
Option.value ~default:[]
(Option.map (fun x -> [VM_shutdown_domain (id, PowerOff, x)]) timeout)
map_or_empty (fun x -> [VM_shutdown_domain (id, PowerOff, x)]) timeout
(* Before shutting down a VM, we need to unplug its VUSBs. *)
; List.map (fun vusb -> VUSB_unplug vusb.Vusb.id) vusbs
; parallel_map "VUSBs.unplug" ~id vusbs (fun vusb ->
[VUSB_unplug vusb.Vusb.id]
)
; [
(* CA-315450: in a hard shutdown or snapshot revert, timeout=None and
VM_shutdown_domain is not called. To avoid any interference, we
pause the domain before destroying the device model. *)
Best_effort (VM_pause id)
; VM_destroy_device_model id
; Parallel
( id
, Printf.sprintf "VBD.unplug vm=%s" id
, List.map (fun vbd -> VBD_unplug (vbd.Vbd.id, true)) vbds
)
]
; List.map (fun vif -> VIF_unplug (vif.Vif.id, true)) vifs
; List.map (fun pci -> PCI_unplug pci.Pci.id) pcis
; parallel_concat "Devices.unplug" ~id
[
List.map (fun vbd -> VBD_unplug (vbd.Vbd.id, true)) vbds
; List.map (fun vif -> VIF_unplug (vif.Vif.id, true)) vifs
; List.map (fun pci -> PCI_unplug pci.Pci.id) pcis
]
; [VM_destroy id]
]
|> List.concat
| VM_restore_vifs id ->
let vifs = VIF_DB.vifs id in
[
List.map (fun vif -> VIF_set_active (vif.Vif.id, true)) vifs
; List.map (fun vif -> VIF_plug vif.Vif.id) vifs
]
|> List.concat
parallel_map "VIFs.activate_and_plug" ~id vifs (fun vif ->
serial "VIF.activate_and_plug" ~id
[VIF_set_active (vif.Vif.id, true); VIF_plug vif.Vif.id]
)
| VM_restore_devices (id, restore_vifs) ->
let vbds_rw, vbds_ro = VBD_DB.vbds id |> vbd_plug_sets in
let vgpus = VGPU_DB.vgpus id in
let pcis = PCI_DB.pcis id |> pci_plug_order in
let pcis_other = List.filter (is_not_nvidia_sriov vgpus) pcis in
let plug_vbds typ vbds =
let pf = Printf.sprintf in
let name_multi = pf "VBDs.activate_and_plug %s" typ in
let name_one = pf "VBD.activate_and_plug %s" typ in
parallel_map name_multi ~id vbds (fun vbd ->
serial name_one ~id
[VBD_set_active (vbd.Vbd.id, true); VBD_plug vbd.Vbd.id]
)
in
[
List.map
(fun vbd -> VBD_set_active (vbd.Vbd.id, true))
(vbds_rw @ vbds_ro)
; [
(* rw vbds must be plugged before ro vbds, see vbd_plug_sets *)
Parallel
( id
, Printf.sprintf "VBD.plug RW vm=%s" id
, List.map (fun vbd -> VBD_plug vbd.Vbd.id) vbds_rw
)
; Parallel
( id
, Printf.sprintf "VBD.plug RO vm=%s" id
, List.map (fun vbd -> VBD_plug vbd.Vbd.id) vbds_ro
)
]
(* rw vbds must be plugged before ro vbds, see vbd_plug_sets *)
plug_vbds "RW" vbds_rw
; plug_vbds "RO" vbds_ro
; (if restore_vifs then atomics_of_operation (VM_restore_vifs id) else [])
; List.map (fun vgpu -> VGPU_set_active (vgpu.Vgpu.id, true)) vgpus
(* Nvidia SRIOV PCI devices have been already been plugged *)
; [
VM_create_device_model (id, true)
(* PCI and USB devices are hot-plugged into HVM guests via QEMU, so
the following operations occur after creating the device models *)
]
; List.map (fun pci -> PCI_plug (pci.Pci.id, true)) pcis_other
; (* Nvidia SRIOV PCI devices have been already been plugged *)
parallel_map "VGPUs.activate" ~id vgpus (fun vgpu ->
[VGPU_set_active (vgpu.Vgpu.id, true)]
)
; [VM_create_device_model (id, true)]
(* PCI and USB devices are hot-plugged into HVM guests via QEMU, so
the following operations occur after creating the device models *)
; parallel_map "PCIs.plug" ~id pcis_other (fun pci ->
[PCI_plug (pci.Pci.id, true)]
)
]
|> List.concat
| VM_poweroff (id, timeout) ->
Expand All @@ -1694,25 +1714,24 @@ let rec atomics_of_operation = function
else
Xenops_hooks.reason__clean_shutdown
in
let unplug_vbd vbd =
serial_concat "VBD.epoch_and_deactivate" ~id
[
map_or_empty
(fun x -> [VBD_epoch_end (vbd.Vbd.id, x)])
vbd.Vbd.backend
; [VBD_set_active (vbd.Vbd.id, false)]
]
in
[
[VM_hook_script (id, Xenops_hooks.VM_pre_destroy, reason)]
; atomics_of_operation (VM_shutdown (id, timeout))
; [
Parallel
( id
, Printf.sprintf "VBD.epoch_end vm=%s" id
, List.filter_map
(fun vbd ->
Option.map
(fun x -> VBD_epoch_end (vbd.Vbd.id, x))
vbd.Vbd.backend
)
vbds
)
]
; List.map (fun vbd -> VBD_set_active (vbd.Vbd.id, false)) vbds
; List.map (fun vif -> VIF_set_active (vif.Vif.id, false)) vifs
; List.map (fun vgpu -> VGPU_set_active (vgpu.Vgpu.id, false)) vgpus
; parallel_concat "Devices.deactivate" ~id
[
List.concat_map unplug_vbd vbds
; List.map (fun vif -> VIF_set_active (vif.Vif.id, false)) vifs
; List.map (fun vgpu -> VGPU_set_active (vgpu.Vgpu.id, false)) vgpus
]
; [VM_hook_script (id, Xenops_hooks.VM_post_destroy, reason)]
]
|> List.concat
Expand All @@ -1725,23 +1744,14 @@ let rec atomics_of_operation = function
Xenops_hooks.reason__clean_reboot
in
[
Option.value ~default:[]
(Option.map (fun x -> [VM_shutdown_domain (id, Reboot, x)]) timeout)
map_or_empty (fun x -> [VM_shutdown_domain (id, Reboot, x)]) timeout
; [VM_hook_script (id, Xenops_hooks.VM_pre_destroy, reason)]
; atomics_of_operation (VM_shutdown (id, None))
; [
Parallel
( id
, Printf.sprintf "VBD.epoch_end vm=%s" id
, List.filter_map
(fun vbd ->
Option.map
(fun x -> VBD_epoch_end (vbd.Vbd.id, x))
vbd.Vbd.backend
)
vbds
)
]
; parallel_map "VBD.epoch_end" ~id vbds (fun vbd ->
map_or_empty
(fun x -> [VBD_epoch_end (vbd.Vbd.id, x)])
vbd.Vbd.backend
)
; [
VM_hook_script (id, Xenops_hooks.VM_post_destroy, reason)
; VM_hook_script
Expand Down Expand Up @@ -1858,7 +1868,7 @@ let rec perform_atomic ~progress_callback ?subtask:_ ?result (op : atomic)
(Xenops_task.id_of_handle t)
(List.length atoms) description
in
let with_tracing = parallel_id_with_tracing parallel_id t in
let with_tracing = id_with_tracing parallel_id t in
debug "begin_%s" parallel_id ;
let task_list =
queue_atomics_and_wait ~progress_callback ~max_parallel_atoms:10
Expand Down Expand Up @@ -1902,6 +1912,8 @@ let rec perform_atomic ~progress_callback ?subtask:_ ?result (op : atomic)
List.iter
(fun err -> match err with None -> () | Some e -> raise e)
errors
| Serial (_, _, atoms) ->
List.iter (Fun.flip (perform_atomic ~progress_callback) t) atoms
| VIF_plug id ->
debug "VIF.plug %s" (VIF_DB.string_of_id id) ;
B.VIF.plug t (VIF_DB.vm_of id) (VIF_DB.read_exn id) ;
Expand Down Expand Up @@ -2501,7 +2513,7 @@ and trigger_cleanup_after_failure_atom op t =
immediate_operation dbg id (VM_check_state id)
| Best_effort op ->
trigger_cleanup_after_failure_atom op t
| Parallel (_id, _description, ops) ->
| Parallel (_id, _description, ops) | Serial (_id, _description, ops) ->
List.iter (fun op -> trigger_cleanup_after_failure_atom op t) ops
| VM_rename (id1, id2, _) ->
immediate_operation dbg id1 (VM_check_state id1) ;
Expand Down
4 changes: 2 additions & 2 deletions ocaml/xenopsd/lib/xenops_task.ml
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ let is_task task = function
| _ ->
None

let parallel_id_with_tracing parallel_id t =
Debug_info.make ~log:parallel_id ~tracing:(Xenops_task.tracing t)
let id_with_tracing id t =
Debug_info.make ~log:id ~tracing:(Xenops_task.tracing t)
|> Debug_info.to_string

let dbg_with_traceparent_of_task t =
Expand Down
3 changes: 1 addition & 2 deletions ocaml/xenopsd/lib/xenops_utils.ml
Original file line number Diff line number Diff line change
Expand Up @@ -620,8 +620,7 @@ let chunks size lst =
[op] :: xs :: xss
)
[] lst
|> List.map (fun xs -> List.rev xs)
|> List.rev
|> List.rev_map (fun xs -> List.rev xs)

let really_kill pid =
try Unixext.kill_and_wait pid
Expand Down
Loading