Skip to content

Commit

Permalink
vm: inotify: monitor all containerd namespaces for volumes (#923)
Browse files Browse the repository at this point in the history
* vm: inotify: monitor all containerd namespaces for volumes

* docker: ensure user added to group, reduce wait interval
  • Loading branch information
abiosoft authored Dec 10, 2023
1 parent 26d4eef commit a8d533d
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 56 deletions.
142 changes: 89 additions & 53 deletions daemon/process/inotify/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,70 +19,37 @@ func (f *inotifyProcess) monitorContainerVolumes(ctx context.Context, c chan<- [
if f.runtime == "" {
return fmt.Errorf("empty runtime")
}
runtimeCmd := docker.Name
if f.runtime == containerd.Name {
runtimeCmd = "nerdctl"
}

fetch := func() ([]string, error) {
// fetch all containers
var containers []string
{
out, err := f.guest.RunOutput(runtimeCmd, "ps", "-q")
var vols []string

// docker
if f.runtime != containerd.Name {
vols, err := f.fetchVolumes(docker.Name)
if err != nil {
return nil, fmt.Errorf("error listing containers: %w", err)
}
containers = strings.Fields(out)
if len(containers) == 0 {
return nil, nil
return nil, fmt.Errorf("error fetching docker volumes: %w", err)
}
return vols, nil
}

log.Tracef("found containers %+v", containers)

// fetch volumes
var resp []struct {
Mounts []struct {
Source string `json:"Source"`
} `json:"Mounts"`
// containerd
var namespaces []string
out, err := f.guest.RunOutput("sudo", "nerdctl", "namespace", "list", "-q")
if err != nil {
return nil, fmt.Errorf("error retrieving containerd namespaces: %w", err)
}
{
args := []string{runtimeCmd, "inspect"}
args = append(args, containers...)

var buf bytes.Buffer
if err := f.guest.RunWith(nil, &buf, args...); err != nil {
return nil, fmt.Errorf("error inspecting containers: %w", err)
}
if err := json.NewDecoder(&buf).Decode(&resp); err != nil {
return nil, fmt.Errorf("error decoding docker response")
}
if out != "" {
namespaces = strings.Fields(out)
}

// process and discard redundant volumes
vols := []string{}
{
shouldMount := func(child string) bool {
// ignore all invalid directories.
// i.e. directories not within the mounted VM directories
for _, parent := range f.vmVols {
if strings.HasPrefix(child, parent) {
return true
}
}
return false
for _, ns := range namespaces {
v, err := f.fetchVolumes("sudo", "nerdctl", "--namespace", ns)
if err != nil {
return nil, fmt.Errorf("error retrieving containerd volumes: %w", err)
}

for _, r := range resp {
for _, mount := range r.Mounts {
if shouldMount(mount.Source) {
vols = append(vols, mount.Source)
}
}
if len(v) > 0 {
vols = append(vols, v...)
}

vols = omitChildrenDirectories(vols)
log.Tracef("found volumes %+v", vols)
}

return vols, nil
Expand Down Expand Up @@ -110,6 +77,75 @@ func (f *inotifyProcess) monitorContainerVolumes(ctx context.Context, c chan<- [
return nil
}

func (f *inotifyProcess) fetchVolumes(cmdArgs ...string) ([]string, error) {
log := f.log

// fetch all containers
var containers []string
{
args := append([]string{}, cmdArgs...)
args = append(args, "ps", "-q")
out, err := f.guest.RunOutput(args...)
if err != nil {
return nil, fmt.Errorf("error listing containers: %w", err)
}
containers = strings.Fields(out)
if len(containers) == 0 {
return nil, nil
}
}

log.Tracef("found containers %+v", containers)

// fetch volumes
var resp []struct {
Mounts []struct {
Source string `json:"Source"`
} `json:"Mounts"`
}
{
args := append([]string{}, cmdArgs...)
args = append(args, "inspect")
args = append(args, containers...)

var buf bytes.Buffer
if err := f.guest.RunWith(nil, &buf, args...); err != nil {
return nil, fmt.Errorf("error inspecting containers: %w", err)
}
if err := json.NewDecoder(&buf).Decode(&resp); err != nil {
return nil, fmt.Errorf("error decoding docker response")
}
}

// process and discard redundant volumes
vols := []string{}
{
shouldMount := func(child string) bool {
// ignore all invalid directories.
// i.e. directories not within the mounted VM directories
for _, parent := range f.vmVols {
if strings.HasPrefix(child, parent) {
return true
}
}
return false
}

for _, r := range resp {
for _, mount := range r.Mounts {
if shouldMount(mount.Source) {
vols = append(vols, mount.Source)
}
}
}

vols = omitChildrenDirectories(vols)
log.Tracef("found volumes %+v", vols)
}

return vols, nil
}

func omitChildrenDirectories(dirs []string) []string {
sort.Strings(dirs) // sort to put the parent directories first

Expand Down
16 changes: 13 additions & 3 deletions environment/container/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,25 @@ func (d dockerRuntime) Start(ctx context.Context) error {

// TODO: interval is high due to 0.6.3->0.6.4 docker-ce package transition
// to ensure startup is successful
a.Retry("", time.Second*5, 24, func(int) error {
a.Retry("", time.Second, 120, func(int) error {
return d.guest.RunQuiet("sudo", "service", "docker", "start")
})

// service startup takes few seconds, retry at most 5 times before giving up.
a.Retry("", time.Second*5, 12, func(int) error {
// service startup takes few seconds, retry for a minute before giving up.
a.Retry("", time.Second, 60, func(int) error {
return d.guest.RunQuiet("sudo", "docker", "info")
})

// ensure docker is accessible without root
// otherwise, restart to ensure user is added to docker group
a.Add(func() error {
if err := d.guest.RunQuiet("docker", "info"); err == nil {
return nil
}
ctx := context.WithValue(ctx, cli.CtxKeyQuiet, true)
return d.guest.Restart(ctx)
})

return a.Exec()
}

Expand Down

0 comments on commit a8d533d

Please sign in to comment.