From 50006d3c0988dc3f02095be795bc989b2d94d137 Mon Sep 17 00:00:00 2001 From: Jerry Mao Date: Wed, 25 Jan 2023 21:13:55 -0500 Subject: [PATCH 1/2] Allow multiple scaffolds per machine --- saturn/cmd/saturn/main.go | 54 ++++++++++++++++++++++++++----------- saturn/pkg/saturn/saturn.go | 21 --------------- 2 files changed, 39 insertions(+), 36 deletions(-) diff --git a/saturn/cmd/saturn/main.go b/saturn/cmd/saturn/main.go index c2f01358e..e7523ad3d 100644 --- a/saturn/cmd/saturn/main.go +++ b/saturn/cmd/saturn/main.go @@ -6,6 +6,9 @@ import ( "fmt" "os" "os/signal" + "path/filepath" + "strconv" + "sync" "time" "github.com/battlecode/galaxy/saturn/pkg/run" @@ -23,6 +26,7 @@ var ( gcpTokenedReporterUserAgent *string = flag.String("useragent", "Galaxy-Saturn", "the user agent for reporting") monitorPort *uint = flag.Uint("port", 8005, "the port for monitoring shutdowns") scaffoldRoot *string = flag.String("scaffold", "/scaffolds", "the root directory for saving scaffolds") + parallelism *uint = flag.Uint("parallel", 1, "the number of scaffolds to run in parallel") ) func main() { @@ -39,26 +43,46 @@ func main() { log.Ctx(ctx).Fatal().Err(err).Msg("Could not read secrets.") } - multiplexer, err := run.NewScaffoldMultiplexer(*scaffoldRoot, secret) + monitor, err := saturn.NewMonitor(fmt.Sprintf("127.0.0.1:%d", *monitorPort)) if err != nil { - log.Ctx(ctx).Fatal().Err(err).Msg("Could not initialize scaffold multiplexer.") + log.Ctx(ctx).Fatal().Err(err).Msg("Could not initialize monitor.") } + go monitor.Start() + defer monitor.Close() + ctx = monitor.WithContext(ctx) - app, err := saturn.New( - ctx, - saturn.WithMonitor(fmt.Sprintf("127.0.0.1:%d", *monitorPort)), - saturn.WithGcpPubsubSubcriber(*gcpProjectID, *gcpPubsubSubscriptionID), - saturn.WithGcpTokenedReporter(*gcpTokenedReporterAudience, *gcpTokenedReporterUserAgent), - saturn.WithRunner("compile", multiplexer.Compile), - saturn.WithRunner("execute", multiplexer.Execute), + var ( + i uint + wg sync.WaitGroup ) - if err != nil { - log.Ctx(ctx).Fatal().Err(err).Msg("Could not initialize Saturn.") - } + for i = 0; i < *parallelism; i++ { + root := filepath.Join(*scaffoldRoot, strconv.FormatUint(uint64(i), 10)) + multiplexer, err := run.NewScaffoldMultiplexer(root, secret) + if err != nil { + log.Ctx(ctx).Fatal().Err(err).Msg("Could not initialize scaffold multiplexer.") + } - if err := app.Start(ctx); err != nil { - // TODO: log a traceback - log.Ctx(ctx).Fatal().Err(err).Msg("System shut down abnormally.") + app, err := saturn.New( + ctx, + saturn.WithGcpPubsubSubcriber(*gcpProjectID, *gcpPubsubSubscriptionID), + saturn.WithGcpTokenedReporter(*gcpTokenedReporterAudience, *gcpTokenedReporterUserAgent), + saturn.WithRunner("compile", multiplexer.Compile), + saturn.WithRunner("execute", multiplexer.Execute), + ) + if err != nil { + log.Ctx(ctx).Fatal().Err(err).Msg("Could not initialize Saturn.") + } + + wg.Add(1) + go func() { + defer wg.Done() + if err := app.Start(ctx); err != nil { + // TODO: log a traceback + log.Ctx(ctx).Fatal().Err(err).Msg("System shut down abnormally.") + } + }() } + + wg.Wait() log.Ctx(ctx).Info().Msg("System shut down normally.") } diff --git a/saturn/pkg/saturn/saturn.go b/saturn/pkg/saturn/saturn.go index 83e295ec9..ce7560242 100644 --- a/saturn/pkg/saturn/saturn.go +++ b/saturn/pkg/saturn/saturn.go @@ -8,7 +8,6 @@ import ( ) type Saturn struct { - monitor *Monitor queue QueueClient report Reporter runners map[string]Runner @@ -28,11 +27,6 @@ func New(ctx context.Context, options ...SaturnOption) (s *Saturn, err error) { } func (s *Saturn) Start(ctx context.Context) error { - if s.monitor != nil { - go s.monitor.Start() - defer s.monitor.Close() - ctx = s.monitor.WithContext(ctx) - } log.Ctx(ctx).Info().Msg("Starting server.") if err := s.queue.Subscribe(ctx, s.Handle); err != nil { return fmt.Errorf("queue.Subscribe: %v", err) @@ -57,21 +51,6 @@ func (s *Saturn) Handle(ctx context.Context, payload TaskPayload) error { type SaturnOption func(context.Context, *Saturn) (*Saturn, error) -func WithMonitor(address string) SaturnOption { - return func(ctx context.Context, s *Saturn) (*Saturn, error) { - if s.monitor != nil { - return nil, fmt.Errorf("monitor already exists") - } - log.Ctx(ctx).Debug().Msg("Initializing monitor.") - monitor, err := NewMonitor(address) - if err != nil { - return nil, fmt.Errorf("NewMonitor: %v", err) - } - s.monitor = monitor - return s, nil - } -} - func WithGcpPubsubSubcriber(projectID, subscriptionID string) SaturnOption { return func(ctx context.Context, s *Saturn) (*Saturn, error) { if s.queue != nil { From 6f91f35908a7fe93836a3f70e156b4f899d16039 Mon Sep 17 00:00:00 2001 From: Jerry Mao Date: Wed, 25 Jan 2023 21:19:08 -0500 Subject: [PATCH 2/2] Configure 2 matches to run per execute machine --- deploy/galaxy/main.tf | 6 ++++-- deploy/saturn/main.tf | 1 + deploy/saturn/variables.tf | 5 +++++ 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/deploy/galaxy/main.tf b/deploy/galaxy/main.tf index 6b92485fb..75b0d7450 100644 --- a/deploy/galaxy/main.tf +++ b/deploy/galaxy/main.tf @@ -184,7 +184,8 @@ module "saturn_compile" { artifact_registry_name = var.artifact_registry_name storage_releases_name = var.storage_releases_name - secret_id = google_secret_manager_secret.saturn.secret_id + secret_id = google_secret_manager_secret.saturn.secret_id + parallelism = 1 network_vpc_id = google_compute_network.this.id subnetwork_ip_cidr = "172.16.0.0/16" @@ -213,7 +214,8 @@ module "saturn_execute" { artifact_registry_name = var.artifact_registry_name storage_releases_name = var.storage_releases_name - secret_id = google_secret_manager_secret.saturn.secret_id + secret_id = google_secret_manager_secret.saturn.secret_id + parallelism = 2 network_vpc_id = google_compute_network.this.id subnetwork_ip_cidr = "172.17.0.0/16" diff --git a/deploy/saturn/main.tf b/deploy/saturn/main.tf index a82ea7f96..f999527c4 100644 --- a/deploy/saturn/main.tf +++ b/deploy/saturn/main.tf @@ -85,6 +85,7 @@ module "container" { "-project=${var.gcp_project}", "-secret=${var.secret_id}", "-subscription=${google_pubsub_subscription.queue.name}", + "-parallel=${var.parallelism}", ] } } diff --git a/deploy/saturn/variables.tf b/deploy/saturn/variables.tf index 443a72b28..6879b2332 100644 --- a/deploy/saturn/variables.tf +++ b/deploy/saturn/variables.tf @@ -53,6 +53,11 @@ variable "secret_id" { type = string } +variable "parallelism" { + description = "Number of jobs to run simultaneously per machine" + type = number +} + variable "network_vpc_id" { description = "ID of Google VPC network resource" type = string