Skip to content

Commit

Permalink
Improvements for Zero Impact Catalyst API Deployments (#1347)
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko authored Aug 13, 2024
1 parent 39c4009 commit cdf2cd8
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 16 deletions.
10 changes: 6 additions & 4 deletions api/http_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint string) error {
router := NewCatalystAPIRouterInternal(cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint)
func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint, eventsEndpoint string) error {
router := NewCatalystAPIRouterInternal(cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint, eventsEndpoint)
server := http.Server{Addr: cli.HTTPInternalAddress, Handler: router}
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -56,7 +56,7 @@ func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipe
return server.Shutdown(ctx)
}

func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint string) *httprouter.Router {
func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint, eventsEndpoint string) *httprouter.Router {
router := httprouter.New()
withLogging := middleware.LogRequest()
withAuth := middleware.IsAuthorized
Expand All @@ -73,7 +73,7 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
spkiPublicKey, _ := crypto.ConvertToSpki(cli.VodDecryptPublicKey)

catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine}
eventsHandler := handlers.NewEventsHandlersCollection(c, mapic, bal)
eventsHandler := handlers.NewEventsHandlersCollection(c, mapic, bal, eventsEndpoint)
ffmpegSegmentingHandlers := &ffmpeg.HandlersCollection{VODEngine: vodEngine}
accessControlHandlers := accesscontrol.NewAccessControlHandlersCollection(cli, mapic)
analyticsHandlers := analytics.NewAnalyticsHandler(metricsDB)
Expand Down Expand Up @@ -139,6 +139,8 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
router.GET("/api/serf/members", withLogging(adminHandlers.MembersHandler()))
// Public handler to propagate an event to all Catalyst nodes, execute from Studio API => Catalyst
router.POST("/api/events", withLogging(eventsHandler.Events()))
} else {
router.POST("/api/events", withLogging(eventsHandler.ProxyEvents()))
}

