Skip to content

Commit

Permalink
fix(hscontrol): fixes high cpu usage issue during node conn/disconn
Browse files Browse the repository at this point in the history
  • Loading branch information
amitsingh21 committed Apr 28, 2024
1 parent 3c729d5 commit bb401dc
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 44 deletions.
40 changes: 20 additions & 20 deletions hscontrol/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"net"
"net/http"
_ "net/http/pprof" //nolint
"os"
"os/signal"
"sort"
Expand Down Expand Up @@ -257,12 +258,11 @@ func (h *Headscale) expireEphemeralNodesWorker() {
return
}

expiredFound := false
for _, machine := range machines {
if machine.isEphemeral() && machine.LastSeen != nil &&
time.Now().
After(machine.LastSeen.Add(h.cfg.EphemeralNodeInactivityTimeout)) {
expiredFound = true
h.setLastStateChangeToNow(machine.User)
log.Info().
Str("machine", machine.Hostname).
Msg("Ephemeral client removed from database")
Expand All @@ -276,10 +276,6 @@ func (h *Headscale) expireEphemeralNodesWorker() {
}
}
}

if expiredFound {
h.setLastStateChangeToNow()
}
}
}

Expand All @@ -302,12 +298,10 @@ func (h *Headscale) expireExpiredMachinesWorker() {
return
}

expiredFound := false
for index, machine := range machines {
if machine.isExpired() &&
machine.Expiry.After(h.getLastStateChange(user)) {
expiredFound = true

h.setLastStateChangeToNow(machine.User)
err := h.ExpireMachine(&machines[index])
if err != nil {
log.Error().
Expand All @@ -323,10 +317,6 @@ func (h *Headscale) expireExpiredMachinesWorker() {
}
}
}

if expiredFound {
h.setLastStateChangeToNow()
}
}
}

