Skip to content

Commit

Permalink
Add automatic session limit and set max sessions with livepeer_cli (l…
Browse files Browse the repository at this point in the history
  • Loading branch information
eliteprox committed Feb 21, 2024
1 parent d4a0a0c commit 427d91c
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- \#2839 Add Orchestrator blocklist CLI parameter (@mjh1)

#### Orchestrator
- \#2781 Add automatic session limit and set max sessions with livepeer_cli

#### Transcoder

Expand Down
2 changes: 1 addition & 1 deletion cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.TranscodingOptions = flag.String("transcodingOptions", *cfg.TranscodingOptions, "Transcoding options for broadcast job, or path to json config")
cfg.MaxAttempts = flag.Int("maxAttempts", *cfg.MaxAttempts, "Maximum transcode attempts")
cfg.SelectRandFreq = flag.Float64("selectRandFreq", *cfg.SelectRandFreq, "Frequency to randomly select unknown orchestrators (on-chain mode only)")
cfg.MaxSessions = flag.Int("maxSessions", *cfg.MaxSessions, "Maximum number of concurrent transcoding sessions for Orchestrator, maximum number or RTMP streams for Broadcaster, or maximum capacity for transcoder")
cfg.MaxSessions = flag.String("maxSessions", *cfg.MaxSessions, "Maximum number of concurrent transcoding sessions for Orchestrator or 'auto' for dynamic limit, maximum number of RTMP streams for Broadcaster, or maximum capacity for transcoder.")
cfg.CurrentManifest = flag.Bool("currentManifest", *cfg.CurrentManifest, "Expose the currently active ManifestID as \"/stream/current.m3u8\"")
cfg.Nvidia = flag.String("nvidia", *cfg.Nvidia, "Comma-separated list of Nvidia GPU device IDs (or \"all\" for all available devices)")
cfg.Netint = flag.String("netint", *cfg.Netint, "Comma-separated list of NetInt device GUIDs (or \"all\" for all available devices)")
Expand Down
26 changes: 19 additions & 7 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"os"
"os/user"
"path/filepath"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -81,7 +82,7 @@ type LivepeerConfig struct {
TranscodingOptions *string
MaxAttempts *int
SelectRandFreq *float64
MaxSessions *int
MaxSessions *string
CurrentManifest *bool
Nvidia *string
Netint *string
Expand Down Expand Up @@ -150,7 +151,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultTranscodingOptions := "P240p30fps16x9,P360p30fps16x9"
defaultMaxAttempts := 3
defaultSelectRandFreq := 0.3
defaultMaxSessions := 10
defaultMaxSessions := strconv.Itoa(10)
defaultCurrentManifest := false
defaultNvidia := ""
defaultNetint := ""
Expand Down Expand Up @@ -290,9 +291,20 @@ func DefaultLivepeerConfig() LivepeerConfig {
}

func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
if *cfg.MaxSessions <= 0 {
glog.Fatal("-maxSessions must be greater than zero")
return
if *cfg.MaxSessions == "auto" && *cfg.Orchestrator {
if *cfg.Transcoder {
glog.Fatal("-maxSessions 'auto' cannot be used when both -orchestrator and -transcoder are specified")
return
}
core.MaxSessions = 0
} else {
intMaxSessions, err := strconv.Atoi(*cfg.MaxSessions)
if err != nil || intMaxSessions <= 0 {
glog.Fatal("-maxSessions must be 'auto' or greater than zero")
return
}

core.MaxSessions = intMaxSessions
}

if *cfg.Netint != "" && *cfg.Nvidia != "" {
Expand Down Expand Up @@ -746,6 +758,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
}
}

n.AutoSessionLimit = *cfg.MaxSessions == "auto"
n.AutoAdjustPrice = *cfg.AutoAdjustPrice

ev, _ := new(big.Int).SetString(*cfg.TicketEV, 10)
Expand Down Expand Up @@ -981,7 +994,6 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
}
}

core.MaxSessions = *cfg.MaxSessions
if lpmon.Enabled {
lpmon.MaxSessions(core.MaxSessions)
}
Expand Down Expand Up @@ -1197,7 +1209,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
glog.Fatal("Missing -orchAddr")
}

go server.RunTranscoder(n, orchURLs[0].Host, *cfg.MaxSessions, transcoderCaps)
go server.RunTranscoder(n, orchURLs[0].Host, core.MaxSessions, transcoderCaps)
}