return router
Expand Down
1 change: 1 addition & 0 deletions config/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type Cli struct {
KafkaPassword string
AnalyticsKafkaTopic string
SerfMembersEndpoint string
EventsEndpoint string
CatalystApiURL string

// mapping playbackId to value between 0.0 to 100.0
Expand Down
39 changes: 34 additions & 5 deletions handlers/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,26 @@ type EventsHandlersCollection struct {

mapic mistapiconnector.IMac
bal balancer.Balancer

eventsEndpoint string
}

type Event struct {
Resource string `json:"resource"`
PlaybackID string `json:"playback_id"`
}

func NewEventsHandlersCollection(cluster cluster.Cluster, mapic mistapiconnector.IMac, bal balancer.Balancer) *EventsHandlersCollection {
func NewEventsHandlersCollection(cluster cluster.Cluster, mapic mistapiconnector.IMac, bal balancer.Balancer, eventsEndpoint string) *EventsHandlersCollection {
return &EventsHandlersCollection{
cluster: cluster,
mapic: mapic,
bal: bal,
cluster: cluster,
mapic: mapic,
bal: bal,
eventsEndpoint: eventsEndpoint,
}
}

// Events is a handler called by Studio API to send an event, e.g., to refresh a stream or nuke a stream.
// Events is a handler called by Catalyst API which forwards events from Studio API.
// Used to, e.g., refresh a stream or nuke a stream.
// This event is then propagated to all Serf nodes and then forwarded to catalyst-api and handled by ReceiveUserEvent().
func (d *EventsHandlersCollection) Events() httprouter.Handle {
schema := inputSchemasCompiled["Event"]
Expand Down Expand Up @@ -75,6 +79,31 @@ func (d *EventsHandlersCollection) Events() httprouter.Handle {
}
}

// ProxyEvents is a handler of Catalyst API called by Studio API.
// It proxies the requests to Catalyst.
func (d *EventsHandlersCollection) ProxyEvents() httprouter.Handle {
// Proxy the request to d.eventsEndpoint
return func(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
// Create a new request to the target endpoint
proxyReq, err := http.NewRequest(req.Method, d.eventsEndpoint, req.Body)
if err != nil {
glog.Errorf("Cannot create proxy request: %s", err)
errors.WriteHTTPInternalServerError(w, "Cannot create proxy request", err)
return
}

// Send the request to the target endpoint
client := &http.Client{}
resp, err := client.Do(proxyReq)
if err != nil {
glog.Errorf("Cannot send proxy request: %s", err)
errors.WriteHTTPInternalServerError(w, "Cannot send proxy request", err)
return
}
defer resp.Body.Close()
}
}

// ReceiveUserEvent is a handler to receive Serf events from Catalyst.
// The idea is that:
// 1. Studio API sends an event to Catalyst (received by Events() handler)
Expand Down
4 changes: 2 additions & 2 deletions handlers/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestEventHandler(t *testing.T) {
return nil
}).AnyTimes()

catalystApiHandlers := NewEventsHandlersCollection(mc, nil, nil)
catalystApiHandlers := NewEventsHandlersCollection(mc, nil, nil, "")
router := httprouter.New()
router.POST("/events", catalystApiHandlers.Events())

Expand Down Expand Up @@ -114,7 +114,7 @@ func TestReceiveUserEventHandler(t *testing.T) {
ctrl := gomock.NewController(t)
mac := mock_mistapiconnector.NewMockIMac(ctrl)

catalystApiHandlers := NewEventsHandlersCollection(nil, mac, nil)
catalystApiHandlers := NewEventsHandlersCollection(nil, mac, nil, "")
router := httprouter.New()
router.POST("/receiveUserEvent", catalystApiHandlers.ReceiveUserEvent())

Expand Down
31 changes: 26 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"

Expand Down Expand Up @@ -132,6 +133,7 @@ func main() {
fs.StringVar(&cli.KafkaPassword, "kafka-password", "", "Kafka Password")
fs.StringVar(&cli.AnalyticsKafkaTopic, "analytics-kafka-topic", "", "Kafka Topic used to send analytics logs")
fs.StringVar(&cli.SerfMembersEndpoint, "serf-members-endpoint", "", "Endpoint to get the current members in the cluster")
fs.StringVar(&cli.EventsEndpoint, "events-endpoint", "", "Endpoint to send proxied events from catalyst-api into catalyst")
fs.StringVar(&cli.CatalystApiURL, "catalyst-api-url", "", "Endpoint for externally deployed catalyst-api; if not set, use local catalyst-api")
pprofPort := fs.Int("pprof-port", 6061, "Pprof listen port")

Expand Down Expand Up @@ -206,10 +208,9 @@ func main() {
}
broker = misttriggers.NewTriggerBroker()

catalystApiURL := cli.CatalystApiURL
if catalystApiURL == "" {
catalystApiURL = cli.OwnInternalURL()
}
catalystApiURL := resolveCatalystApiURL(cli)
glog.Infof("Using Catalyst API URL: %s", catalystApiURL)

serfMembersEndpoint := cli.SerfMembersEndpoint
if serfMembersEndpoint == "" {
serfMembersEndpoint = cli.OwnInternalURL() + "/api/serf/members"
Expand Down Expand Up @@ -352,13 +353,33 @@ func main() {
})

group.Go(func() error {
return api.ListenAndServeInternal(ctx, cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint)
return api.ListenAndServeInternal(ctx, cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint, cli.EventsEndpoint)
})

err = group.Wait()
glog.Infof("Shutdown complete. Reason for shutdown: %s", err)
}

func resolveCatalystApiURL(cli config.Cli) interface{} {
if cli.CatalystApiURL != "" {
return cli.CatalystApiURL
}

switch cli.Mode {
case "all":
return cli.OwnInternalURL()
case "cluster-only":
// Hack to reason about the corresponding stateless catalyst-api service
// Otherwise we would need to specify CATALYST_API_CATALYST_API_URL env variable, which requires restarting
// the whole catalyst node
hostname := os.Getenv("HOSTNAME") // e.g. "staging-catalyst-0"
ecosystem := strings.Split(hostname, "-")[0] // e.g. "staging"
return fmt.Sprintf("http://%s-catalyst-api-%s:7979", ecosystem, hostname)
}
// not used for other modes
return ""
}

// Eventually this will be the main loop of the state machine, but we just have one variable right now.
func reconcileBalancer(ctx context.Context, bal balancer.Balancer, c cluster.Cluster) error {
memberCh := c.MemberChan()
Expand Down

0 comments on commit cdf2cd8

Please sign in to comment.