From d7a49fe9c569571cad6711b0edf8bedf73afca61 Mon Sep 17 00:00:00 2001 From: Laszlo Fogas Date: Fri, 20 Oct 2023 18:46:18 +0200 Subject: [PATCH] Streaming Flux state to the frontend --- cmd/capacitor/main.go | 17 ++- go.mod | 1 + go.sum | 2 + pkg/controllers/controller.go | 16 +++ pkg/streaming/clienthub.go | 213 ++++++++++++++++++++++++++++++++++ web/src/App.js | 2 + web/src/streamingBackend.js | 54 +++++++++ 7 files changed, 303 insertions(+), 2 deletions(-) create mode 100644 pkg/streaming/clienthub.go create mode 100644 web/src/streamingBackend.js diff --git a/cmd/capacitor/main.go b/cmd/capacitor/main.go index fc3360e..dd711b0 100644 --- a/cmd/capacitor/main.go +++ b/cmd/capacitor/main.go @@ -14,6 +14,7 @@ import ( kustomizationv1 "github.com/fluxcd/kustomize-controller/api/v1" sourcev1 "github.com/fluxcd/source-controller/api/v1" "github.com/gimlet-io/capacitor/pkg/controllers" + "github.com/gimlet-io/capacitor/pkg/streaming" "github.com/go-chi/chi" "github.com/go-chi/chi/middleware" "github.com/sirupsen/logrus" @@ -63,6 +64,9 @@ func main() { stopCh := make(chan struct{}) defer close(stopCh) + clientHub := streaming.NewClientHub() + go clientHub.Run() + ctrl := controllers.NewDynamicController( "gitrepositories.source.toolkit.fluxcd.io", dynamicClient, @@ -75,16 +79,21 @@ func main() { fallthrough case "delete": fmt.Printf("Changes in %s\n", objectMeta.Name) - _, err := getFluxState(dynamicClient) + fluxState, err := getFluxState(dynamicClient) if err != nil { panic(err.Error()) } + fluxStateBytes, err := json.Marshal(fluxState) + if err != nil { + panic(err.Error()) + } + clientHub.Broadcast <- fluxStateBytes } return nil }) go ctrl.Run(1, stopCh) - r := setupRouter(dynamicClient) + r := setupRouter(dynamicClient, clientHub) go func() { err = http.ListenAndServe(":9000", r) if err != nil { @@ -167,6 +176,7 @@ func getFluxState(dc *dynamic.DynamicClient) (*fluxState, error) { func setupRouter( dynamicClient *dynamic.DynamicClient, + clientHub *streaming.ClientHub, ) *chi.Mux { r := chi.NewRouter() r.Use(middleware.WithValue("dynamicClient", dynamicClient)) @@ -175,6 +185,9 @@ func setupRouter( w.WriteHeader(http.StatusOK) }) r.Get("/api/fluxState", fluxStateHandler) + r.Get("/ws/", func(w http.ResponseWriter, r *http.Request) { + streaming.ServeWs(clientHub, w, r) + }) filesDir := http.Dir("./web/build") fileServer(r, "/", filesDir) diff --git a/go.mod b/go.mod index cd2c92b..442601d 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/gofuzz v1.2.0 // indirect + github.com/gorilla/websocket v1.5.0 github.com/imdario/mergo v0.3.6 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect diff --git a/go.sum b/go.sum index b9a0725..69391fc 100644 --- a/go.sum +++ b/go.sum @@ -42,6 +42,8 @@ github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= diff --git a/pkg/controllers/controller.go b/pkg/controllers/controller.go index 7a1cad3..fd26d85 100644 --- a/pkg/controllers/controller.go +++ b/pkg/controllers/controller.go @@ -1,3 +1,19 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package controllers import ( diff --git a/pkg/streaming/clienthub.go b/pkg/streaming/clienthub.go new file mode 100644 index 0000000..8c07196 --- /dev/null +++ b/pkg/streaming/clienthub.go @@ -0,0 +1,213 @@ +// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package streaming + +import ( + "bytes" + "net/http" + "time" + + "github.com/gorilla/websocket" + log "github.com/sirupsen/logrus" +) + +const ( + // Time allowed to write a message to the peer. + writeWait = 10 * time.Second + + // Time allowed to read the next pong message from the peer. + pongWait = 60 * time.Second + + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 15 + + // Maximum message size allowed from peer. + maxMessageSize = 4096 +) + +var ( + newline = []byte{'\n'} + space = []byte{' '} +) + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +// Client is a middleman between the websocket connection and the hub. +type Client struct { + hub *ClientHub + + // The websocket connection. + conn *websocket.Conn + + // Buffered channel of outbound messages. + send chan []byte + + clientId string +} + +// readPump pumps messages from the websocket connection to the hub. + +// The application runs readPump in a per-connection goroutine. The application +// ensures that there is at most one reader on a connection by executing all +// reads from this goroutine. +func (c *Client) readPump() { + defer func() { + c.hub.Unregister <- c + c.conn.Close() + }() + c.conn.SetReadLimit(maxMessageSize) + c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) + for { + _, message, err := c.conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Printf("error: %v", err) + } + break + } + message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1)) + c.hub.Broadcast <- message + } +} + +// writePump pumps messages from the hub to the websocket connection. +// +// A goroutine running writePump is started for each connection. The +// application ensures that there is at most one writer to a connection by +// executing all writes from this goroutine. +func (c *Client) writePump() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + c.conn.Close() + }() + for { + select { + case message, ok := <-c.send: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { + // The hub closed the channel. + c.conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + + w, err := c.conn.NextWriter(websocket.TextMessage) + if err != nil { + return + } + w.Write(message) + + // Add queued chat messages to the current websocket message. + n := len(c.send) + for i := 0; i < n; i++ { + w.Write(newline) + w.Write(<-c.send) + } + + if err := w.Close(); err != nil { + return + } + case <-ticker.C: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + } + } +} + +// ServeWs handles websocket requests from the peer. +func ServeWs(hub *ClientHub, w http.ResponseWriter, r *http.Request) { + upgrader.CheckOrigin = func(r *http.Request) bool { return true } + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Println(err) + return + } + + client := &Client{ + hub: hub, + conn: conn, + send: make(chan []byte, 256), + } + hub.Register <- client + + // Allow collection of memory referenced by the caller by doing all work in + // new goroutines. + go client.writePump() + go client.readPump() +} + +// ClientHub maintains the set of active clients and broadcasts messages to the +// clients. +type ClientHub struct { + // Registered clients. + Clients map[*Client]bool + + // Updates to be broadcasted to the clients. + Broadcast chan []byte + + // Updates to be sent to a single user. + Send chan *ClientMessage + + // Register requests from the clients. + Register chan *Client + + // Unregister requests from clients. + Unregister chan *Client +} + +type ClientMessage struct { + ClientId string + Message []byte +} + +func NewClientHub() *ClientHub { + return &ClientHub{ + Broadcast: make(chan []byte), + Send: make(chan *ClientMessage), + Register: make(chan *Client), + Unregister: make(chan *Client), + Clients: make(map[*Client]bool), + } +} + +func (h *ClientHub) Run() { + for { + select { + case client := <-h.Register: + h.Clients[client] = true + case client := <-h.Unregister: + if _, ok := h.Clients[client]; ok { + delete(h.Clients, client) + close(client.send) + } + case message := <-h.Broadcast: + for client := range h.Clients { + select { + case client.send <- message: + default: + close(client.send) + delete(h.Clients, client) + } + } + case clientMessage := <-h.Send: + for client := range h.Clients { + if client.clientId == clientMessage.ClientId { + select { + case client.send <- clientMessage.Message: + default: + close(client.send) + delete(h.Clients, client) + } + } + } + } + } +} diff --git a/web/src/App.js b/web/src/App.js index d88b3a0..39dc478 100644 --- a/web/src/App.js +++ b/web/src/App.js @@ -1,6 +1,7 @@ import logo from './logo.svg'; import './App.css'; import APIBackend from "./apiBackend"; +import StreamingBackend from "./streamingBackend"; import CapacitorClient from "./client"; function App() { @@ -13,6 +14,7 @@ function App() { return ( <> +
logo diff --git a/web/src/streamingBackend.js b/web/src/streamingBackend.js new file mode 100644 index 0000000..d45d64c --- /dev/null +++ b/web/src/streamingBackend.js @@ -0,0 +1,54 @@ +import {Component} from "react"; + +let URL = ''; +if (typeof window !== 'undefined') { + let protocol = window.location.protocol === 'https:' ? 'wss' : 'ws'; + URL = protocol + '://' + window.location.hostname; + + let port = window.location.port + if (window.location.hostname === 'localhost' || window.location.hostname === '127.0.0.1') { + port = 9000 + } + if (port && port !== '') { + URL = URL + ':' + port + } +} + +export default class StreamingBackend extends Component { + componentDidMount() { + console.log("Connecting to " + URL + '/ws/') + + this.ws = new WebSocket(URL + '/ws/'); + this.ws.onopen = this.onOpen; + this.ws.onmessage = this.onMessage; + this.ws.onclose = this.onClose; + + this.onClose = this.onClose.bind(this); + } + + render() { + return null; + } + + onOpen = () => { + console.log('connected'); + }; + + onClose = (evt) => { + console.log('disconnected: ' + evt.code + ': ' + evt.reason); + const ws = new WebSocket(URL + '/ws/'); + ws.onopen = this.onOpen; + ws.onmessage = this.onMessage; + ws.onclose = this.onClose; + this.setState({ + ws + }); + } + + onMessage = (evt) => { + evt.data.split('\n').forEach((line) => { + const message = JSON.parse(line); + console.log(message) + }); + } +}