Skip to content

Commit

Permalink
Streaming Flux state to the frontend
Browse files Browse the repository at this point in the history
  • Loading branch information
laszlocph committed Oct 20, 2023
1 parent e670699 commit d7a49fe
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 2 deletions.
17 changes: 15 additions & 2 deletions cmd/capacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
kustomizationv1 "github.com/fluxcd/kustomize-controller/api/v1"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
"github.com/gimlet-io/capacitor/pkg/controllers"
"github.com/gimlet-io/capacitor/pkg/streaming"
"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -63,6 +64,9 @@ func main() {
stopCh := make(chan struct{})
defer close(stopCh)

clientHub := streaming.NewClientHub()
go clientHub.Run()

ctrl := controllers.NewDynamicController(
"gitrepositories.source.toolkit.fluxcd.io",
dynamicClient,
Expand All @@ -75,16 +79,21 @@ func main() {
fallthrough
case "delete":
fmt.Printf("Changes in %s\n", objectMeta.Name)
_, err := getFluxState(dynamicClient)
fluxState, err := getFluxState(dynamicClient)
if err != nil {
panic(err.Error())
}
fluxStateBytes, err := json.Marshal(fluxState)
if err != nil {
panic(err.Error())
}
clientHub.Broadcast <- fluxStateBytes
}
return nil
})
go ctrl.Run(1, stopCh)

r := setupRouter(dynamicClient)
r := setupRouter(dynamicClient, clientHub)
go func() {
err = http.ListenAndServe(":9000", r)
if err != nil {
Expand Down Expand Up @@ -167,6 +176,7 @@ func getFluxState(dc *dynamic.DynamicClient) (*fluxState, error) {

func setupRouter(
dynamicClient *dynamic.DynamicClient,
clientHub *streaming.ClientHub,
) *chi.Mux {
r := chi.NewRouter()
r.Use(middleware.WithValue("dynamicClient", dynamicClient))
Expand All @@ -175,6 +185,9 @@ func setupRouter(
w.WriteHeader(http.StatusOK)
})
r.Get("/api/fluxState", fluxStateHandler)
r.Get("/ws/", func(w http.ResponseWriter, r *http.Request) {
streaming.ServeWs(clientHub, w, r)
})

filesDir := http.Dir("./web/build")
fileServer(r, "/", filesDir)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/gorilla/websocket v1.5.0
github.com/imdario/mergo v0.3.6 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
Expand Down
16 changes: 16 additions & 0 deletions pkg/controllers/controller.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
Expand Down
213 changes: 213 additions & 0 deletions pkg/streaming/clienthub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package streaming

import (
"bytes"
"net/http"
"time"

"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
)

const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second

// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second

// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 15

// Maximum message size allowed from peer.
maxMessageSize = 4096
)

var (
newline = []byte{'\n'}
space = []byte{' '}
)

var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}

// Client is a middleman between the websocket connection and the hub.
type Client struct {
hub *ClientHub

// The websocket connection.
conn *websocket.Conn

// Buffered channel of outbound messages.
send chan []byte

clientId string
}

// readPump pumps messages from the websocket connection to the hub.

// The application runs readPump in a per-connection goroutine. The application
// ensures that there is at most one reader on a connection by executing all
// reads from this goroutine.
func (c *Client) readPump() {
defer func() {
c.hub.Unregister <- c
c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
}
break
}
message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
c.hub.Broadcast <- message
}
}

// writePump pumps messages from the hub to the websocket connection.
//
// A goroutine running writePump is started for each connection. The
// application ensures that there is at most one writer to a connection by
// executing all writes from this goroutine.
func (c *Client) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The hub closed the channel.
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}

w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write(message)

// Add queued chat messages to the current websocket message.
n := len(c.send)
for i := 0; i < n; i++ {
w.Write(newline)
w.Write(<-c.send)
}

if err := w.Close(); err != nil {
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}

// ServeWs handles websocket requests from the peer.
func ServeWs(hub *ClientHub, w http.ResponseWriter, r *http.Request) {
upgrader.CheckOrigin = func(r *http.Request) bool { return true }
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}

client := &Client{
hub: hub,
conn: conn,
send: make(chan []byte, 256),
}
hub.Register <- client

// Allow collection of memory referenced by the caller by doing all work in
// new goroutines.
go client.writePump()
go client.readPump()
}

// ClientHub maintains the set of active clients and broadcasts messages to the
// clients.
type ClientHub struct {
// Registered clients.
Clients map[*Client]bool

// Updates to be broadcasted to the clients.
Broadcast chan []byte

// Updates to be sent to a single user.
Send chan *ClientMessage

// Register requests from the clients.
Register chan *Client

// Unregister requests from clients.
Unregister chan *Client
}

type ClientMessage struct {
ClientId string
Message []byte
}

func NewClientHub() *ClientHub {
return &ClientHub{
Broadcast: make(chan []byte),
Send: make(chan *ClientMessage),
Register: make(chan *Client),
Unregister: make(chan *Client),
Clients: make(map[*Client]bool),
}
}

func (h *ClientHub) Run() {
for {
select {
case client := <-h.Register:
h.Clients[client] = true
case client := <-h.Unregister:
if _, ok := h.Clients[client]; ok {
delete(h.Clients, client)
close(client.send)
}
case message := <-h.Broadcast:
for client := range h.Clients {
select {
case client.send <- message:
default:
close(client.send)
delete(h.Clients, client)
}
}
case clientMessage := <-h.Send:
for client := range h.Clients {
if client.clientId == clientMessage.ClientId {
select {
case client.send <- clientMessage.Message:
default:
close(client.send)
delete(h.Clients, client)
}
}
}
}
}
}
2 changes: 2 additions & 0 deletions web/src/App.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logo from './logo.svg';
import './App.css';
import APIBackend from "./apiBackend";
import StreamingBackend from "./streamingBackend";
import CapacitorClient from "./client";

function App() {
Expand All @@ -13,6 +14,7 @@ function App() {
return (
<>
<APIBackend capacitorClient={capacitorClient}/>
<StreamingBackend capacitorClient={capacitorClient}/>
<div className="App">
<header className="App-header">
<img src={logo} className="App-logo" alt="logo" />
Expand Down
54 changes: 54 additions & 0 deletions web/src/streamingBackend.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import {Component} from "react";

let URL = '';
if (typeof window !== 'undefined') {
let protocol = window.location.protocol === 'https:' ? 'wss' : 'ws';
URL = protocol + '://' + window.location.hostname;

let port = window.location.port
if (window.location.hostname === 'localhost' || window.location.hostname === '127.0.0.1') {
port = 9000
}
if (port && port !== '') {
URL = URL + ':' + port
}
}

export default class StreamingBackend extends Component {
componentDidMount() {
console.log("Connecting to " + URL + '/ws/')

this.ws = new WebSocket(URL + '/ws/');
this.ws.onopen = this.onOpen;
this.ws.onmessage = this.onMessage;
this.ws.onclose = this.onClose;

this.onClose = this.onClose.bind(this);
}

render() {
return null;
}

onOpen = () => {
console.log('connected');
};

onClose = (evt) => {
console.log('disconnected: ' + evt.code + ': ' + evt.reason);
const ws = new WebSocket(URL + '/ws/');
ws.onopen = this.onOpen;
ws.onmessage = this.onMessage;
ws.onclose = this.onClose;
this.setState({
ws
});
}

onMessage = (evt) => {
evt.data.split('\n').forEach((line) => {
const message = JSON.parse(line);
console.log(message)
});
}
}

0 comments on commit d7a49fe

Please sign in to comment.