switch n.NodeType {
Expand Down
1 change: 1 addition & 0 deletions cmd/livepeer_cli/livepeer_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (w *wizard) initializeOptions() []wizardOpt {
{desc: "Vote in a poll", invoke: w.vote, orchestrator: true},
{desc: "Set max ticket face value", invoke: w.setMaxFaceValue, orchestrator: true},
{desc: "Set price for broadcaster", invoke: w.setPriceForBroadcaster, orchestrator: true},
{desc: "Set maximum sessions", invoke: w.setMaxSessions, orchestrator: true, notOrchestrator: false},
{desc: "Exit", invoke: func() {
fmt.Println("Goodbye, my friend")
os.Exit(0)
Expand Down
24 changes: 24 additions & 0 deletions cmd/livepeer_cli/wizard_transcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,3 +338,27 @@ func (w *wizard) setPriceForBroadcaster() {
}

}

func (w *wizard) setMaxSessions() {
fmt.Println("Enter the maximum # of sessions")
maxSessions := w.readStringAndValidate(func(in string) (string, error) {
intVal, err := strconv.Atoi(in)
if "" == in || (in != "auto" && intVal <= 0 && err != nil) {
return "", fmt.Errorf("Max Sessions must be 'auto' or greater than zero")
}

return in, nil
})

data := url.Values{
"maxSessions": {fmt.Sprintf("%v", maxSessions)},
}
result, ok := httpPostWithParams(fmt.Sprintf("http://%v:%v/setMaxSessions", w.host, w.httpPort), data)
if ok {
fmt.Printf(result)
return
} else {
fmt.Printf("Error setting max sessions: %v", result)
return
}
}
23 changes: 23 additions & 0 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (
"sync"
"time"

"github.com/golang/glog"
"github.com/livepeer/go-livepeer/pm"

"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/eth"
lpmon "github.com/livepeer/go-livepeer/monitor"
)

var ErrTranscoderAvail = errors.New("ErrTranscoderUnavailable")
Expand Down Expand Up @@ -80,6 +82,7 @@ type LivepeerNode struct {
Balances *AddressBalances
Capabilities *Capabilities
AutoAdjustPrice bool
AutoSessionLimit bool
// Broadcaster public fields
Sender pm.Sender

Expand Down Expand Up @@ -154,3 +157,23 @@ func (n *LivepeerNode) SetMaxFaceValue(maxfacevalue *big.Int) {

n.Recipient.SetMaxFaceValue(maxfacevalue)
}

func (n *LivepeerNode) SetMaxSessions(s int) {
n.mu.Lock()
defer n.mu.Unlock()
MaxSessions = s

//update metrics reporting
if lpmon.Enabled {
lpmon.MaxSessions(MaxSessions)
}

glog.Infof("Updated session limit to %d", MaxSessions)
}

func (n *LivepeerNode) GetCurrentCapacity() int {
n.TranscoderManager.RTmutex.Lock()
defer n.TranscoderManager.RTmutex.Unlock()
_, totalCapacity, _ := n.TranscoderManager.totalLoadAndCapacity()
return totalCapacity
}
9 changes: 9 additions & 0 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,9 +708,18 @@ func (n *LivepeerNode) serveTranscoder(stream net.Transcoder_RegisterTranscoderS
coreCaps := CapabilitiesFromNetCapabilities(capabilities)
n.Capabilities.AddCapacity(coreCaps)
defer n.Capabilities.RemoveCapacity(coreCaps)

if n.AutoSessionLimit {
n.SetMaxSessions(n.GetCurrentCapacity() + capacity)
}

// Manage blocks while transcoder is connected
n.TranscoderManager.Manage(stream, capacity, capabilities)
glog.V(common.DEBUG).Infof("Closing transcoder=%s channel", from)

if n.AutoSessionLimit {
defer n.SetMaxSessions(n.GetCurrentCapacity())
}
}

func (rtm *RemoteTranscoderManager) transcoderResults(tcID int64, res *RemoteTranscoderResult) {
Expand Down
21 changes: 21 additions & 0 deletions server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,27 @@ func (s *LivepeerServer) setPriceForBroadcaster() http.Handler {
})
}

func (s *LivepeerServer) setMaxSessions() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
frmMaxSessions := r.FormValue("maxSessions")
if frmMaxSessions == "auto" {
s.LivepeerNode.AutoSessionLimit = true
s.LivepeerNode.SetMaxSessions(s.LivepeerNode.GetCurrentCapacity())
respondOk(w, []byte("Max sessions set to auto\n"))
} else if maxSessions, err := strconv.Atoi(frmMaxSessions); err == nil {
if maxSessions > 0 {
s.LivepeerNode.AutoSessionLimit = false
s.LivepeerNode.SetMaxSessions(maxSessions)
respondOk(w, []byte(fmt.Sprintf("Max sessions set to %d\n", maxSessions)))
} else {
respond400(w, "Max Sessions must be 'auto' or greater than zero")
}
} else {
respond400(w, err.Error())
}
})
}

// Bond, withdraw, reward
func bondHandler(client eth.LivepeerEthClient) http.Handler {
return mustHaveClient(client, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down
1 change: 1 addition & 0 deletions server/webserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (s *LivepeerServer) cliWebServerHandlers(bindAddr string) *http.ServeMux {
mux.Handle("/setOrchestratorConfig", mustHaveFormParams(s.setOrchestratorConfigHandler(client)))
mux.Handle("/setMaxFaceValue", mustHaveFormParams(s.setMaxFaceValueHandler(), "maxfacevalue"))
mux.Handle("/setPriceForBroadcaster", mustHaveFormParams(s.setPriceForBroadcaster(), "pricePerUnit", "pixelsPerUnit", "broadcasterEthAddr"))
mux.Handle("/setMaxSessions", mustHaveFormParams(s.setMaxSessions(), "maxSessions"))

// Bond, withdraw, reward
mux.Handle("/bond", mustHaveFormParams(bondHandler(client), "amount", "toAddr"))
Expand Down

0 comments on commit 427d91c

Please sign in to comment.