Expand Down Expand Up @@ -717,6 +707,7 @@ func (h *Headscale) Serve() error {
Msgf("listening and serving HTTP on: %s", h.cfg.Addr)

promMux := http.NewServeMux()
promMux.Handle("/debug/pprof/", http.DefaultServeMux)
promMux.Handle("/metrics", promhttp.Handler())

promHTTPServer := &http.Server{
Expand Down Expand Up @@ -905,7 +896,7 @@ func (h *Headscale) getTLSSettings() (*tls.Config, error) {
}
}

func (h *Headscale) setLastStateChangeToNow() {
func (h *Headscale) setLastStateChangeToNow(filteredUsers ...User) {
var err error

now := time.Now().UTC()
Expand All @@ -917,13 +908,22 @@ func (h *Headscale) setLastStateChangeToNow() {
Err(err).
Msg("failed to fetch all users, failing to update last changed state.")
}

for _, user := range users {
lastStateUpdate.WithLabelValues(user.Name, "headscale").Set(float64(now.Unix()))
if h.lastStateChange == nil {
h.lastStateChange = xsync.NewMapOf[time.Time]()
if len(filteredUsers) > 0 {
for _, user := range filteredUsers {
lastStateUpdate.WithLabelValues(user.Name, "headscale").Set(float64(now.Unix()))
if h.lastStateChange == nil {
h.lastStateChange = xsync.NewMapOf[time.Time]()
}
h.lastStateChange.Store(user.Name, now)
}
} else {
for _, user := range users {
lastStateUpdate.WithLabelValues(user.Name, "headscale").Set(float64(now.Unix()))
if h.lastStateChange == nil {
h.lastStateChange = xsync.NewMapOf[time.Time]()
}
h.lastStateChange.Store(user.Name, now)
}
h.lastStateChange.Store(user.Name, now)
}
}

Expand Down
16 changes: 10 additions & 6 deletions hscontrol/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ func filterMachinesByACL(
if peer.ID == machine.ID {
continue
}
// Skip machines not belonging to this user
if peer.User.Name != machine.User.Name {
continue
}

if machine.canAccess(filter, &machines[index]) || peer.canAccess(filter, machine) {
result = append(result, peer)
Expand Down Expand Up @@ -414,7 +418,7 @@ func (h *Headscale) SetTags(machine *Machine, tags []string) error {
if err := h.UpdateACLRules(); err != nil && !errors.Is(err, errEmptyPolicy) {
return err
}
h.setLastStateChangeToNow()
h.setLastStateChangeToNow(machine.User)

if err := h.db.Save(machine).Error; err != nil {
return fmt.Errorf("failed to update tags for machine in the database: %w", err)
Expand All @@ -428,7 +432,7 @@ func (h *Headscale) ExpireMachine(machine *Machine) error {
now := time.Now()
machine.Expiry = &now

h.setLastStateChangeToNow()
h.setLastStateChangeToNow(machine.User)

if err := h.db.Save(machine).Error; err != nil {
return fmt.Errorf("failed to expire machine in the database: %w", err)
Expand All @@ -455,7 +459,7 @@ func (h *Headscale) RenameMachine(machine *Machine, newName string) error {
}
machine.GivenName = newName

h.setLastStateChangeToNow()
h.setLastStateChangeToNow(machine.User)

if err := h.db.Save(machine).Error; err != nil {
return fmt.Errorf("failed to rename machine in the database: %w", err)
Expand All @@ -471,7 +475,7 @@ func (h *Headscale) RefreshMachine(machine *Machine, expiry time.Time) error {
machine.LastSuccessfulUpdate = &now
machine.Expiry = &expiry

h.setLastStateChangeToNow()
h.setLastStateChangeToNow(machine.User)

if err := h.db.Save(machine).Error; err != nil {
return fmt.Errorf(
Expand Down Expand Up @@ -536,7 +540,7 @@ func (h *Headscale) isOutdated(machine *Machine) bool {
// TODO(kradalby): Only request updates from users where we can talk to nodes
// This would mostly be for a bit of performance, and can be calculated based on
// ACLs.
lastChange := h.getLastStateChange()
lastChange := h.getLastStateChange(machine.User)
lastUpdate := machine.CreatedAt
if machine.LastSuccessfulUpdate != nil {
lastUpdate = *machine.LastSuccessfulUpdate
Expand Down Expand Up @@ -1068,7 +1072,7 @@ func (h *Headscale) enableRoutes(machine *Machine, routeStrs ...string) error {
}
}

h.setLastStateChangeToNow()
h.setLastStateChangeToNow(machine.User)

return nil
}
Expand Down
20 changes: 10 additions & 10 deletions hscontrol/protocol_common_poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (h *Headscale) handlePollCommon(

// There has been an update to _any_ of the nodes that the other nodes would
// need to know about
h.setLastStateChangeToNow()
h.setLastStateChangeToNow(machine.User)

// The request is not ReadOnly, so we need to set up channels for updating
// peers via longpoll
Expand Down Expand Up @@ -322,9 +322,9 @@ func (h *Headscale) pollNetMapStream(
Str("channel", "pollData").
Int("bytes", len(data)).
Msg("Data from pollData channel written successfully")
// TODO(kradalby): Abstract away all the database calls, this can cause race conditions
// when an outdated machine object is kept alive, e.g. db is updated from
// command line, but then overwritten.
// TODO(kradalby): Abstract away all the database calls, this can cause race conditions
// when an outdated machine object is kept alive, e.g. db is updated from
// command line, but then overwritten.
err = h.UpdateMachineFromDatabase(machine)
if err != nil {
log.Error().
Expand Down Expand Up @@ -406,9 +406,9 @@ func (h *Headscale) pollNetMapStream(
Str("channel", "keepAlive").
Int("bytes", len(data)).
Msg("Keep alive sent successfully")
// TODO(kradalby): Abstract away all the database calls, this can cause race conditions
// when an outdated machine object is kept alive, e.g. db is updated from
// command line, but then overwritten.
// TODO(kradalby): Abstract away all the database calls, this can cause race conditions
// when an outdated machine object is kept alive, e.g. db is updated from
// command line, but then overwritten.
err = h.UpdateMachineFromDatabase(machine)
if err != nil {
log.Error().
Expand Down Expand Up @@ -575,9 +575,9 @@ func (h *Headscale) pollNetMapStream(
Str("handler", "PollNetMapStream").
Str("machine", machine.Hostname).
Msg("The client has closed the connection")
// TODO: Abstract away all the database calls, this can cause race conditions
// when an outdated machine object is kept alive, e.g. db is updated from
// command line, but then overwritten.
// TODO: Abstract away all the database calls, this can cause race conditions
// when an outdated machine object is kept alive, e.g. db is updated from
// command line, but then overwritten.
err := h.UpdateMachineFromDatabase(machine)
if err != nil {
log.Error().
Expand Down
13 changes: 5 additions & 8 deletions hscontrol/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (h *Headscale) handlePrimarySubnetFailover() error {
log.Error().Err(err).Msg("error getting routes")
}

routesChanged := false
usersChanged := make(map[string]User, 0)
for pos, route := range routes {
if route.isExitRoute() {
continue
Expand All @@ -333,9 +333,7 @@ func (h *Headscale) handlePrimarySubnetFailover() error {

return err
}

routesChanged = true

usersChanged[route.Machine.User.Name] = route.Machine.User
continue
}
}
Expand Down Expand Up @@ -408,12 +406,11 @@ func (h *Headscale) handlePrimarySubnetFailover() error {
return err
}

routesChanged = true
usersChanged[route.Machine.User.Name] = route.Machine.User
}
}

if routesChanged {
h.setLastStateChangeToNow()
for _, user := range usersChanged {
h.setLastStateChangeToNow(user)
}

return nil
Expand Down

0 comments on commit bb401dc

Please sign in to comment.