Skip to content

Commit

Permalink
Merge pull request #611 from battlecode/jerrym-multi-scaffold
Browse files Browse the repository at this point in the history
Run multiple matches simultaneously per machine
  • Loading branch information
n8kim1 authored Jan 26, 2023
2 parents ab69f10 + 6f91f35 commit 2d0b2a7
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 38 deletions.
6 changes: 4 additions & 2 deletions deploy/galaxy/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ module "saturn_compile" {
artifact_registry_name = var.artifact_registry_name
storage_releases_name = var.storage_releases_name

secret_id = google_secret_manager_secret.saturn.secret_id
secret_id = google_secret_manager_secret.saturn.secret_id
parallelism = 1

network_vpc_id = google_compute_network.this.id
subnetwork_ip_cidr = "172.16.0.0/16"
Expand Down Expand Up @@ -213,7 +214,8 @@ module "saturn_execute" {
artifact_registry_name = var.artifact_registry_name
storage_releases_name = var.storage_releases_name

secret_id = google_secret_manager_secret.saturn.secret_id
secret_id = google_secret_manager_secret.saturn.secret_id
parallelism = 2

network_vpc_id = google_compute_network.this.id
subnetwork_ip_cidr = "172.17.0.0/16"
Expand Down
1 change: 1 addition & 0 deletions deploy/saturn/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ module "container" {
"-project=${var.gcp_project}",
"-secret=${var.secret_id}",
"-subscription=${google_pubsub_subscription.queue.name}",
"-parallel=${var.parallelism}",
]
}
}
Expand Down
5 changes: 5 additions & 0 deletions deploy/saturn/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ variable "secret_id" {
type = string
}

# Number of jobs a single Saturn machine runs at the same time.
# No default is declared, so every caller of this module must set it
# explicitly. The value is forwarded to the container as the
# "-parallel" command-line argument.
variable "parallelism" {
description = "Number of jobs to run simultaneously per machine"
type = number
}

variable "network_vpc_id" {
description = "ID of Google VPC network resource"
type = string
Expand Down
54 changes: 39 additions & 15 deletions saturn/cmd/saturn/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"fmt"
"os"
"os/signal"
"path/filepath"
"strconv"
"sync"
"time"

"github.com/battlecode/galaxy/saturn/pkg/run"
Expand All @@ -23,6 +26,7 @@ var (
gcpTokenedReporterUserAgent *string = flag.String("useragent", "Galaxy-Saturn", "the user agent for reporting")
monitorPort *uint = flag.Uint("port", 8005, "the port for monitoring shutdowns")
scaffoldRoot *string = flag.String("scaffold", "/scaffolds", "the root directory for saving scaffolds")
parallelism *uint = flag.Uint("parallel", 1, "the number of scaffolds to run in parallel")
)

func main() {
Expand All @@ -39,26 +43,46 @@ func main() {
log.Ctx(ctx).Fatal().Err(err).Msg("Could not read secrets.")
}

multiplexer, err := run.NewScaffoldMultiplexer(*scaffoldRoot, secret)
monitor, err := saturn.NewMonitor(fmt.Sprintf("127.0.0.1:%d", *monitorPort))
if err != nil {
log.Ctx(ctx).Fatal().Err(err).Msg("Could not initialize scaffold multiplexer.")
log.Ctx(ctx).Fatal().Err(err).Msg("Could not initialize monitor.")
}
go monitor.Start()
defer monitor.Close()
ctx = monitor.WithContext(ctx)

app, err := saturn.New(
ctx,
saturn.WithMonitor(fmt.Sprintf("127.0.0.1:%d", *monitorPort)),
saturn.WithGcpPubsubSubcriber(*gcpProjectID, *gcpPubsubSubscriptionID),
saturn.WithGcpTokenedReporter(*gcpTokenedReporterAudience, *gcpTokenedReporterUserAgent),
saturn.WithRunner("compile", multiplexer.Compile),
saturn.WithRunner("execute", multiplexer.Execute),
var (
i uint
wg sync.WaitGroup
)
if err != nil {
log.Ctx(ctx).Fatal().Err(err).Msg("Could not initialize Saturn.")
}
for i = 0; i < *parallelism; i++ {
root := filepath.Join(*scaffoldRoot, strconv.FormatUint(uint64(i), 10))
multiplexer, err := run.NewScaffoldMultiplexer(root, secret)
if err != nil {
log.Ctx(ctx).Fatal().Err(err).Msg("Could not initialize scaffold multiplexer.")
}

if err := app.Start(ctx); err != nil {
// TODO: log a traceback
log.Ctx(ctx).Fatal().Err(err).Msg("System shut down abnormally.")
app, err := saturn.New(
ctx,
saturn.WithGcpPubsubSubcriber(*gcpProjectID, *gcpPubsubSubscriptionID),
saturn.WithGcpTokenedReporter(*gcpTokenedReporterAudience, *gcpTokenedReporterUserAgent),
saturn.WithRunner("compile", multiplexer.Compile),
saturn.WithRunner("execute", multiplexer.Execute),
)
if err != nil {
log.Ctx(ctx).Fatal().Err(err).Msg("Could not initialize Saturn.")
}

wg.Add(1)
go func() {
defer wg.Done()
if err := app.Start(ctx); err != nil {
// TODO: log a traceback
log.Ctx(ctx).Fatal().Err(err).Msg("System shut down abnormally.")
}
}()
}

wg.Wait()
log.Ctx(ctx).Info().Msg("System shut down normally.")
}
21 changes: 0 additions & 21 deletions saturn/pkg/saturn/saturn.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
)

type Saturn struct {
monitor *Monitor
queue QueueClient
report Reporter
runners map[string]Runner
Expand All @@ -28,11 +27,6 @@ func New(ctx context.Context, options ...SaturnOption) (s *Saturn, err error) {
}

func (s *Saturn) Start(ctx context.Context) error {
if s.monitor != nil {
go s.monitor.Start()
defer s.monitor.Close()
ctx = s.monitor.WithContext(ctx)
}
log.Ctx(ctx).Info().Msg("Starting server.")
if err := s.queue.Subscribe(ctx, s.Handle); err != nil {
return fmt.Errorf("queue.Subscribe: %v", err)
Expand All @@ -57,21 +51,6 @@ func (s *Saturn) Handle(ctx context.Context, payload TaskPayload) error {

type SaturnOption func(context.Context, *Saturn) (*Saturn, error)

// WithMonitor returns a SaturnOption that builds a Monitor from address via
// NewMonitor and stores it on the Saturn instance being configured.
//
// The returned option fails with an error if the instance already has a
// monitor, or if NewMonitor returns an error (wrapped with a "NewMonitor: "
// prefix). On failure the returned *Saturn is nil.
func WithMonitor(address string) SaturnOption {
return func(ctx context.Context, s *Saturn) (*Saturn, error) {
// Refuse to overwrite a monitor that an earlier option already installed.
if s.monitor != nil {
return nil, fmt.Errorf("monitor already exists")
}
log.Ctx(ctx).Debug().Msg("Initializing monitor.")
monitor, err := NewMonitor(address)
if err != nil {
return nil, fmt.Errorf("NewMonitor: %v", err)
}
s.monitor = monitor
return s, nil
}
}

func WithGcpPubsubSubcriber(projectID, subscriptionID string) SaturnOption {
return func(ctx context.Context, s *Saturn) (*Saturn, error) {
if s.queue != nil {
Expand Down

0 comments on commit 2d0b2a7

Please sign in to comment.