Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Control API #3255

Merged
merged 11 commits into from
Nov 19, 2024
11 changes: 11 additions & 0 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/golang/glog"
"github.com/livepeer/go-livepeer/pm"
"github.com/livepeer/go-livepeer/trickle"

"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/eth"
Expand Down Expand Up @@ -147,6 +148,14 @@ type LivepeerNode struct {
priceInfoForCaps map[string]CapabilityPrices
serviceURI url.URL
segmentMutex *sync.RWMutex

// For live video pipelines, cache for live pipelines; key is the stream name
LivePipelines map[string]*LivePipeline
LiveMu *sync.RWMutex
}

type LivePipeline struct {
ControlPub *trickle.TricklePublisher
}

// NewLivepeerNode creates a new Livepeer Node. Eth can be nil.
Expand All @@ -164,6 +173,8 @@ func NewLivepeerNode(e eth.LivepeerEthClient, wd string, dbh *common.DB) (*Livep
priceInfoForCaps: make(map[string]CapabilityPrices),
StorageConfigs: make(map[string]*transcodeConfig),
storageMutex: &sync.RWMutex{},
LivePipelines: make(map[string]*LivePipeline),
LiveMu: &sync.RWMutex{},
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/google/uuid v1.6.0
github.com/jaypipes/ghw v0.10.0
github.com/jaypipes/pcidb v1.0.0
github.com/livepeer/ai-worker v0.12.1
github.com/livepeer/ai-worker v0.12.2-0.20241118111050-0a9cd07bca51
github.com/livepeer/go-tools v0.3.6-0.20240130205227-92479de8531b
github.com/livepeer/livepeer-data v0.7.5-0.20231004073737-06f1f383fb18
github.com/livepeer/lpms v0.0.0-20240909171057-fe5aff1fa6a2
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,14 @@ github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+O
github.com/libp2p/go-openssl v0.1.0/go.mod h1:OiOxwPpL3n4xlenjx2h7AwSGaFSC/KZvf6gNdOBQMtc=
github.com/livepeer/ai-worker v0.12.1 h1:V6XGxnRmq02GuJP/PYRKTXIZAd2F1jTuErweO5boWxU=
github.com/livepeer/ai-worker v0.12.1/go.mod h1:/Deme7XXRP4BiYXt/j694Ygw+dh8rWJdikJsKY64sjE=
github.com/livepeer/ai-worker v0.12.2-0.20241115144717-cffea497e22e h1:/rGRpiHAhaCzk8fV+RXjkVILea8wSoqpuDQY/CdpilY=
github.com/livepeer/ai-worker v0.12.2-0.20241115144717-cffea497e22e/go.mod h1:/Deme7XXRP4BiYXt/j694Ygw+dh8rWJdikJsKY64sjE=
github.com/livepeer/ai-worker v0.12.2-0.20241115145337-b8477a4881ae h1:suFeJeJqm+PKrtJJO7ubkb9m+6CZPOHbO0xQmxycUEI=
github.com/livepeer/ai-worker v0.12.2-0.20241115145337-b8477a4881ae/go.mod h1:/Deme7XXRP4BiYXt/j694Ygw+dh8rWJdikJsKY64sjE=
github.com/livepeer/ai-worker v0.12.2-0.20241115145705-4cefa2dc7463 h1:k81OkHOR4EBPEBcM8DhFUmkdiL4BdMyZMgwmJ2N/0+E=
github.com/livepeer/ai-worker v0.12.2-0.20241115145705-4cefa2dc7463/go.mod h1:/Deme7XXRP4BiYXt/j694Ygw+dh8rWJdikJsKY64sjE=
github.com/livepeer/ai-worker v0.12.2-0.20241118111050-0a9cd07bca51 h1:+mKaLa8DvxxBJd9KTpbfN48fse+34HteqavOpCklDlI=
github.com/livepeer/ai-worker v0.12.2-0.20241118111050-0a9cd07bca51/go.mod h1:/Deme7XXRP4BiYXt/j694Ygw+dh8rWJdikJsKY64sjE=
leszko marked this conversation as resolved.
Show resolved Hide resolved
github.com/livepeer/go-tools v0.3.6-0.20240130205227-92479de8531b h1:VQcnrqtCA2UROp7q8ljkh2XA/u0KRgVv0S1xoUvOweE=
github.com/livepeer/go-tools v0.3.6-0.20240130205227-92479de8531b/go.mod h1:hwJ5DKhl+pTanFWl+EUpw1H7ukPO/H+MFpgA7jjshzw=
github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded h1:ZQlvR5RB4nfT+cOQee+WqmaDOgGtP2oDMhcVvR4L0yA=
Expand Down
10 changes: 7 additions & 3 deletions server/ai_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,16 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
// skipping handleAIRequest for now until we have payments

var (
mid = string(core.RandomManifestID())
pubUrl = TrickleHTTPPath + mid
subUrl = pubUrl + "-out"
mid = string(core.RandomManifestID())
pubUrl = TrickleHTTPPath + mid
subUrl = pubUrl + "-out"
controlUrl = pubUrl + "-control"
)
jsonData, err := json.Marshal(
&worker.LiveVideoToVideoResponse{
PublishUrl: pubUrl,
SubscribeUrl: subUrl,
ControlUrl: controlUrl,
})
if err != nil {
respondWithError(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -120,6 +122,8 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
pubCh.CreateChannel()
subCh := trickle.NewLocalPublisher(h.trickleSrv, mid+"-out", "video/MP2T")
subCh.CreateChannel()
controlPubCh := trickle.NewLocalPublisher(h.trickleSrv, mid+"-control", "application/json")
controlPubCh.CreateChannel()

// Subscribe to the publishUrl for payments monitoring
go func() {
Expand Down
11 changes: 11 additions & 0 deletions server/ai_live_video.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/url"
"os"

"github.com/livepeer/go-livepeer/core"
"github.com/livepeer/go-livepeer/media"
"github.com/livepeer/go-livepeer/trickle"
"github.com/livepeer/lpms/ffmpeg"
Expand Down Expand Up @@ -83,3 +84,13 @@ func mediamtxSourceTypeToString(s string) (string, error) {
return "", errors.New("unknown media source")
}
}

func startControlPublish(control *url.URL, params aiRequestParams) {
controlPub, err := trickle.NewTricklePublisher(control.String())
if err != nil {
slog.Info("error starting control publisher stream=%s err=%v", "err", params.stream, err)
leszko marked this conversation as resolved.
Show resolved Hide resolved
}
params.node.LiveMu.Lock()
defer params.node.LiveMu.Unlock()
params.node.LivePipelines[params.stream] = &core.LivePipeline{ControlPub: controlPub}
}
49 changes: 47 additions & 2 deletions server/ai_mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ func startAIMediaServer(ls *LivepeerServer) error {
ls.HTTPMux.Handle("/text-to-speech", oapiReqValidator(aiMediaServerHandle(ls, jsonDecoder[worker.GenTextToSpeechJSONRequestBody], processTextToSpeech)))

// This is called by the media server when the stream is ready
ls.HTTPMux.Handle("/live/video-to-video/start", ls.StartLiveVideo())
ls.HTTPMux.Handle("/live/video-to-video/{stream}/start", ls.StartLiveVideo())
ls.HTTPMux.Handle("/live/video-to-video/{stream}/update", ls.UpdateLiveVideo())

return nil
}
Expand Down Expand Up @@ -359,7 +360,7 @@ func (ls *LivepeerServer) ImageToVideoResult() http.Handler {
func (ls *LivepeerServer) StartLiveVideo() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
streamName := r.FormValue("stream")
streamName := r.PathValue("stream")
if streamName == "" {
clog.Errorf(ctx, "Missing stream name")
http.Error(w, "Missing stream name", http.StatusBadRequest)
Expand Down Expand Up @@ -435,6 +436,7 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler {
ms := media.MediaSegmenter{Workdir: ls.LivepeerNode.WorkDir}
ms.RunSegmentation("rtmp://localhost/"+streamName, ssr.Read)
ssr.Close()
ls.cleanupLive(streamName)
}()

params := aiRequestParams{
Expand All @@ -443,6 +445,7 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler {
sessManager: ls.AISessionManager,
segmentReader: ssr,
outputRTMPURL: outputURL,
stream: streamName,
}

req := worker.GenLiveVideoToVideoJSONRequestBody{
Expand All @@ -452,6 +455,48 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler {
})
}

func (ls *LivepeerServer) UpdateLiveVideo() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
// Get stream from path param
stream := r.PathValue("stream")
if stream == "" {
http.Error(w, "Missing stream name", http.StatusBadRequest)
return
}
ls.LivepeerNode.LiveMu.RLock()
defer ls.LivepeerNode.LiveMu.RUnlock()
p, ok := ls.LivepeerNode.LivePipelines[stream]
if !ok {
// Stream not found
http.Error(w, "Stream not found", http.StatusNotFound)
return
}
defer r.Body.Close()
params, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

clog.V(6).Infof(ctx, "Sending Live Video Update Control API stream=%s, params=%s", stream, string(params))
if err := p.ControlPub.Write(strings.NewReader(string(params))); err != nil {
leszko marked this conversation as resolved.
Show resolved Hide resolved
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
})
}

func (ls *LivepeerServer) cleanupLive(stream string) {
ls.LivepeerNode.LiveMu.Lock()
defer ls.LivepeerNode.LiveMu.Unlock()
delete(ls.LivepeerNode.LivePipelines, stream)
leszko marked this conversation as resolved.
Show resolved Hide resolved
}

const mediaMTXControlPort = "9997"

func kickInputConnection(sourceID string, sourceType string) error {
Expand Down
8 changes: 7 additions & 1 deletion server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type aiRequestParams struct {
// For live video pipelines
segmentReader *media.SwitchableSegmentReader
outputRTMPURL string
stream string
leszko marked this conversation as resolved.
Show resolved Hide resolved
}

// CalculateTextToImageLatencyScore computes the time taken per pixel for an text-to-image request.
Expand Down Expand Up @@ -1038,9 +1039,14 @@ func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *A
if err != nil {
return nil, fmt.Errorf("sub url %w", err)
}
clog.V(common.VERBOSE).Infof(ctx, "pub %s sub %s", pub, sub)
control, err := appendHostname(resp.JSON200.ControlUrl)
if err != nil {
return nil, fmt.Errorf("control pub url - %w", err)
}
clog.V(common.VERBOSE).Infof(ctx, "pub %s sub %s control %s", pub, sub, control)
startTricklePublish(pub, params)
startTrickleSubscribe(sub, params)
startControlPublish(control, params)
}
return resp, nil
}
Expand Down
Loading