Skip to content

Commit

Permalink
Working logs
Browse files Browse the repository at this point in the history
Signed-off-by: Andrei Stan <[email protected]>
  • Loading branch information
andreistan26 committed Feb 20, 2024
1 parent d0f9de7 commit e0313fc
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 76 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ require (
oras.land/oras-go/v2 v2.2.1
sdk.kraft.cloud v0.2.4
sigs.k8s.io/kustomize/kyaml v0.14.3
xenbits.xenproject.org/git-http/xen.git/tools/golang/xenlight v0.0.0-20240202070433-091466ba55d1
)

replace github.com/vishvananda/netlink => github.com/craciunoiuc/netlink v1.2.1-beta.2
Expand Down Expand Up @@ -225,4 +224,5 @@ require (
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
xenbits.xenproject.org/git-http/xen.git/tools/golang/xenlight v0.0.0-20240201170224-b1fdd7d0e47e // indirect
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1695,5 +1695,9 @@ sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
xenbits.xenproject.org/git-http/xen.git/tools/golang/xenlight v0.0.0-20240201170224-b1fdd7d0e47e h1:uPirkt0proXAhh2P1XyGlOeSVSc3m8K5sxJQD6AlNRA=
xenbits.xenproject.org/git-http/xen.git/tools/golang/xenlight v0.0.0-20240201170224-b1fdd7d0e47e/go.mod h1:tbZ4iMnk8RWkXPxTiCGdAw3hCOa3feShlf3sBh50uIc=
xenbits.xenproject.org/git-http/xen.git/tools/golang/xenlight v0.0.0-20240202070433-091466ba55d1 h1:BKDXbYIOBUOsA4aMRypdNbHSZnFc0TaeCtWC/yu2P7g=
xenbits.xenproject.org/git-http/xen.git/tools/golang/xenlight v0.0.0-20240202070433-091466ba55d1/go.mod h1:tbZ4iMnk8RWkXPxTiCGdAw3hCOa3feShlf3sBh50uIc=
xenbits.xenproject.org/git-http/xen.git/tools/golang/xenlight v0.0.0-20240214131806-d9bfc2810e1c h1:AaggY2inx2rKriQEYKJnwkPl7u9opWLuV2rgS4918i8=
xenbits.xenproject.org/git-http/xen.git/tools/golang/xenlight v0.0.0-20240214131806-d9bfc2810e1c/go.mod h1:tbZ4iMnk8RWkXPxTiCGdAw3hCOa3feShlf3sBh50uIc=
4 changes: 3 additions & 1 deletion machine/xen/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package xen

import (
"strings"

"xenbits.xenproject.org/git-http/xen.git/tools/golang/xenlight"
)

Expand Down Expand Up @@ -108,7 +110,7 @@ func WithUuid(uuid string) XenOption {

func WithArgs(args []string) XenOption {
return func(cfg *xenlight.DomainConfig) error {
cfg.BInfo.Extra = xenlight.StringList(args)
cfg.BInfo.Cmdline = strings.Join(args, " ")
return nil
}
}
Expand Down
149 changes: 75 additions & 74 deletions machine/xen/v1alpha1.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func (service *machineV1alpha1Service) Create(ctx context.Context, machine *mach
machine.Spec.Resources.Requests[corev1.ResourceCPU] = quantity
}

// TODO(andreistan26): Check if the name is already in use as a xen domain
xenOpts := []XenOption{
WithCpu(int(machine.Spec.Resources.Requests.Cpu().Value())),
WithMemoryKb(uint64(machine.Spec.Resources.Requests.Memory().Value() / XenMemoryScale)),
Expand All @@ -110,12 +109,10 @@ func (service *machineV1alpha1Service) Create(ctx context.Context, machine *mach
xenOpts = append(xenOpts, WithRamdisk(machine.Status.InitrdPath))
}

// TODO(andreistan26): Add port mapping
if len(machine.Spec.Ports) > 0 {
return machine, fmt.Errorf("mapping ports is not supported for xen")
}

// TODO(andreistan26): Add args
kernelArgs, err := ukargparse.Parse(machine.Spec.KernelArgs...)
if err != nil {
return machine, err
Expand All @@ -136,7 +133,6 @@ func (service *machineV1alpha1Service) Create(ctx context.Context, machine *mach
mac = startMac.String()
}

//TODO(andreistan26): refactor this
nic, err := xenlight.NewDeviceNic()
if err != nil {
return nil, err
Expand All @@ -159,14 +155,14 @@ func (service *machineV1alpha1Service) Create(ctx context.Context, machine *mach
switch vol.Spec.Driver {
case "9pfs":
mounttag := fmt.Sprintf("fs%d", i+1)
// TODO(andreistan26): refactor this
p9Dev, err := xenlight.NewDeviceP9()
if err != nil {
return nil, err
}

p9Dev.Tag = mounttag
p9Dev.Path = vol.Spec.Source
p9Dev.SecurityModel = "none"

xenOpts = append(xenOpts,
WithP9(*p9Dev),
Expand Down Expand Up @@ -208,18 +204,18 @@ func (service *machineV1alpha1Service) Create(ctx context.Context, machine *mach
return machine, fmt.Errorf("could not create xen config: %w", err)
}

log.G(ctx).Infof("Creating xen domain with config")
machine.CreationTimestamp = metav1.Now()

domID, err := xenCtx.DomainCreateNew(config)
if err != nil {
return machine, fmt.Errorf("could not create xen domain: %w", err)
}

config.CInfo.Domid = domID

machine.CreationTimestamp = metav1.Now()
machine.Status.PlatformConfig = &XenConfig{
machine.Status.PlatformConfig = XenConfig{
DomID: domID,
}

machine.Status.State = machinev1alpha1.MachineStateCreated

return machine, nil
Expand All @@ -229,15 +225,14 @@ func (service *machineV1alpha1Service) Start(ctx context.Context, machine *machi
return machine, fmt.Errorf("machine has no platform config")
}

config := machine.Status.PlatformConfig.(*XenConfig)
config := machine.Status.PlatformConfig.(XenConfig)

xenCtx, err := xenlight.NewContext()
if err != nil {
return nil, fmt.Errorf("could not create xen context: %w", err)
}
defer xenCtx.Close()

log.G(ctx).Infof("Unpausing xen domain %d", config.DomID)
err = xenCtx.DomainUnpause(config.DomID)
if err != nil {
return machine, fmt.Errorf("could not unpause xen domain: %w", err)
Expand All @@ -246,66 +241,14 @@ func (service *machineV1alpha1Service) Start(ctx context.Context, machine *machi
machine.Status.State = machinev1alpha1.MachineStateRunning
machine.Status.StartedAt = time.Now()

// Start appending pts output to logfile: pts -> chan -> log file
pts, err := xenCtx.PrimaryConsoleGetTty(uint32(config.DomID))
log.G(ctx).Infof("Getting xen domain pts: %v", pts)
if err != nil {
return machine, fmt.Errorf("could not get xen domain pts: %v", err)
}

go func() {
ptsChan := make(chan []byte)
errChan := make(chan error)

ptsFD, err := os.OpenFile(pts, os.O_RDONLY, 0o644)
if err != nil {
log.G(ctx).Errorf("could not open xen domain pts: %v", err)
return
}

go func() {
for {
buf := make([]byte, 1024)
n, err := ptsFD.Read(buf)
if err != nil {
errChan <- err
return
}
ptsChan <- buf[:n]
}
}()

logFD, err := os.OpenFile(machine.Status.LogFile, os.O_APPEND|os.O_WRONLY, 0o644)
if err != nil {
log.G(ctx).Errorf("log file not found after create: %v", err)
return
}

for {
select {
case err := <-errChan:
log.G(ctx).Errorf("could not read from pts: %v", err)
case line := <-ptsChan:
_, err := logFD.Write(line)
if err != nil {
log.G(ctx).Errorf("could not write to log file: %v", err)
}
case <-ctx.Done():
logFD.Close()
ptsFD.Close()
return
}
}
}()

return machine, nil
}
func (service *machineV1alpha1Service) Pause(ctx context.Context, machine *machinev1alpha1.Machine) (*machinev1alpha1.Machine, error) {
if machine.Status.PlatformConfig == nil {
return machine, fmt.Errorf("machine has no platform config")
}

config, ok := machine.Status.PlatformConfig.(*XenConfig)
config, ok := machine.Status.PlatformConfig.(XenConfig)
if !ok {
return machine, fmt.Errorf("machine has no platform config")
}
Expand All @@ -329,7 +272,7 @@ func (service *machineV1alpha1Service) Stop(ctx context.Context, machine *machin
return machine, nil
}

config, ok := machine.Status.PlatformConfig.(*XenConfig)
config, ok := machine.Status.PlatformConfig.(XenConfig)
if !ok {
return machine, fmt.Errorf("machine has no platform config")
}
Expand All @@ -339,7 +282,6 @@ func (service *machineV1alpha1Service) Stop(ctx context.Context, machine *machin
return nil, fmt.Errorf("could not create xen context: %w", err)
}
defer xenCtx.Close()
log.G(ctx).Infof("Destroying xen domain %d", config.DomID)
if err := xenCtx.DomainDestroy(config.DomID); err != nil {
return machine, fmt.Errorf("could not destroy xen domain: %w", err)
}
Expand All @@ -358,7 +300,7 @@ func (service *machineV1alpha1Service) Delete(ctx context.Context, machine *mach
return nil, err
}
func (service *machineV1alpha1Service) Get(ctx context.Context, machine *machinev1alpha1.Machine) (*machinev1alpha1.Machine, error) {
config, ok := machine.Status.PlatformConfig.(*XenConfig)
config, ok := machine.Status.PlatformConfig.(XenConfig)
if !ok {
return machine, fmt.Errorf("machine has no platform config")
}
Expand All @@ -374,7 +316,7 @@ func (service *machineV1alpha1Service) Get(ctx context.Context, machine *machine

dominfo := &xenlight.Dominfo{}

// until xenCtx.DomainInfo is fixed use ListDomain
// Should be done with xenCtx.DomainInfo, but it currently does not work
doms := xenCtx.ListDomain()
if err != nil {
return machine, fmt.Errorf("could not list xen domains: %w", err)
Expand Down Expand Up @@ -404,7 +346,6 @@ func (service *machineV1alpha1Service) Get(ctx context.Context, machine *machine
func (service *machineV1alpha1Service) List(ctx context.Context, machines *machinev1alpha1.MachineList) (*machinev1alpha1.MachineList, error) {
cached := machines.Items
machines.Items = make([]zip.Object[machinev1alpha1.MachineSpec, machinev1alpha1.MachineStatus], len(cached))

for i, machine := range cached {
machine, err := service.Get(ctx, &machine)
if err != nil {
Expand All @@ -419,13 +360,11 @@ func (service *machineV1alpha1Service) List(ctx context.Context, machines *machi
}

func (service *machineV1alpha1Service) Watch(ctx context.Context, machine *machinev1alpha1.Machine) (chan *machinev1alpha1.Machine, chan error, error) {
config, ok := machine.Status.PlatformConfig.(*XenConfig)
config, ok := machine.Status.PlatformConfig.(XenConfig)
if !ok {
return nil, nil, fmt.Errorf("machine has no platform config")
}

log.G(ctx).Infof("Watching xen domain %d", config.DomID)

w, err := NewWatcher(config.DomID)
if err != nil {
return nil, nil, err
Expand All @@ -440,7 +379,6 @@ func (service *machineV1alpha1Service) Watch(ctx context.Context, machine *machi
errs := make(chan error)

go func() {
log.G(ctx).Infof("Querying initial state for xen domain %d", config.DomID)
intialMachine, err := service.Get(ctx, machine)
if err != nil {
errs <- err
Expand All @@ -453,7 +391,6 @@ func (service *machineV1alpha1Service) Watch(ctx context.Context, machine *machi
w.Close()
return
case <-watch:
log.G(ctx).Infof("Received event for xen domain %d", config.DomID)
machine, err := service.Get(ctx, machine)
if err != nil {
errs <- err
Expand All @@ -468,6 +405,70 @@ func (service *machineV1alpha1Service) Watch(ctx context.Context, machine *machi
return events, errs, nil
}
func (service *machineV1alpha1Service) Logs(ctx context.Context, machine *machinev1alpha1.Machine) (chan string, chan error, error) {
config, ok := machine.Status.PlatformConfig.(XenConfig)
if !ok {
return nil, nil, fmt.Errorf("machine has no platform config")
}

xenCtx, err := xenlight.NewContext()
if err != nil {
return nil, nil, fmt.Errorf("could not create xen context: %w", err)
}
defer xenCtx.Close()

pts, err := xenCtx.PrimaryConsoleGetTty(uint32(config.DomID))
if err != nil {
return nil, nil, fmt.Errorf("could not get xen domain pts: %v", err)
}

// Start appending pts output to logfile: pts -> chan -> log file
go func() {
ptsChan := make(chan []byte)
errChan := make(chan error)

ptsFD, err := os.OpenFile(pts, os.O_RDONLY, 0o644)
if err != nil {
log.G(ctx).Errorf("could not open xen domain pts: %v", err)
return
}

go func() {
buf := make([]byte, 1024)
for {
n, err := ptsFD.Read(buf)
if err != nil {
if err != os.ErrClosed {
errChan <- err
}
return
}
ptsChan <- buf[:n]
}
}()

logFD, err := os.OpenFile(machine.Status.LogFile, os.O_APPEND|os.O_WRONLY, 0o644)
if err != nil {
log.G(ctx).Errorf("log file not found after create: %v", err)
return
}

for {
select {
case err := <-errChan:
log.G(ctx).Errorf("could not read from pts: %v", err)
case line := <-ptsChan:
_, err := logFD.Write(line)
if err != nil {
log.G(ctx).Errorf("could not write to log file: %v", err)
}
case <-ctx.Done():
logFD.Close()
ptsFD.Close()
return
}
}
}()

return logtail.NewLogTail(ctx, machine.Status.LogFile)
}

Expand Down

0 comments on commit e0313fc

Please sign in to comment.