Skip to content
This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

Commit

Permalink
Add CLI flags for topic suport to the executable/docker image (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
Maelkum authored Jan 19, 2024
1 parent 8fac9a9 commit 28cdc09
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ worker-pdb
worker-fdb
workspace
runtime.tar.gz
node
cmd/node/node
13 changes: 6 additions & 7 deletions cmd/node/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ import (
)

// ExecuteRequest describes the payload for the REST API request for function execution.
type ExecuteRequest execute.Request
type ExecuteRequest struct {
execute.Request
Subgroup string `json:"subgroup,omitempty"`
}

// ExecuteResponse describes the REST API response for function execution.
type ExecuteResponse struct {
Expand Down Expand Up @@ -43,10 +46,9 @@ func createExecutor(a api.API) func(ctx echo.Context) error {
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, fmt.Errorf("could not unpack request: %w", err))
}
fmt.Println("Executing inference function: ", req.FunctionID)

// Get the execution result.
code, id, results, cluster, err := a.Node.ExecuteFunction(ctx.Request().Context(), execute.Request(req))
code, id, results, cluster, err := a.Node.ExecuteFunction(ctx.Request().Context(), req.Request, req.Subgroup)
if err != nil {
a.Log.Warn().Str("function", req.FunctionID).Err(err).Msg("node failed to execute function")
}
Expand Down Expand Up @@ -102,7 +104,7 @@ func createExecutor(a api.API) func(ctx echo.Context) error {
fmt.Println("Executing weight adjusment function: ", calcWeightsReq.FunctionID)

// Get the execution result.
_, _, weightsResults, _, err := a.Node.ExecuteFunction(ctx.Request().Context(), execute.Request(calcWeightsReq))
_, _, weightsResults, _, err := a.Node.ExecuteFunction(ctx.Request().Context(), execute.Request(calcWeightsReq), "")
if err != nil {
a.Log.Warn().Str("function", req.FunctionID).Err(err).Msg("node failed to execute function")
}
Expand All @@ -115,6 +117,3 @@ func createExecutor(a api.API) func(ctx echo.Context) error {
return ctx.JSON(http.StatusOK, res)
}
}

// {"eth_price": 2530.5,"inferences":[{"worker":"upt16ar7k93c6razqcuvxdauzdlaz352sfjp2rpj3i","inference":2443}],"latest_weights":{"upt16ar7k93c6razqcuvxdauzdlaz352sfjp2rpj3i": 0.3}}
// {"eth_price": 555, "inferences": [{"worker": "worker1", "inference": 560}, {"worker": "worker2", "inference": 550}], "latest_weights": {"worker1": 0.9, "worker2": 0.8}}
1 change: 1 addition & 0 deletions cmd/node/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func parseFlags() *config.Config {
pflag.StringVar(&cfg.RuntimePath, "runtime-path", "", "runtime path (used by the worker node)")
pflag.StringVar(&cfg.RuntimeCLI, "runtime-cli", "", "runtime path (used by the worker node)")
pflag.BoolVar(&cfg.LoadAttributes, "attributes", false, "node should try to load its attribute data from IPFS")
pflag.StringSliceVar(&cfg.Topics, "topic", nil, "topics node should subscribe to")

// Host configuration.
pflag.StringVar(&cfg.Host.PrivateKey, "private-key", "", "private key that the b7s host will use")
Expand Down
11 changes: 8 additions & 3 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ func run() int {
// Create function store.
fstore := fstore.New(log, functionStore, cfg.Workspace)

// If we have topics specified, use those.
if len(cfg.Topics) > 0 {
opts = append(opts, node.WithTopics(cfg.Topics))
}

// Instantiate node.
node, err := node.New(log, host, peerstore, fstore, opts...)
if err != nil {
Expand All @@ -200,17 +205,17 @@ func run() int {

log.Info().
Str("role", role.String()).
Msg("Upshot Node Node starting")
Msg("Upshot Node starting")

err := node.Run(ctx)
if err != nil {
log.Error().Err(err).Msg("Upshot Node Node failed")
log.Error().Err(err).Msg("Upshot Node failed")
close(failed)
} else {
close(done)
}

log.Info().Msg("Upshot Node Node stopped")
log.Info().Msg("Upshot Node stopped")
}()

// If we're a head node - start the REST API.
Expand Down
7 changes: 6 additions & 1 deletion docker/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,13 @@ if [ -n "$BOOT_NODES" ]; then
bootnode_args="--boot-nodes $BOOT_NODES"
fi

topic_args=""
if [ -n "$TOPICS" ]; then
topic_args="--topic $TOPICS"
fi

if [ "$NODE_ROLE" = "head" ]; then
./upshot-node --peer-db /var/tmp/upshot/peerdb --function-db /var/tmp/upshot/function-db --log-level debug --port $P2P_PORT --role head --workspace $WORKSPACE_ROOT --private-key $NODE_KEY_PATH --rest-api :$REST_API $dialback_args $bootnode_args
else
./upshot-node --peer-db ./peer-database --function-db ./function-database--log-level debug --port $P2P_PORT --role worker --runtime-path /app/runtime --runtime-cli bls-runtime --workspace $WORKSPACE_ROOT --private-key $NODE_KEY_PATH $dialback_args $bootnode_args
./upshot-node --peer-db ./peer-database --function-db ./function-database --log-level debug --port $P2P_PORT --role worker --runtime-path /app/runtime --runtime-cli bls-runtime --workspace $WORKSPACE_ROOT --private-key $NODE_KEY_PATH $dialback_args $bootnode_args $topic_args
fi
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/libp2p/go-libp2p v0.32.2
github.com/multiformats/go-multiaddr v0.12.1
github.com/spf13/pflag v1.0.5
github.com/upshot-tech/protocol-state-machine-module main
github.com/upshot-tech/protocol-state-machine-module v0.0.0-20240118215733-7a65a3f6cdb8
)

require (
Expand Down Expand Up @@ -255,7 +255,7 @@ require (
)

require (
github.com/blocklessnetwork/b7s v0.4.5
github.com/blocklessnetwork/b7s v0.4.6-0.20240112135439-7ed12207f6af
github.com/labstack/gommon v0.4.2 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ github.com/bits-and-blooms/bitset v1.8.0 h1:FD+XqgOZDUxxZ8hzoBFuV9+cGWY9CslN6d5M
github.com/bits-and-blooms/bitset v1.8.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/blocklessnetwork/b7s v0.4.5 h1:EtiP4NlkwZgDgQKQGx5PO9XWzJgDGV7Civ/BFemSHxM=
github.com/blocklessnetwork/b7s v0.4.5/go.mod h1:fkA5Te5tqk+foXMS50qe0zSx4V5KaO4PLKKpt2AudQE=
github.com/blocklessnetwork/b7s v0.4.6-0.20240112135439-7ed12207f6af h1:SBlngQZUbvA9b1XTyO8BtQltWS1nULRLREwsbI/ZWHI=
github.com/blocklessnetwork/b7s v0.4.6-0.20240112135439-7ed12207f6af/go.mod h1:Y/Tqev+RbvkYSAF3B+odJ5wPEfGEICR8mRijF5zns0E=
github.com/blocklessnetwork/b7s-attributes v0.0.0 h1:GoJmJpZVZOLjCE52jmTzfQLy2VAWdOuCdcrvct0HyC8=
github.com/blocklessnetwork/b7s-attributes v0.0.0/go.mod h1:0c+ZemB4kfylI14IERH4CSUslZtKcQIuVHk8L4DiLI8=
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
Expand Down Expand Up @@ -956,8 +956,8 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/upshot-tech/protocol-state-machine-module v0.0.0-20240117151619-5620d8bf74a6 h1:hqWWg4B1/6FQAltjrxkXgB5ZyVoIy8y9MzS6ODSXpKE=
github.com/upshot-tech/protocol-state-machine-module v0.0.0-20240117151619-5620d8bf74a6/go.mod h1:pPdKzuFaQhdvnyScR3EUxVKkJeCZRdZ3jEUj7lmzRKw=
github.com/upshot-tech/protocol-state-machine-module v0.0.0-20240118215733-7a65a3f6cdb8 h1:2FPiNHCIr1ZeOIzuxAbKmTkkgGEcnLbQeZtPMzWE5qY=
github.com/upshot-tech/protocol-state-machine-module v0.0.0-20240118215733-7a65a3f6cdb8/go.mod h1:pPdKzuFaQhdvnyScR3EUxVKkJeCZRdZ3jEUj7lmzRKw=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
Expand Down

0 comments on commit 28cdc09

Please sign in to comment.