diff --git a/charts/beskar-mirror/values.yaml b/charts/beskar-mirror/values.yaml index a4f628f..a558445 100644 --- a/charts/beskar-mirror/values.yaml +++ b/charts/beskar-mirror/values.yaml @@ -127,4 +127,7 @@ configData: gcs: bucket: beskar-mirror azure: - container: beskar-mirror \ No newline at end of file + container: beskar-mirror + + sync: + max_worker_count: 50 \ No newline at end of file diff --git a/go.mod b/go.mod index 599fb29..afcdb64 100644 --- a/go.mod +++ b/go.mod @@ -8,9 +8,9 @@ require ( github.com/RussellLuo/kun v0.4.5 github.com/RussellLuo/validating/v3 v3.0.0-beta.1 github.com/adlio/schema v1.3.4 - github.com/antoniomika/go-rsync v0.0.0-20220817021523-f831db35f9a3 github.com/aws/aws-sdk-go v1.48.10 github.com/cavaliergopher/rpm v1.2.0 + github.com/cenkalti/backoff v2.2.1+incompatible github.com/cenkalti/backoff/v4 v4.2.1 github.com/distribution/distribution/v3 v3.0.0-alpha.1 github.com/distribution/reference v0.5.0 @@ -34,6 +34,7 @@ require ( github.com/twmb/murmur3 v1.1.8 github.com/ulikunitz/xz v0.5.11 github.com/vishvananda/netlink v1.2.1-beta.2 + go.ciq.dev/go-rsync v0.0.0-20240304021629-0a3bb196e6d1 go.opentelemetry.io/otel v1.21.0 go.opentelemetry.io/otel/trace v1.21.0 gocloud.dev v0.32.0 diff --git a/go.sum b/go.sum index 711eebe..4134471 100644 --- a/go.sum +++ b/go.sum @@ -57,8 +57,6 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= -github.com/antoniomika/go-rsync v0.0.0-20220817021523-f831db35f9a3 h1:5mL0NCuUVoX5omCZhvf7yPW1wzgRlo3cWGLDUnN6kkM= -github.com/antoniomika/go-rsync v0.0.0-20220817021523-f831db35f9a3/go.mod h1:rKzRO3ppwfCUpHMf/IEnJuwuGsr6yi0rlG7/RE32oEY= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0 h1:jfIu9sQUG6Ig+0+Ap1h4unLjW6YQJpKZVmUzxsD4E/Q= @@ -129,6 +127,7 @@ github.com/bytecodealliance/wasmtime-go/v3 v3.0.2/go.mod h1:RnUjnIXxEJcL6BgCvNyz github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cavaliergopher/rpm v1.2.0 h1:s0h+QeVK252QFTolkhGiMeQ1f+tMeIMhGl8B1HUmGUc= github.com/cavaliergopher/rpm v1.2.0/go.mod h1:R0q3vTqa7RUvPofAZYrnjJ63hh2vngjFfphuXiExVos= +github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= @@ -690,6 +689,8 @@ github.com/yashtewari/glob-intersection v0.2.0/go.mod h1:LK7pIC3piUjovexikBbJ26Y github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.ciq.dev/go-rsync v0.0.0-20240304021629-0a3bb196e6d1 h1:lYxtzhvoRGnoET/RcKJDnRnmaHuGKBCUIj3D1ZubBNg= +go.ciq.dev/go-rsync v0.0.0-20240304021629-0a3bb196e6d1/go.mod h1:xOHMiPHUTm8AQpxu4n14T8bRuT/izQISy8ycm/Q3LLY= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= diff --git a/internal/plugins/mirror/api.go b/internal/plugins/mirror/api.go index 91f363c..9ec3c8e 100644 --- a/internal/plugins/mirror/api.go +++ b/internal/plugins/mirror/api.go @@ -53,6 +53,13 @@ func (p *Plugin) SyncRepository(ctx context.Context, repository string, wait boo return p.repositoryManager.Get(ctx, repository).SyncRepository(ctx, wait) } +func (p *Plugin) GenerateRepository(ctx context.Context, repository string) (err error) { + if err := checkRepository(repository); err != nil { + return err + } + return p.repositoryManager.Get(ctx, repository).GenerateRepository(ctx) +} + func (p *Plugin) GetRepositorySyncStatus(ctx context.Context, repository string) (syncStatus *apiv1.SyncStatus, err error) { if err := checkRepository(repository); err != nil { return nil, err @@ -60,6 +67,13 @@ func (p *Plugin) GetRepositorySyncStatus(ctx context.Context, repository string) return p.repositoryManager.Get(ctx, repository).GetRepositorySyncStatus(ctx) } +func (p *Plugin) GetRepositorySyncPlan(ctx context.Context, repository string) (syncPlan *apiv1.RepositorySyncPlan, err error) { + if err := checkRepository(repository); err != nil { + return nil, err + } + return p.repositoryManager.Get(ctx, repository).GetRepositorySyncPlan(ctx) +} + func (p *Plugin) ListRepositoryLogs(ctx context.Context, repository string, page *apiv1.Page) (logs []apiv1.RepositoryLog, err error) { if err := checkRepository(repository); err != nil { return nil, err @@ -80,3 +94,17 @@ func (p *Plugin) GetRepositoryFile(ctx context.Context, repository, file string) } return p.repositoryManager.Get(ctx, repository).GetRepositoryFile(ctx, file) } + +func (p *Plugin) GetRepositoryFileCount(ctx context.Context, repository string) (count int, err error) { + if err := checkRepository(repository); err != nil { + return 0, err + } + return p.repositoryManager.Get(ctx, repository).GetRepositoryFileCount(ctx) +} + +func (p *Plugin) DeleteRepositoryFile(ctx context.Context, repository, file string) (err error) { + if err := checkRepository(repository); err != nil { + return err + } + return p.repositoryManager.Get(ctx, repository).DeleteRepositoryFile(ctx, file) +} diff --git a/internal/plugins/mirror/pkg/config/beskar-mirror.go b/internal/plugins/mirror/pkg/config/beskar-mirror.go index 0647d4c..40d8772 100644 --- a/internal/plugins/mirror/pkg/config/beskar-mirror.go +++ b/internal/plugins/mirror/pkg/config/beskar-mirror.go @@ -31,14 +31,15 @@ const ( var defaultBeskarMirrorConfig string type BeskarMirrorConfig struct { - Version string `yaml:"version"` - Log log.Config `yaml:"log"` - Addr string `yaml:"addr"` - Gossip gossip.Config `yaml:"gossip"` - Storage storage.Config `yaml:"storage"` - Profiling bool `yaml:"profiling"` - DataDir string `yaml:"datadir"` - ConfigDirectory string `yaml:"-"` + Version string `yaml:"version"` + Log log.Config `yaml:"log"` + Addr string `yaml:"addr"` + Gossip gossip.Config `yaml:"gossip"` + Storage storage.Config `yaml:"storage"` + Profiling bool `yaml:"profiling"` + DataDir string `yaml:"datadir"` + ConfigDirectory string `yaml:"-"` + Sync config.SyncConfig `yaml:"sync"` } func (bc BeskarMirrorConfig) ListenIP() (string, error) { diff --git a/internal/plugins/mirror/pkg/config/beskar-mirror_test.go b/internal/plugins/mirror/pkg/config/beskar-mirror_test.go index 508e7e3..2088284 100644 --- a/internal/plugins/mirror/pkg/config/beskar-mirror_test.go +++ b/internal/plugins/mirror/pkg/config/beskar-mirror_test.go @@ -47,4 +47,6 @@ func TestParseBeskarMirrorConfig(t *testing.T) { require.Equal(t, "0.0.0.0:5501", bc.Gossip.Addr) require.Equal(t, "XD1IOhcp0HWFgZJ/HAaARqMKJwfMWtz284Yj7wxmerA=", bc.Gossip.Key) require.Equal(t, []string{"127.0.0.1:5102"}, bc.Gossip.Peers) + + require.Equal(t, 50, bc.Sync.MaxWorkerCount) } diff --git a/internal/plugins/mirror/pkg/config/default/beskar-mirror.yaml b/internal/plugins/mirror/pkg/config/default/beskar-mirror.yaml index e0614ad..746d353 100644 --- a/internal/plugins/mirror/pkg/config/default/beskar-mirror.yaml +++ b/internal/plugins/mirror/pkg/config/default/beskar-mirror.yaml @@ -34,4 +34,7 @@ storage: azure: container: beskar-mirror account-name: account_name - account-key: base64_encoded_account_key \ No newline at end of file + account-key: base64_encoded_account_key + +sync: + max_worker_count: 50 \ No newline at end of file diff --git a/internal/plugins/mirror/pkg/index/embedded/index.html.tpl b/internal/plugins/mirror/pkg/index/embedded/index.html.tpl index 4bf7b78..aa99e15 100644 --- a/internal/plugins/mirror/pkg/index/embedded/index.html.tpl +++ b/internal/plugins/mirror/pkg/index/embedded/index.html.tpl @@ -11,7 +11,9 @@ td, th {
../ | +{{- end }} {{- range $dir := .Directories }}
{{ $dir.Name }}/ | diff --git a/internal/plugins/mirror/pkg/mirrordb/repository.go b/internal/plugins/mirror/pkg/mirrordb/repository.go index 6c2b8af..ae76a62 100644 --- a/internal/plugins/mirror/pkg/mirrordb/repository.go +++ b/internal/plugins/mirror/pkg/mirrordb/repository.go @@ -20,10 +20,13 @@ var repositorySchemas embed.FS type RepositoryFile struct { Tag string `db:"tag"` Name string `db:"name"` + Reference string `db:"reference"` + Parent string `db:"parent"` Link string `db:"link"` ModifiedTime int64 `db:"modified_time"` Mode uint32 `db:"mode"` Size uint64 `db:"size"` + ConfigID uint64 `db:"config_id"` } type RepositoryDB struct { @@ -56,15 +59,15 @@ func (db *RepositoryDB) AddFile(ctx context.Context, file *RepositoryFile) error } //nolint:gosec - s := md5.Sum([]byte(file.Name)) + s := md5.Sum([]byte(file.Reference)) file.Tag = hex.EncodeToString(s[:]) db.Lock() result, err := db.NamedExecContext( ctx, // BE CAREFUL and respect the table's columns order !! - "INSERT INTO files VALUES(:tag, :name, :link, :modified_time, :mode, :size) "+ - "ON CONFLICT (tag) DO UPDATE SET name = :name, link = :link, modified_time = :modified_time, mode = :mode, size = :size", + "INSERT INTO files VALUES(:tag, :name, :reference, :parent, :link, :modified_time, :mode, :size, :config_id) "+ + "ON CONFLICT (tag) DO UPDATE SET name = :name, reference = :reference, parent = :parent, link = :link, modified_time = :modified_time, mode = :mode, size = :size, config_id = :config_id", file, ) db.Unlock() @@ -159,6 +162,56 @@ func (db *RepositoryDB) GetFileByName(ctx context.Context, name string) (*Reposi return file, nil } +func (db *RepositoryDB) GetFileByReference(ctx context.Context, reference string) (*RepositoryFile, error) { + db.Reference.Add(1) + defer db.Reference.Add(-1) + + if err := db.Open(ctx); err != nil { + return nil, err + } + + rows, err := db.QueryxContext(ctx, "SELECT * FROM files WHERE reference = ? LIMIT 1", reference) + if err != nil { + return nil, err + } + defer rows.Close() + + file := new(RepositoryFile) + + if !rows.Next() { + return nil, sqlite.ErrNoEntryFound + } + if err := rows.StructScan(file); err != nil { + return nil, err + } + + return file, nil +} + +func (db *RepositoryDB) DeleteFileByName(ctx context.Context, name string) error { + db.Reference.Add(1) + defer db.Reference.Add(-1) + + if err := db.Open(ctx); err != nil { + return err + } + + db.Lock() + result, err := db.ExecContext(ctx, "DELETE FROM files WHERE name = ?", name) + db.Unlock() + + if err != nil { + return err + } + + _, err = result.RowsAffected() + if err != nil { + return err + } + + return nil +} + type WalkFileFunc func(*RepositoryFile) error func (db *RepositoryDB) WalkFiles(ctx context.Context, walkFn WalkFileFunc) error { @@ -223,6 +276,101 @@ func (db *RepositoryDB) WalkSymlinks(ctx context.Context, walkFn WalkFileFunc) e return nil } +func (db *RepositoryDB) WalkFilesByParent(ctx context.Context, parent string, walkFn WalkFileFunc) error { + if walkFn == nil { + return fmt.Errorf("no file walk function provided") + } + + db.Reference.Add(1) + defer db.Reference.Add(-1) + + if err := db.Open(ctx); err != nil { + return err + } + + rows, err := db.QueryxContext(ctx, "SELECT * FROM files WHERE parent = ?", parent) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + file := new(RepositoryFile) + err := rows.StructScan(file) + if err != nil { + return err + } else if err := walkFn(file); err != nil { + return err + } + } + + return nil +} + +func (db *RepositoryDB) WalkFilesByConfigID(ctx context.Context, configID uint64, walkFn WalkFileFunc) error { + if walkFn == nil { + return fmt.Errorf("no file walk function provided") + } + + db.Reference.Add(1) + defer db.Reference.Add(-1) + + if err := db.Open(ctx); err != nil { + return err + } + + rows, err := db.QueryxContext(ctx, "SELECT * FROM files WHERE config_id = ?", configID) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + file := new(RepositoryFile) + err := rows.StructScan(file) + if err != nil { + return err + } else if err := walkFn(file); err != nil { + return err + } + } + + return nil +} + +type WalkStringFunc func(*string) error + +func (db *RepositoryDB) WalkFilesByDistinctParent(ctx context.Context, walkFn WalkStringFunc) error { + if walkFn == nil { + return fmt.Errorf("no file walk function provided") + } + + db.Reference.Add(1) + defer db.Reference.Add(-1) + + if err := db.Open(ctx); err != nil { + return err + } + + rows, err := db.QueryxContext(ctx, "SELECT DISTINCT parent FROM files") + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + parent := new(string) + err := rows.Scan(parent) + if err != nil { + return err + } else if err := walkFn(parent); err != nil { + return err + } + } + + return nil +} + func (db *RepositoryDB) CountFiles(ctx context.Context) (int, error) { db.Reference.Add(1) defer db.Reference.Add(-1) diff --git a/internal/plugins/mirror/pkg/mirrordb/schema/repository/000000_create_files_table.sql b/internal/plugins/mirror/pkg/mirrordb/schema/repository/000000_create_files_table.sql index b41428d..f3ed4ec 100644 --- a/internal/plugins/mirror/pkg/mirrordb/schema/repository/000000_create_files_table.sql +++ b/internal/plugins/mirror/pkg/mirrordb/schema/repository/000000_create_files_table.sql @@ -1,10 +1,16 @@ CREATE TABLE IF NOT EXISTS files ( tag TEXT PRIMARY KEY, name TEXT, + reference TEXT, + parent TEXT, link TEXT, modified_time INTEGER, mode INTEGER, - size INTEGER + size INTEGER, + config_id INTEGER ); -CREATE INDEX filename_idx ON files(name); \ No newline at end of file +CREATE INDEX files_name_idx ON files(name); +CREATE INDEX files_reference_idx ON files(reference); +CREATE INDEX files_parent_idx ON files(parent); +CREATE INDEX files_config_id_idx ON files(config_id); \ No newline at end of file diff --git a/internal/plugins/mirror/pkg/mirrordb/schema/status/000000_create_properties_table.sql b/internal/plugins/mirror/pkg/mirrordb/schema/status/000000_create_properties_table.sql index a5a4f9b..01dad6c 100644 --- a/internal/plugins/mirror/pkg/mirrordb/schema/status/000000_create_properties_table.sql +++ b/internal/plugins/mirror/pkg/mirrordb/schema/status/000000_create_properties_table.sql @@ -2,7 +2,8 @@ CREATE TABLE IF NOT EXISTS properties ( id INTEGER PRIMARY KEY, created BOOLEAN, mirror BOOLEAN, - mirror_urls BLOB + mirror_configs BLOB, + web_config BLOB ); -INSERT INTO properties VALUES(1, false, false, ''); \ No newline at end of file +INSERT INTO properties VALUES(1, false, false, '', ''); \ No newline at end of file diff --git a/internal/plugins/mirror/pkg/mirrordb/status.go b/internal/plugins/mirror/pkg/mirrordb/status.go index ac6bbb0..98ed13e 100644 --- a/internal/plugins/mirror/pkg/mirrordb/status.go +++ b/internal/plugins/mirror/pkg/mirrordb/status.go @@ -24,9 +24,10 @@ type Event struct { } type Properties struct { - Created bool `db:"created"` - Mirror bool `db:"mirror"` - MirrorURLs []byte `db:"mirror_urls"` + Created bool `db:"created"` + Mirror bool `db:"mirror"` + MirrorConfigs []byte `db:"mirror_configs"` + WebConfig []byte `db:"web_config"` } type Sync struct { @@ -200,7 +201,7 @@ func (db *StatusDB) GetProperties(ctx context.Context) (*Properties, error) { return nil, err } - rows, err := db.QueryxContext(ctx, "SELECT created, mirror, mirror_urls FROM properties WHERE id = 1") + rows, err := db.QueryxContext(ctx, "SELECT created, mirror, mirror_configs, web_config FROM properties WHERE id = 1") if err != nil { return nil, err } @@ -229,7 +230,7 @@ func (db *StatusDB) UpdateProperties(ctx context.Context, properties *Properties db.Lock() result, err := db.NamedExecContext( ctx, - "UPDATE properties SET created = :created, mirror = :mirror, mirror_urls = :mirror_urls WHERE id = 1", + "UPDATE properties SET created = :created, mirror = :mirror, mirror_configs = :mirror_configs, web_config = :web_config WHERE id = 1", properties, ) db.Unlock() diff --git a/internal/plugins/mirror/pkg/mirrorrepository/api.go b/internal/plugins/mirror/pkg/mirrorrepository/api.go index 56a035b..c2d61ad 100644 --- a/internal/plugins/mirror/pkg/mirrorrepository/api.go +++ b/internal/plugins/mirror/pkg/mirrorrepository/api.go @@ -50,14 +50,26 @@ func (h *Handler) CreateRepository(ctx context.Context, properties *apiv1.Reposi } h.setMirror(propertiesDB.Mirror) - if properties.MirrorURLs != nil { + if properties.MirrorConfigs != nil { buf := new(bytes.Buffer) decoder := gob.NewEncoder(buf) - if err := decoder.Encode(properties.MirrorURLs); err != nil { + if err := decoder.Encode(properties.MirrorConfigs); err != nil { return werror.Wrap(gcode.ErrInternal, err) } - propertiesDB.MirrorURLs = buf.Bytes() - if err := h.setMirrorURLs(properties.MirrorURLs); err != nil { + propertiesDB.MirrorConfigs = buf.Bytes() + if err := h.setMirrorConfigs(properties.MirrorConfigs); err != nil { + return werror.Wrap(gcode.ErrInternal, err) + } + } + + if properties.WebConfig != nil { + buf := new(bytes.Buffer) + decoder := gob.NewEncoder(buf) + if err := decoder.Encode(properties.WebConfig); err != nil { + return werror.Wrap(gcode.ErrInternal, err) + } + propertiesDB.WebConfig = buf.Bytes() + if err := h.setWebConfig(properties.WebConfig); err != nil { return werror.Wrap(gcode.ErrInternal, err) } } @@ -167,14 +179,26 @@ func (h *Handler) UpdateRepository(ctx context.Context, properties *apiv1.Reposi } h.setMirror(propertiesDB.Mirror) - if properties.MirrorURLs != nil { + if properties.MirrorConfigs != nil { + buf := new(bytes.Buffer) + decoder := gob.NewEncoder(buf) + if err := decoder.Encode(properties.MirrorConfigs); err != nil { + return werror.Wrap(gcode.ErrInternal, err) + } + propertiesDB.MirrorConfigs = buf.Bytes() + if err := h.setMirrorConfigs(properties.MirrorConfigs); err != nil { + return werror.Wrap(gcode.ErrInternal, err) + } + } + + if properties.WebConfig != nil { buf := new(bytes.Buffer) decoder := gob.NewEncoder(buf) - if err := decoder.Encode(properties.MirrorURLs); err != nil { + if err := decoder.Encode(properties.WebConfig); err != nil { return werror.Wrap(gcode.ErrInternal, err) } - propertiesDB.MirrorURLs = buf.Bytes() - if err := h.setMirrorURLs(properties.MirrorURLs); err != nil { + propertiesDB.WebConfig = buf.Bytes() + if err := h.setWebConfig(properties.WebConfig); err != nil { return werror.Wrap(gcode.ErrInternal, err) } } @@ -208,9 +232,16 @@ func (h *Handler) GetRepository(ctx context.Context) (properties *apiv1.Reposito Mirror: &propertiesDB.Mirror, } - if len(propertiesDB.MirrorURLs) > 0 { - decoder := gob.NewDecoder(bytes.NewReader(propertiesDB.MirrorURLs)) - if err := decoder.Decode(&properties.MirrorURLs); err != nil { + if len(propertiesDB.MirrorConfigs) > 0 { + decoder := gob.NewDecoder(bytes.NewReader(propertiesDB.MirrorConfigs)) + if err := decoder.Decode(&properties.MirrorConfigs); err != nil { + return nil, werror.Wrap(gcode.ErrInternal, err) + } + } + + if len(propertiesDB.WebConfig) > 0 { + decoder := gob.NewDecoder(bytes.NewReader(propertiesDB.WebConfig)) + if err := decoder.Decode(&properties.WebConfig); err != nil { return nil, werror.Wrap(gcode.ErrInternal, err) } } @@ -223,8 +254,8 @@ func (h *Handler) SyncRepository(_ context.Context, wait bool) (err error) { return werror.Wrap(gcode.ErrUnavailable, err) } else if !h.getMirror() { return werror.Wrap(gcode.ErrFailedPrecondition, errors.New("repository not setup as a mirror")) - } else if len(h.getMirrorURLs()) == 0 { - return werror.Wrap(gcode.ErrFailedPrecondition, errors.New("repository doesn't have mirror URLs setup")) + } else if len(h.getMirrorConfigs()) == 0 { + return werror.Wrap(gcode.ErrFailedPrecondition, errors.New("repository doesn't have mirror configurations setup")) } else if h.delete.Load() { return werror.Wrap(gcode.ErrAlreadyExists, fmt.Errorf("repository %s is being deleted", h.Repository)) } else if h.syncing.Swap(true) { @@ -251,48 +282,55 @@ func (h *Handler) SyncRepository(_ context.Context, wait bool) (err error) { return nil } -func (h *Handler) SyncRepositoryWithURL(_ context.Context, url string, wait bool) (err error) { +func (h *Handler) GenerateRepository(_ context.Context) (err error) { if !h.Started() { return werror.Wrap(gcode.ErrUnavailable, err) } else if !h.getMirror() { return werror.Wrap(gcode.ErrFailedPrecondition, errors.New("repository not setup as a mirror")) - } else if err := h.setMirrorURLs([]string{url}); err != nil { - return werror.Wrap(gcode.ErrInternal, err) + } else if len(h.getMirrorConfigs()) == 0 { + return werror.Wrap(gcode.ErrFailedPrecondition, errors.New("repository doesn't have mirror configurations setup")) } else if h.delete.Load() { return werror.Wrap(gcode.ErrAlreadyExists, fmt.Errorf("repository %s is being deleted", h.Repository)) - } else if h.syncing.Swap(true) { - return werror.Wrap(gcode.ErrAlreadyExists, errors.New("a repository sync is already running")) + } else if h.syncing.Load() { + return werror.Wrap(gcode.ErrAlreadyExists, errors.New("a repository sync is running")) } - var waitErrCh chan error + if err := h.GenerateIndexes(); err != nil { + h.logger.Error("failed to generate indexes", "error", err) + return werror.Wrap(gcode.ErrUnavailable, errors.New("unable to generate indexes")) + } - if wait { - waitErrCh = make(chan error, 1) + return nil +} + +func (h *Handler) GetRepositorySyncPlan(_ context.Context) (syncPlan *apiv1.RepositorySyncPlan, err error) { + if !h.Started() { + return nil, werror.Wrap(gcode.ErrUnavailable, err) + } else if !h.getMirror() { + return nil, werror.Wrap(gcode.ErrFailedPrecondition, errors.New("repository not setup as a mirror")) + } else if len(h.getMirrorConfigs()) == 0 { + return nil, werror.Wrap(gcode.ErrFailedPrecondition, errors.New("repository doesn't have mirror configurations setup")) + } else if h.delete.Load() { + return nil, werror.Wrap(gcode.ErrAlreadyExists, fmt.Errorf("repository %s is being deleted", h.Repository)) + } else if h.syncing.Load() { + return nil, werror.Wrap(gcode.ErrAlreadyExists, errors.New("a repository sync is running")) } - select { - case h.syncCh <- waitErrCh: - if waitErrCh != nil { - if err := <-waitErrCh; err != nil { - return werror.Wrap(gcode.ErrInternal, fmt.Errorf("synchronization failed: %w", err)) - } - } - default: - return werror.Wrap(gcode.ErrUnavailable, errors.New("something goes wrong")) + plan, err := h.getSyncPlan() + if err != nil { + return nil, werror.Wrap(gcode.ErrUnavailable, errors.New("unable to generate sync plan")) } - return nil + return plan, nil } func (h *Handler) GetRepositorySyncStatus(context.Context) (syncStatus *apiv1.SyncStatus, err error) { sync := h.getSync() return &apiv1.SyncStatus{ - Syncing: sync.Syncing, - StartTime: utils.TimeToString(sync.StartTime), - EndTime: utils.TimeToString(sync.EndTime), - TotalFiles: sync.TotalFiles, - SyncedFiles: sync.SyncedFiles, - SyncError: sync.SyncError, + Syncing: sync.Syncing, + StartTime: utils.TimeToString(sync.StartTime), + EndTime: utils.TimeToString(sync.EndTime), + SyncError: sync.SyncError, }, nil } @@ -366,7 +404,7 @@ func (h *Handler) ListRepositorySymlinks(ctx context.Context, _ *apiv1.Page) (re return repositoryFiles, nil } -func (h *Handler) GetRepositoryFile(ctx context.Context, file string) (repositoryFile *apiv1.RepositoryFile, err error) { +func (h *Handler) GetRepositoryFile(ctx context.Context, name string) (repositoryFile *apiv1.RepositoryFile, err error) { if !h.Started() { return nil, werror.Wrap(gcode.ErrUnavailable, err) } @@ -377,7 +415,7 @@ func (h *Handler) GetRepositoryFile(ctx context.Context, file string) (repositor } defer db.Close(false) - fileDB, err := db.GetFileByName(ctx, file) + fileDB, err := db.GetFileByName(ctx, name) if err != nil { return nil, werror.Wrap(gcode.ErrInternal, err) } @@ -385,6 +423,82 @@ func (h *Handler) GetRepositoryFile(ctx context.Context, file string) (repositor return toRepositoryFileAPI(fileDB), nil } +func (h *Handler) GetRepositoryFileByReferenceRaw(ctx context.Context, reference string) (repositoryFile *mirrordb.RepositoryFile, err error) { + if !h.Started() { + return nil, werror.Wrap(gcode.ErrUnavailable, err) + } + + db, err := h.getRepositoryDB(ctx) + if err != nil { + return nil, werror.Wrap(gcode.ErrInternal, err) + } + defer db.Close(false) + + fileDB, err := db.GetFileByReference(ctx, reference) + if err != nil { + return nil, werror.Wrap(gcode.ErrInternal, err) + } + + return fileDB, nil +} + +func (h *Handler) GetRepositoryFileByReference(ctx context.Context, reference string) (repositoryFile *apiv1.RepositoryFile, err error) { + if !h.Started() { + return nil, werror.Wrap(gcode.ErrUnavailable, err) + } + + db, err := h.getRepositoryDB(ctx) + if err != nil { + return nil, werror.Wrap(gcode.ErrInternal, err) + } + defer db.Close(false) + + fileDB, err := db.GetFileByReference(ctx, reference) + if err != nil { + return nil, werror.Wrap(gcode.ErrInternal, err) + } + + return toRepositoryFileAPI(fileDB), nil +} + +func (h *Handler) GetRepositoryFileCount(ctx context.Context) (count int, err error) { + if !h.Started() { + return 0, werror.Wrap(gcode.ErrUnavailable, err) + } + + db, err := h.getRepositoryDB(ctx) + if err != nil { + return 0, werror.Wrap(gcode.ErrInternal, err) + } + defer db.Close(false) + + count, err = db.CountFiles(ctx) + if err != nil { + return 0, werror.Wrap(gcode.ErrInternal, err) + } + + return count, nil +} + +func (h *Handler) DeleteRepositoryFile(ctx context.Context, file string) (err error) { + if !h.Started() { + return werror.Wrap(gcode.ErrUnavailable, err) + } + + db, err := h.getRepositoryDB(ctx) + if err != nil { + return werror.Wrap(gcode.ErrInternal, err) + } + defer db.Close(false) + + err = db.DeleteFileByName(ctx, file) + if err != nil { + return werror.Wrap(gcode.ErrInternal, err) + } + + return nil +} + func (h *Handler) removeRepositoryFile(ctx context.Context, file *mirrordb.RepositoryFile) error { tagRef := filepath.Join(h.Repository, "files:"+file.Tag) @@ -412,9 +526,12 @@ func toRepositoryFileAPI(file *mirrordb.RepositoryFile) *apiv1.RepositoryFile { return &apiv1.RepositoryFile{ Tag: file.Tag, Name: file.Name, + Reference: file.Reference, + Parent: file.Parent, Link: file.Link, ModifiedTime: utils.TimeToString(file.ModifiedTime), Mode: file.Mode, Size: file.Size, + ConfigID: file.ConfigID, } } diff --git a/internal/plugins/mirror/pkg/mirrorrepository/database.go b/internal/plugins/mirror/pkg/mirrorrepository/database.go index a742ff6..e3684d3 100644 --- a/internal/plugins/mirror/pkg/mirrorrepository/database.go +++ b/internal/plugins/mirror/pkg/mirrorrepository/database.go @@ -96,14 +96,14 @@ func (h *Handler) getRepositoryFile(ctx context.Context, file string) (repositor return fileDB, nil } -func (h *Handler) listRepositoryFiles(ctx context.Context) (repositoryFiles []*mirrordb.RepositoryFile, err error) { +func (h *Handler) listRepositoryFilesByParent(ctx context.Context, parent string) (repositoryFiles []*mirrordb.RepositoryFile, err error) { db, err := h.getRepositoryDB(ctx) if err != nil { return nil, werror.Wrap(gcode.ErrInternal, err) } defer db.Close(false) - err = db.WalkFiles(ctx, func(file *mirrordb.RepositoryFile) error { + err = db.WalkFilesByParent(ctx, parent, func(file *mirrordb.RepositoryFile) error { repositoryFiles = append(repositoryFiles, file) return nil }) @@ -113,3 +113,39 @@ func (h *Handler) listRepositoryFiles(ctx context.Context) (repositoryFiles []*m return repositoryFiles, nil } + +func (h *Handler) listRepositoryFilesByConfigID(ctx context.Context, configID uint64) (repositoryFiles []*mirrordb.RepositoryFile, err error) { + db, err := h.getRepositoryDB(ctx) + if err != nil { + return nil, werror.Wrap(gcode.ErrInternal, err) + } + defer db.Close(false) + + err = db.WalkFilesByConfigID(ctx, configID, func(file *mirrordb.RepositoryFile) error { + repositoryFiles = append(repositoryFiles, file) + return nil + }) + if err != nil { + return nil, werror.Wrap(gcode.ErrInternal, err) + } + + return repositoryFiles, nil +} + +func (h *Handler) listRepositoryDistinctParents(ctx context.Context) (repositoryParents []string, err error) { + db, err := h.getRepositoryDB(ctx) + if err != nil { + return nil, werror.Wrap(gcode.ErrInternal, err) + } + defer db.Close(false) + + err = db.WalkFilesByDistinctParent(ctx, func(parent *string) error { + repositoryParents = append(repositoryParents, *parent) + return nil + }) + if err != nil { + return nil, werror.Wrap(gcode.ErrInternal, err) + } + + return repositoryParents, nil +} diff --git a/internal/plugins/mirror/pkg/mirrorrepository/handler.go b/internal/plugins/mirror/pkg/mirrorrepository/handler.go index ef0777c..6ec44ef 100644 --- a/internal/plugins/mirror/pkg/mirrorrepository/handler.go +++ b/internal/plugins/mirror/pkg/mirrorrepository/handler.go @@ -17,8 +17,20 @@ import ( "go.ciq.dev/beskar/internal/pkg/repository" "go.ciq.dev/beskar/internal/plugins/mirror/pkg/mirrordb" eventv1 "go.ciq.dev/beskar/pkg/api/event/v1" + apiv1 "go.ciq.dev/beskar/pkg/plugins/mirror/api/v1" ) +type mirrorConfig struct { + URL *url.URL + HTTPURL *url.URL + Destination string + Exclusions []string +} + +type webConfig struct { + Prefix string +} + type Handler struct { *repository.RepoHandler @@ -38,7 +50,8 @@ type Handler struct { propertyMutex sync.RWMutex created bool mirror bool - mirrorURLs []*url.URL + mirrorConfigs []mirrorConfig + webConfig *webConfig delete atomic.Bool } @@ -52,7 +65,27 @@ func NewHandler(logger *slog.Logger, repoHandler *repository.RepoHandler) *Handl } } -func (h *Handler) QueueEvent(event *eventv1.EventPayload, _ bool) error { +func (h *Handler) downloadDir() string { + return filepath.Join(h.repoDir, "downloads") +} + +func (h *Handler) QueueEvent(event *eventv1.EventPayload, store bool) error { + ctx := context.Background() + + if store { + db, err := h.getStatusDB(ctx) + if err != nil { + h.logger.Error("status database event", "digest", event.Digest, "mediatype", event.Mediatype, "error", err.Error()) + return err + } else if err := db.AddEvent(ctx, event); err != nil { + h.logger.Error("add event in status database", "digest", event.Digest, "mediatype", event.Mediatype, "error", err.Error()) + return err + } else if err := db.Sync(ctx); err != nil { + h.logger.Error("sync status database", "digest", event.Digest, "mediatype", event.Mediatype, "error", err.Error()) + return err + } + } + h.logger.Info("queued event", "digest", event.Digest) h.EnqueueEvent(event) @@ -167,15 +200,28 @@ func (h *Handler) initProperties(ctx context.Context) error { h.setMirror(properties.Mirror) h.setCreated(properties.Created) - if len(properties.MirrorURLs) > 0 { - var mirrorURLs []string + if len(properties.MirrorConfigs) > 0 { + var mirrorConfigs []apiv1.MirrorConfig - decoder := gob.NewDecoder(bytes.NewReader(properties.MirrorURLs)) - if err := decoder.Decode(&mirrorURLs); err != nil { + decoder := gob.NewDecoder(bytes.NewReader(properties.MirrorConfigs)) + if err := decoder.Decode(&mirrorConfigs); err != nil { return err } - if err := h.setMirrorURLs(mirrorURLs); err != nil { + if err := h.setMirrorConfigs(mirrorConfigs); err != nil { + return err + } + } + + if len(properties.WebConfig) > 0 { + var webConfig apiv1.WebConfig + + decoder := gob.NewDecoder(bytes.NewReader(properties.WebConfig)) + if err := decoder.Decode(&webConfig); err != nil { + return err + } + + if err := h.setWebConfig(&webConfig); err != nil { return err } } @@ -189,30 +235,62 @@ func (h *Handler) initProperties(ctx context.Context) error { return nil } -func (h *Handler) setMirrorURLs(urls []string) error { - var err error - - mirrorURLs := make([]*url.URL, len(urls)) +func (h *Handler) setMirrorConfigs(configs []apiv1.MirrorConfig) error { + mirrorConfigs := make([]mirrorConfig, len(configs)) - for i, u := range urls { - mirrorURLs[i], err = url.Parse(u) + for i, c := range configs { + u, err := url.Parse(c.URL) if err != nil { return err } + + var hu *url.URL + if c.HTTPURL != "" { + hu, err = url.Parse(c.HTTPURL) + if err != nil { + return err + } + } + + mirrorConfigs[i] = mirrorConfig{ + URL: u, + HTTPURL: hu, + Destination: c.Destination, + Exclusions: c.Exclusions, + } + } + + h.propertyMutex.Lock() + h.mirrorConfigs = mirrorConfigs + h.propertyMutex.Unlock() + + return nil +} + +func (h *Handler) getMirrorConfigs() []mirrorConfig { + h.propertyMutex.RLock() + defer h.propertyMutex.RUnlock() + + return h.mirrorConfigs +} + +func (h *Handler) setWebConfig(config *apiv1.WebConfig) error { + webConfig := webConfig{ + Prefix: config.Prefix, } h.propertyMutex.Lock() - h.mirrorURLs = mirrorURLs + h.webConfig = &webConfig h.propertyMutex.Unlock() return nil } -func (h *Handler) getMirrorURLs() []*url.URL { +func (h *Handler) getWebConfig() *webConfig { h.propertyMutex.RLock() defer h.propertyMutex.RUnlock() - return h.mirrorURLs + return h.webConfig } func (h *Handler) setMirror(b bool) { @@ -283,7 +361,7 @@ func (h *Handler) Start(ctx context.Context) { sync := h.getSync() h.propertyMutex.RLock() - mirrorCount := len(h.mirrorURLs) + mirrorCount := len(h.mirrorConfigs) h.propertyMutex.RUnlock() if sync.Syncing && mirrorCount == 0 { diff --git a/internal/plugins/mirror/pkg/mirrorrepository/index_html.go b/internal/plugins/mirror/pkg/mirrorrepository/index_html.go new file mode 100644 index 0000000..8389edc --- /dev/null +++ b/internal/plugins/mirror/pkg/mirrorrepository/index_html.go @@ -0,0 +1,160 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024, CIQ, Inc. All rights reserved +// SPDX-License-Identifier: Apache-2.0 + +package mirrorrepository + +import ( + "bytes" + "context" + "crypto/md5" //nolint:gosec + "encoding/hex" + "path" + "path/filepath" + "sort" + "strings" + "time" + + "go.ciq.dev/beskar/internal/plugins/mirror/pkg/index" + "go.ciq.dev/beskar/internal/plugins/mirror/pkg/mirrordb" + "go.ciq.dev/beskar/pkg/oras" + "go.ciq.dev/beskar/pkg/orasmirror" + "go.ciq.dev/go-rsync/rsync" +) + +const ( + defaultWebPrefix = "/artifacts/mirror/web/v1" +) + +func (h *Handler) GenerateIndexes() error { + webPrefix := path.Join(defaultWebPrefix, strings.TrimPrefix(h.Repository, "artifacts/mirror/")) + if h.getWebConfig() != nil && h.getWebConfig().Prefix != "" { + webPrefix = h.getWebConfig().Prefix + } + + parents, err := h.listRepositoryDistinctParents(context.Background()) + if err != nil { + h.logger.Error("Failed to list distinct parents", "error", err.Error()) + return err + } + + sort.Strings(parents) + + for _, p := range parents { + // Get parentInfo file info + parentInfo, err := h.getRepositoryFile(context.Background(), p) + if err != nil { + h.logger.Error("Failed to get parent info", "error", err.Error(), "parent", p) + return err + } + + // Generate index config for parent + c := index.Config{ + Current: path.Join(webPrefix, filepath.Clean(parentInfo.Name)), + } + + // Don't set previous for root directory + if filepath.Clean(parentInfo.Name) != filepath.Dir(parentInfo.Name) { + c.Previous = path.Join(webPrefix, filepath.Dir(parentInfo.Name)) + } + + files, err := h.listRepositoryFilesByParent(context.Background(), p) + if err != nil { + h.logger.Error("Failed to list files by parent", "error", err.Error(), "parent", p) + return err + } + + // Sort files by name + sort.Slice(files, func(i, j int) bool { + return files[i].Name < files[j].Name + }) + + // Process all files in the parent directory + for _, fileInfo := range files { + // Add files and directories to index config + if rsync.FileMode(fileInfo.Mode).IsDIR() { + if fileInfo.Name == "." { + continue + } + + c.Directories = append(c.Directories, index.Directory{ + Name: filepath.Base(fileInfo.Name), + Ref: path.Join(webPrefix, filepath.Clean(fileInfo.Name)), + MTime: time.Unix(fileInfo.ModifiedTime, 0), + }) + } else if rsync.FileMode(fileInfo.Mode).IsLNK() { + file, err := h.getRepositoryFile(context.Background(), fileInfo.Name) + if err != nil { + h.logger.Error("Failed to get file info", "error", err.Error(), "file", fileInfo.Name) + return err + } + + h.logger.Debug("Processing symlink", "file", fileInfo.Name, "link", file.Link) + targetInfo, err := h.GetRepositoryFileByReferenceRaw(context.Background(), file.Link) + if err != nil { + h.logger.Error("Failed to get target info", "error", err.Error(), "link", file.Link) + return err + } + + if rsync.FileMode(targetInfo.Mode).IsDIR() { + c.Directories = append(c.Directories, index.Directory{ + Name: filepath.Base(file.Name), + Ref: path.Join(webPrefix, strings.TrimPrefix(file.Link, h.Repository)), + MTime: time.Unix(file.ModifiedTime, 0), + }) + } else { + c.Files = append(c.Files, index.File{ + Name: filepath.Base(fileInfo.Name), + Ref: path.Join(webPrefix, filepath.Clean(targetInfo.Name)), + MTime: time.Unix(targetInfo.ModifiedTime, 0), + Size: targetInfo.Size, + }) + } + } else { + c.Files = append(c.Files, index.File{ + Name: filepath.Base(fileInfo.Name), + Ref: path.Join(webPrefix, filepath.Clean(fileInfo.Name)), + MTime: time.Unix(fileInfo.ModifiedTime, 0), + Size: fileInfo.Size, + }) + } + } + + // Generate index.html file + rawIndex, err := index.Generate(c) + if err != nil { + return err + } + + pusher, err := orasmirror.NewStaticFileStreamPusher(bytes.NewReader(rawIndex), "index.html", parentInfo.Reference, h.Params.NameOptions...) + if err != nil { + return err + } + + err = oras.Push(pusher, h.Params.RemoteOptions...) + if err != nil { + return err + } + + //nolint:gosec + s := md5.Sum([]byte(parentInfo.Reference)) + tag := hex.EncodeToString(s[:]) + + err = h.addFileToRepositoryDatabase(context.Background(), &mirrordb.RepositoryFile{ + Tag: tag, + Name: parentInfo.Name, + Reference: parentInfo.Reference, + Parent: parentInfo.Parent, + ModifiedTime: parentInfo.ModifiedTime, + Mode: parentInfo.Mode, + Size: parentInfo.Size, + ConfigID: parentInfo.ConfigID, + }) + if err != nil { + return err + } + + h.logger.Debug("Generated Index", "Name", parentInfo.Name, "Reference", parentInfo.Reference) + } + + return nil +} diff --git a/internal/plugins/mirror/pkg/mirrorrepository/plansync.go b/internal/plugins/mirror/pkg/mirrorrepository/plansync.go new file mode 100644 index 0000000..10e209d --- /dev/null +++ b/internal/plugins/mirror/pkg/mirrorrepository/plansync.go @@ -0,0 +1,258 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024, CIQ, Inc. All rights reserved +// SPDX-License-Identifier: Apache-2.0 + +package mirrorrepository + +import ( + "context" + "crypto/md5" //nolint:gosec + "encoding/hex" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/cenkalti/backoff" + "go.ciq.dev/beskar/internal/plugins/mirror/pkg/mirrordb" + "go.ciq.dev/beskar/pkg/oras" + "go.ciq.dev/beskar/pkg/orasmirror" + "go.ciq.dev/go-rsync/rsync" +) + +type PlanSyncer struct { + h *Handler + config mirrorConfig + configID uint64 + parallelism int + + plan *rsync.SyncPlan +} + +func NewPlanSyncer(h *Handler, config mirrorConfig, configID uint64, parallelism int, plan *rsync.SyncPlan) *PlanSyncer { + return &PlanSyncer{ + h: h, + config: config, + configID: configID, + parallelism: parallelism, + plan: plan, + } +} + +func (s *PlanSyncer) filePush(remoteFile rsync.FileInfo) error { + fileReference := filepath.Clean(s.h.generateFileReference(strings.ToLower(string(remoteFile.Path)))) + + // Fetch file from remote + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, s.config.HTTPURL.String()+"/"+string(remoteFile.Path), nil) + if err != nil { + s.h.logger.Error("Failed to create request", "file", string(remoteFile.Path), "error", err) + return err + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + s.h.logger.Error("Failed to fetch file", "file", string(remoteFile.Path), "error", err) + return err + } + + f, err := os.CreateTemp(s.h.downloadDir(), "") + if err != nil { + s.h.logger.Error("Failed to create temp file", "file", string(remoteFile.Path), "error", err) + resp.Body.Close() + return err + } + defer os.Remove(f.Name()) + + s.h.logger.Debug("Downloading", "file", string(remoteFile.Path), "temp", f.Name()) + _, err = io.Copy(f, resp.Body) + if err != nil { + s.h.logger.Error("Failed to download file", "file", string(remoteFile.Path), "error", err) + resp.Body.Close() + return err + } + resp.Body.Close() + + // Commit content to storage + if err := f.Sync(); err != nil { + return err + } + + cb := backoff.WithMaxRetries( + backoff.NewConstantBackOff(5*time.Second), + 3, + ) + err = backoff.Retry(func() error { + // Seek to start of file + if _, err := f.Seek(0, 0); err != nil { + return err + } + + // Push file to storage + repoPath := filepath.Join(s.h.Repository, filepath.Dir(string(remoteFile.Path))) + s.h.logger.Debug("Pushing", "file", string(remoteFile.Path), "repo", repoPath) + pusher, err := orasmirror.NewStaticFileStreamPusher(f, strings.ToLower(filepath.Base(string(remoteFile.Path))), strings.ToLower(repoPath), s.h.Params.NameOptions...) + if err != nil { + s.h.logger.Error("Failed to create pusher", "file", string(remoteFile.Path), "error", err) + return err + } + + err = oras.Push(pusher, s.h.Params.RemoteOptions...) + if err != nil { + s.h.logger.Error("Failed to push file", "file", string(remoteFile.Path), "error", err) + return err + } + + return nil + }, cb) + if err != nil { + return err + } + + // Add entry to DB + //nolint:gosec + sum := md5.Sum([]byte(fileReference)) + tag := hex.EncodeToString(sum[:]) + + name := filepath.Clean(string(remoteFile.Path)) + + err = s.h.addFileToRepositoryDatabase(context.Background(), &mirrordb.RepositoryFile{ + Tag: tag, + Name: name, + Reference: fileReference, + Parent: filepath.Dir(name), + Link: "", + ModifiedTime: int64(remoteFile.Mtime), + Mode: uint32(remoteFile.Mode), + Size: uint64(remoteFile.Size), + ConfigID: s.configID, + }) + if err != nil { + s.h.logger.Error("Failed to add file to repository database", "file", string(remoteFile.Path), "error", err) + return err + } + + return nil +} + +func (s *PlanSyncer) fileWorker(c chan rsync.FileInfo, wg *sync.WaitGroup) { + wg.Add(1) + defer wg.Done() + + for remoteFile := range c { + if err := s.filePush(remoteFile); err != nil { + s.h.logger.Error("Failed to push file", "file", string(remoteFile.Path), "error", err) + } + } +} + +func (s *PlanSyncer) Sync() error { + // Create push channel and wait group + pushChan := make(chan rsync.FileInfo) + wg := new(sync.WaitGroup) + + // Ensure download directory exists + if err := os.MkdirAll(s.h.downloadDir(), 0o755); err != nil { + return err + } + + // Start worker pool + for i := 0; i < s.parallelism; i++ { + go s.fileWorker(pushChan, wg) + } + + // Fetch/Update remote files + for _, i := range s.plan.AddRemoteFiles { + // Get file from remote file list + remoteFile := s.plan.RemoteFiles[i] + + s.h.logger.Debug("Processing", "file", string(remoteFile.Path)) + + fileReference := filepath.Clean(s.h.generateFileReference(strings.ToLower(string(remoteFile.Path)))) + + if remoteFile.Mode.IsREG() { + // Process file in worker pool + pushChan <- remoteFile + } else if remoteFile.Mode.IsDIR() { + // Add entry to DB + //nolint:gosec + sum := md5.Sum([]byte(fileReference)) + tag := hex.EncodeToString(sum[:]) + + name := filepath.Clean(string(remoteFile.Path)) + + err := s.h.addFileToRepositoryDatabase(context.Background(), &mirrordb.RepositoryFile{ + Tag: tag, + Name: name, + Reference: fileReference, + Parent: filepath.Dir(name), + Link: "", + ModifiedTime: int64(remoteFile.Mtime), + Mode: uint32(remoteFile.Mode), + Size: uint64(remoteFile.Size), + ConfigID: s.configID, + }) + if err != nil { + return err + } + } else if remoteFile.Mode.IsLNK() { + // Lookup link content in map + linkContent, ok := s.plan.Symlinks[string(remoteFile.Path)] + if !ok { + return fmt.Errorf("link content not found for %s", remoteFile.Path) + } + + s.h.logger.Debug("Processing Link", "content", string(linkContent)) + + intermediate := filepath.Clean(s.h.generateFileReference(strings.ToLower(string(linkContent)))) + target := strings.TrimPrefix(intermediate, s.h.Repository) + link := filepath.Join(filepath.Dir(fileReference), target) + + name := filepath.Clean(string(remoteFile.Path)) + + // Add entry to DB + //nolint:gosec + sum := md5.Sum([]byte(name)) + tag := hex.EncodeToString(sum[:]) + + err := s.h.addFileToRepositoryDatabase(context.Background(), &mirrordb.RepositoryFile{ + Tag: tag, + Name: name, + Reference: name, + Parent: filepath.Dir(name), + Link: link, + ModifiedTime: int64(remoteFile.Mtime), + Mode: uint32(remoteFile.Mode), + Size: uint64(remoteFile.Size), + ConfigID: s.configID, + }) + if err != nil { + return err + } + } + } + + close(pushChan) + + // Wait for all files to be processed + wg.Wait() + + // Remove local files + for _, i := range s.plan.DeleteLocalFiles { + localFile := s.plan.LocalFiles[i] + s.h.logger.Debug("Removing", "file", string(localFile.Path)) + + fileReference := filepath.Clean(s.h.generateFileReference(strings.ToLower(string(localFile.Path)))) + + // Remove entry from DB + err := s.h.removeFileFromRepositoryDatabase(context.Background(), fileReference) + if err != nil { + s.h.logger.Error("Failed to remove file from repository database", "file", string(localFile.Path), "error", err) + return err + } + } + + return nil +} diff --git a/internal/plugins/mirror/pkg/mirrorrepository/rsyncstorage.go b/internal/plugins/mirror/pkg/mirrorrepository/rsyncstorage.go index 48eddff..2c22534 100644 --- a/internal/plugins/mirror/pkg/mirrorrepository/rsyncstorage.go +++ b/internal/plugins/mirror/pkg/mirrorrepository/rsyncstorage.go @@ -4,48 +4,112 @@ package mirrorrepository import ( - "bytes" "context" "crypto/md5" //nolint:gosec "encoding/hex" - "fmt" "io" + "os" "path/filepath" "sort" "strings" - "time" - "github.com/antoniomika/go-rsync/rsync" - "go.ciq.dev/beskar/internal/plugins/mirror/pkg/index" "go.ciq.dev/beskar/internal/plugins/mirror/pkg/mirrordb" "go.ciq.dev/beskar/pkg/oras" "go.ciq.dev/beskar/pkg/orasmirror" + "go.ciq.dev/go-rsync/rsync" ) -func (h *Handler) Put(filePath string, content io.Reader, fileSize int64, metadata rsync.FileMetadata) (written int64, err error) { - repoPath := h.Repository +type Storage struct { + h *Handler + config mirrorConfig + configID uint64 + pushChan chan pushMessage +} + +type pushMessage struct { + filePath string + repoPath string +} + +func NewStorage(h *Handler, config mirrorConfig, configID uint64) *Storage { + pushChan := make(chan pushMessage) + + for i := 0; i < h.Params.Sync.MaxWorkerCount; i++ { + go func() { + for msg := range pushChan { + repoPath := h.Repository + fileName := msg.filePath + + dir, file := filepath.Split(fileName) + if dir != "" { + repoPath = h.Repository + "/" + filepath.Clean(dir) + fileName = file + } + + f, err := os.Open(msg.filePath) + if err != nil { + h.logger.Error("Failed to open file", "error", err) + continue + } + + h.logger.Debug("Pushing", "file", fileName, "repo", repoPath) + pusher, err := orasmirror.NewStaticFileStreamPusher(f, strings.ToLower(fileName), strings.ToLower(msg.repoPath), h.Params.NameOptions...) + if err != nil { + h.logger.Error("Failed to create pusher", "error", err) + f.Close() + continue + } + + err = oras.Push(pusher, h.Params.RemoteOptions...) + if err != nil { + f.Close() + h.logger.Error("Failed to push file", "error", err) + } + + f.Close() + os.Remove(msg.filePath) + } + }() + } + + return &Storage{ + h: h, + config: config, + configID: configID, + pushChan: pushChan, + } +} + +func (s *Storage) Put(filePath string, content io.Reader, fileSize int64, metadata rsync.FileMetadata) (written int64, err error) { + repoPath := s.h.Repository fileName := filePath // Only handle regular file content if metadata.Mode.IsREG() { - dir, file := filepath.Split(fileName) + dir, _ := filepath.Split(fileName) if dir != "" { - repoPath = h.Repository + "/" + filepath.Clean(dir) - fileName = file + repoPath = filepath.Join(s.h.Repository, dir) } - pusher, err := orasmirror.NewStaticFileStreamPusher(content, strings.ToLower(fileName), strings.ToLower(repoPath), h.Params.NameOptions...) + err := os.MkdirAll(filepath.Dir(filepath.Join(s.h.downloadDir(), filePath)), 0o755) if err != nil { return 0, err } - err = oras.Push(pusher, h.Params.RemoteOptions...) + err = copyTo(content, filepath.Join(s.h.downloadDir(), filePath)) if err != nil { return 0, err } + + s.pushChan <- pushMessage{ + filePath: filepath.Join(s.h.downloadDir(), filePath), + repoPath: repoPath, + } } - fileReference := filepath.Clean(h.generateFileReference(strings.ToLower(filePath))) + fileReference := filepath.Clean(s.h.generateFileReference(strings.ToLower(filePath))) + + name := filepath.Clean(filePath) var link string if metadata.Mode.IsLNK() { @@ -54,22 +118,28 @@ func (h *Handler) Put(filePath string, content io.Reader, fileSize int64, metada return 0, err } - intermediate := filepath.Clean(h.generateFileReference(strings.ToLower(string(c)))) - target := strings.TrimPrefix(intermediate, h.Repository) + intermediate := filepath.Clean(s.h.generateFileReference(strings.ToLower(string(c)))) + target := strings.TrimPrefix(intermediate, s.h.Repository) link = filepath.Join(filepath.Dir(fileReference), target) + + // Set reference on links to something unique, but not used + fileReference = name } //nolint:gosec - s := md5.Sum([]byte(fileReference)) - tag := hex.EncodeToString(s[:]) + sum := md5.Sum([]byte(fileReference)) + tag := hex.EncodeToString(sum[:]) - err = h.addFileToRepositoryDatabase(context.Background(), &mirrordb.RepositoryFile{ + err = s.h.addFileToRepositoryDatabase(context.Background(), &mirrordb.RepositoryFile{ Tag: tag, - Name: fileReference, + Name: name, + Reference: fileReference, + Parent: filepath.Dir(name), Link: link, ModifiedTime: int64(metadata.Mtime), Mode: uint32(metadata.Mode), Size: uint64(fileSize), + ConfigID: s.configID, }) if err != nil { return 0, err @@ -78,10 +148,10 @@ func (h *Handler) Put(filePath string, content io.Reader, fileSize int64, metada return fileSize, nil } -func (h *Handler) Delete(filePath string, _ rsync.FileMode) error { - fileReference := h.generateFileReference(filePath) +func (s *Storage) Delete(filePath string, _ rsync.FileMode) error { + fileReference := s.h.generateFileReference(filePath) - err := h.removeFileFromRepositoryDatabase(context.Background(), fileReference) + err := s.h.removeFileFromRepositoryDatabase(context.Background(), fileReference) if err != nil { return err } @@ -89,15 +159,15 @@ func (h *Handler) Delete(filePath string, _ rsync.FileMode) error { return nil } -func (h *Handler) List() (rsync.FileList, error) { - repositoryFiles, err := h.listRepositoryFiles(context.Background()) +func (s *Storage) List() (rsync.FileList, error) { + repositoryFiles, err := s.h.listRepositoryFilesByConfigID(context.Background(), s.configID) if err != nil { return nil, err } var fileList rsync.FileList for _, repositoryFile := range repositoryFiles { - path := filepath.Clean(strings.TrimPrefix(repositoryFile.Name, h.Repository)) + path := filepath.Clean(repositoryFile.Name) path = filepath.Clean(strings.TrimPrefix(path, "/")) fileList = append(fileList, rsync.FileInfo{ @@ -113,102 +183,6 @@ func (h *Handler) List() (rsync.FileList, error) { return fileList, nil } -func (h *Handler) GenerateIndexes(remoteList rsync.FileList) error { - indexes := map[string][]rsync.FileInfo{} - directoryMap := map[string]rsync.FileInfo{} - orderedDirectories := []string{} - - for _, f := range remoteList { - if f.Mode.IsDIR() { - directoryMap[filepath.Clean(string(f.Path))] = f - } - - indexes[filepath.Dir(string(f.Path))] = append(indexes[filepath.Dir(string(f.Path))], f) - } - - for d := range indexes { - orderedDirectories = append(orderedDirectories, d) - } - - sort.Strings(orderedDirectories) - - for _, dir := range orderedDirectories { - dirInfo := directoryMap[dir] - fis := indexes[dir] - - c := index.Config{ - Current: filepath.Clean(fmt.Sprintf("/artifacts/mirror/web/v1/%s/%s", strings.TrimPrefix(h.Repository, "artifacts/mirror/"), filepath.Clean(dir))), - Previous: filepath.Clean(fmt.Sprintf("/artifacts/mirror/web/v1/%s/%s", strings.TrimPrefix(h.Repository, "artifacts/mirror/"), filepath.Dir(dir))), - } - for _, fi := range fis { - if fi.Mode.IsDIR() { - if string(fi.Path) == "." { - continue - } - - c.Directories = append(c.Directories, index.Directory{ - Name: filepath.Base(string(fi.Path)), - Ref: fmt.Sprintf("/artifacts/mirror/web/v1/%s/%s", strings.TrimPrefix(h.Repository, "artifacts/mirror/"), filepath.Clean(string(fi.Path))), - MTime: time.Unix(int64(fi.Mtime), 0), - }) - } else if fi.Mode.IsLNK() { - fileReference := filepath.Clean(h.generateFileReference(strings.ToLower(string(fi.Path)))) - file, err := h.getRepositoryFile(context.Background(), fileReference) - if err != nil { - return err - } - - // NOTE: Assume all symlinks are to directories for now. - c.Directories = append(c.Directories, index.Directory{ - Name: filepath.Base(string(fi.Path)), - Ref: fmt.Sprintf("/artifacts/mirror/web/v1/%s", strings.TrimPrefix(filepath.Clean(file.Link), "artifacts/mirror/")), - MTime: time.Unix(int64(fi.Mtime), 0), - }) - } else { - dir, file := filepath.Split(string(fi.Path)) - ref := fmt.Sprintf("/%s/%s/file/%s", h.Repository, filepath.Clean(dir), file) - - c.Files = append(c.Files, index.File{ - Name: filepath.Base(string(fi.Path)), - Ref: ref, - MTime: time.Unix(int64(fi.Mtime), 0), - Size: uint64(fi.Size), - }) - } - } - - rawIndex, err := index.Generate(c) - if err != nil { - return err - } - - dir = filepath.Join(h.Repository, dir) - - pusher, err := orasmirror.NewStaticFileStreamPusher(bytes.NewReader(rawIndex), "index.html", strings.ToLower(dir), h.Params.NameOptions...) - if err != nil { - return err - } - - err = oras.Push(pusher, h.Params.RemoteOptions...) - if err != nil { - return err - } - - //nolint:gosec - s := md5.Sum([]byte(dir)) - tag := hex.EncodeToString(s[:]) - - err = h.addFileToRepositoryDatabase(context.Background(), &mirrordb.RepositoryFile{ - Tag: tag, - Name: dir, - ModifiedTime: int64(dirInfo.Mtime), - Mode: uint32(dirInfo.Mode), - Size: uint64(dirInfo.Size), - }) - if err != nil { - return err - } - } - - return nil +func (s *Storage) Close() { + close(s.pushChan) } diff --git a/internal/plugins/mirror/pkg/mirrorrepository/sync.go b/internal/plugins/mirror/pkg/mirrorrepository/sync.go index c5081c6..277891f 100644 --- a/internal/plugins/mirror/pkg/mirrorrepository/sync.go +++ b/internal/plugins/mirror/pkg/mirrorrepository/sync.go @@ -5,16 +5,23 @@ package mirrorrepository import ( "context" + "io" + "os" - "github.com/antoniomika/go-rsync/rsync" + apiv1 "go.ciq.dev/beskar/pkg/plugins/mirror/api/v1" + "go.ciq.dev/go-rsync/rsync" ) func (h *Handler) repositorySync(_ context.Context) (errFn error) { sync := h.updateSyncing(true) defer func() { + h.logger.Debug("sync artifact reset") + h.SyncArtifactReset() + h.logger.Debug("update syncing") + sync = h.updateSyncing(false) if errFn != nil { @@ -22,6 +29,8 @@ func (h *Handler) repositorySync(_ context.Context) (errFn error) { } else { sync.SyncError = "" } + + h.logger.Debug("update sync database") if err := h.updateSyncDatabase(dbCtx, sync); err != nil { if errFn == nil { errFn = err @@ -29,36 +38,136 @@ func (h *Handler) repositorySync(_ context.Context) (errFn error) { h.logger.Error("sync database update failed", "error", err.Error()) } } + + h.logger.Debug("defer work done") }() if err := h.updateSyncDatabase(dbCtx, sync); err != nil { return err } - addr, module, path, err := rsync.SplitURI(h.mirrorURLs[0].String()) - if err != nil { - return err + for i, config := range h.mirrorConfigs { + addr, module, path, err := rsync.SplitURL(config.URL) + if err != nil { + return err + } + + cOpts := []rsync.ClientOption{rsync.WithLogger(h.logger)} + if len(config.Exclusions) > 0 { + cOpts = append(cOpts, rsync.WithExclusionList(config.Exclusions)) + } + + if config.URL.User != nil { + password, _ := config.URL.User.Password() + cOpts = append(cOpts, rsync.WithClientAuth(config.URL.User.Username(), password)) + } + + s := NewStorage(h, config, uint64(i)) + + ppath := rsync.TrimPrepath(path) + client, err := rsync.SocketClient(s, addr, module, ppath, cOpts...) + if err != nil { + s.Close() + return err + } + + if config.HTTPURL != nil { + sp, err := client.GetSyncPlan() + if err != nil { + s.Close() + return err + } + + ps := NewPlanSyncer(h, config, uint64(i), h.Params.Sync.MaxWorkerCount, sp) + + if err := ps.Sync(); err != nil { + s.Close() + return err + } + } else { + if err := client.Sync(); err != nil { + s.Close() + return err + } + } + + s.Close() } - ppath := rsync.TrimPrepath(path) - client, err := rsync.SocketClient(h, addr, module, ppath, nil) + h.logger.Debug("generating index.html files") + err := h.GenerateIndexes() if err != nil { return err } - if err := client.Sync(); err != nil { - return err - } + h.logger.Debug("index.html files generated") + + return nil +} - fileList, err := h.List() +func copyTo(src io.Reader, dest string) error { + pkg, err := os.Create(dest) if err != nil { return err } - err = h.GenerateIndexes(fileList) + _, err = io.Copy(pkg, src) + closeErr := pkg.Close() if err != nil { return err + } else if closeErr != nil { + return closeErr } return nil } + +func (h *Handler) getSyncPlan() (*apiv1.RepositorySyncPlan, error) { + plan := &apiv1.RepositorySyncPlan{ + Add: []string{}, + Remove: []string{}, + } + + for i, config := range h.mirrorConfigs { + addr, module, path, err := rsync.SplitURL(config.URL) + if err != nil { + return nil, err + } + + cOpts := []rsync.ClientOption{rsync.WithLogger(h.logger)} + if len(config.Exclusions) > 0 { + cOpts = append(cOpts, rsync.WithExclusionList(config.Exclusions)) + } + + if config.URL.User != nil { + password, _ := config.URL.User.Password() + cOpts = append(cOpts, rsync.WithClientAuth(config.URL.User.Username(), password)) + } + + s := NewStorage(h, config, uint64(i)) + defer s.Close() + + ppath := rsync.TrimPrepath(path) + client, err := rsync.SocketClient(s, addr, module, ppath, cOpts...) + if err != nil { + s.Close() + return nil, err + } + + sp, err := client.GetSyncPlan() + if err != nil { + s.Close() + return nil, err + } + + for _, f := range sp.AddRemoteFiles { + plan.Add = append(plan.Add, string(sp.RemoteFiles[f].Path)) + } + + for _, f := range sp.DeleteLocalFiles { + plan.Remove = append(plan.Remove, string(sp.LocalFiles[f].Path)) + } + } + + return plan, nil +} diff --git a/internal/plugins/mirror/plugin.go b/internal/plugins/mirror/plugin.go index e4b7c7c..346e99c 100644 --- a/internal/plugins/mirror/plugin.go +++ b/internal/plugins/mirror/plugin.go @@ -62,7 +62,8 @@ func New(ctx context.Context, beskarMirrorConfig *config.BeskarMirrorConfig) (*P plugin := &Plugin{ ctx: ctx, handlerParams: &repository.HandlerParams{ - Dir: filepath.Join(beskarMirrorConfig.DataDir, "_repohandlers_"), + Dir: filepath.Join(beskarMirrorConfig.DataDir, "_repohandlers_"), + Sync: beskarMirrorConfig.Sync, }, } plugin.repositoryManager = repository.NewManager[*mirrorrepository.Handler]( diff --git a/internal/plugins/mirror/web.go b/internal/plugins/mirror/web.go index 9c6a070..1ac4a81 100644 --- a/internal/plugins/mirror/web.go +++ b/internal/plugins/mirror/web.go @@ -9,10 +9,10 @@ import ( "path/filepath" "strings" - "github.com/antoniomika/go-rsync/rsync" "go.ciq.dev/beskar/pkg/oras" "go.ciq.dev/beskar/pkg/orasmirror" apiv1 "go.ciq.dev/beskar/pkg/plugins/mirror/api/v1" + "go.ciq.dev/go-rsync/rsync" ) func (p *Plugin) resolveSymlinks(repository, fileName string) (*apiv1.RepositoryFile, error) { @@ -30,8 +30,8 @@ func (p *Plugin) resolveSymlinks(repository, fileName string) (*apiv1.Repository // Check if file has a symlink in its path and replace it var replacement string for _, symlink := range symlinks { - if strings.HasPrefix(intermediate, symlink.Name) { - replacement = strings.Replace(intermediate, symlink.Name, symlink.Link, 1) + if strings.HasPrefix(intermediate, symlink.Reference) { + replacement = strings.Replace(intermediate, symlink.Reference, symlink.Link, 1) break } } @@ -39,7 +39,7 @@ func (p *Plugin) resolveSymlinks(repository, fileName string) (*apiv1.Repository return nil, fmt.Errorf("not found") } - repositoryFile, err := p.repositoryManager.Get(p.ctx, repository).GetRepositoryFile(p.ctx, replacement) + repositoryFile, err := p.repositoryManager.Get(p.ctx, repository).GetRepositoryFileByReference(p.ctx, replacement) if err != nil && !strings.Contains(err.Error(), "no entry found") { return nil, err } else if err == nil { @@ -56,8 +56,13 @@ func (p *Plugin) WebHandler(w http.ResponseWriter, r *http.Request) { subPath := strings.TrimPrefix(r.URL.Path, "/artifacts/mirror/web/v1/") repositoryName := strings.SplitN(subPath, "/", 2)[0] + + fileName := "." + if len(strings.SplitN(subPath, "/", 2)) > 1 { + fileName = filepath.Clean(strings.SplitN(subPath, "/", 2)[1]) + } + repository := fmt.Sprintf("artifacts/mirror/%s", repositoryName) - fileName := filepath.Join("artifacts/mirror", subPath) if err := checkRepository(repository); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -81,15 +86,26 @@ func (p *Plugin) WebHandler(w http.ResponseWriter, r *http.Request) { // Redirect to file api to fetch blob signed url if rsync.FileMode(repositoryFile.Mode).IsREG() { - repo, file := filepath.Split(repositoryFile.Name) + repo, file := filepath.Split(repositoryFile.Reference) http.Redirect(w, r, fmt.Sprintf("/%s/file/%s", repo, file), http.StatusMovedPermanently) return } else if rsync.FileMode(repositoryFile.Mode).IsLNK() { - repositoryFile.Name = repositoryFile.Link + file, err := p.repositoryManager.Get(p.ctx, repository).GetRepositoryFileByReferenceRaw(p.ctx, repositoryFile.Link) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + if !rsync.FileMode(file.Mode).IsDIR() { + repo, file := filepath.Split(file.Reference) + http.Redirect(w, r, fmt.Sprintf("/%s/file/%s", repo, file), http.StatusMovedPermanently) + } + + repositoryFile.Reference = repositoryFile.Link } // Fetch index.html for directory listing - ref, err := orasmirror.FileReference("index.html", strings.ToLower(repositoryFile.Name), p.handlerParams.NameOptions...) + ref, err := orasmirror.FileReference("index.html", strings.ToLower(repositoryFile.Reference), p.handlerParams.NameOptions...) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/pkg/plugins/mirror/api/v1/api.go b/pkg/plugins/mirror/api/v1/api.go index 11bfb4d..743c1a2 100644 --- a/pkg/plugins/mirror/api/v1/api.go +++ b/pkg/plugins/mirror/api/v1/api.go @@ -24,12 +24,33 @@ type Page struct { Token string } +// Mirror content configurations. +type MirrorConfig struct { + URL string `json:"url"` + HTTPURL string `json:"http_url"` + Destination string `json:"destination"` + Exclusions []string `json:"exclusions"` +} + +// Web content configuration. +type WebConfig struct { + Prefix string `json:"prefix"` +} + // Repository properties/configuration. type RepositoryProperties struct { // Configure the repository as a mirror. Mirror *bool `json:"mirror,omitempty"` - // Mirror/Upstream URLs for mirroring. - MirrorURLs []string `json:"mirror_urls,omitempty"` + // Mirror content configurations. + MirrorConfigs []MirrorConfig `json:"mirror_configs,omitempty"` + // Web content configuration. + WebConfig *WebConfig `json:"web_config,omitempty"` +} + +// Repository synchronization plan. +type RepositorySyncPlan struct { + Add []string `json:"add"` + Remove []string `json:"remove"` } // Repository logs. @@ -43,20 +64,21 @@ type RepositoryLog struct { type RepositoryFile struct { Tag string `json:"tag"` Name string `json:"name"` + Reference string `json:"reference"` + Parent string `json:"parent"` Link string `json:"link"` ModifiedTime string `json:"modified_time"` Mode uint32 `json:"mode"` Size uint64 `json:"size"` + ConfigID uint64 `json:"config_id"` } // Mirror sync status. type SyncStatus struct { - Syncing bool `json:"syncing"` - StartTime string `json:"start_time"` - EndTime string `json:"end_time"` - TotalFiles int `json:"total_files"` - SyncedFiles int `json:"synced_files"` - SyncError string `json:"sync_error"` + Syncing bool `json:"syncing"` + StartTime string `json:"start_time"` + EndTime string `json:"end_time"` + SyncError string `json:"sync_error"` } // Mirror is used for managing mirror repositories. @@ -67,7 +89,7 @@ type SyncStatus struct { //kun:oas basePath=/artifacts/mirror/api/v1 //kun:oas docsPath=/doc/swagger.yaml //kun:oas tags=mirror -type Mirror interface { +type Mirror interface { //nolint:interfacebloat // Create a Mirror repository. //kun:op POST /repository //kun:success statusCode=200 @@ -93,11 +115,21 @@ type Mirror interface { //kun:success statusCode=200 SyncRepository(ctx context.Context, repository string, wait bool) (err error) + // Generate Mirror web pages . + //kun:op GET /repository/generate:web + //kun:success statusCode=200 + GenerateRepository(ctx context.Context, repository string) (err error) + // Get Mirror repository sync status. //kun:op GET /repository/sync:status //kun:success statusCode=200 GetRepositorySyncStatus(ctx context.Context, repository string) (syncStatus *SyncStatus, err error) + // Get Mirror repository sync plan. + //kun:op GET /repository/sync:plan + //kun:success statusCode=200 + GetRepositorySyncPlan(ctx context.Context, repository string) (syncPlan *RepositorySyncPlan, err error) + // List Mirror repository logs. //kun:op GET /repository/logs //kun:success statusCode=200 @@ -112,4 +144,14 @@ type Mirror interface { //kun:op GET /repository/file //kun:success statusCode=200 GetRepositoryFile(ctx context.Context, repository, file string) (repositoryFile *RepositoryFile, err error) + + // Get file count for a Mirror repository. + //kun:op GET /repository/file:count + //kun:success statusCode=200 + GetRepositoryFileCount(ctx context.Context, repository string) (count int, err error) + + // Delete file for a Mirror repository. + //kun:op DELETE /repository/file + //kun:success statusCode=200 + DeleteRepositoryFile(ctx context.Context, repository, file string) (err error) } diff --git a/pkg/plugins/mirror/api/v1/endpoint.go b/pkg/plugins/mirror/api/v1/endpoint.go index 9c487a8..0498785 100644 --- a/pkg/plugins/mirror/api/v1/endpoint.go +++ b/pkg/plugins/mirror/api/v1/endpoint.go @@ -85,6 +85,78 @@ func MakeEndpointOfDeleteRepository(s Mirror) endpoint.Endpoint { } } +type DeleteRepositoryFileRequest struct { + Repository string `json:"repository"` + File string `json:"file"` +} + +// ValidateDeleteRepositoryFileRequest creates a validator for DeleteRepositoryFileRequest. +func ValidateDeleteRepositoryFileRequest(newSchema func(*DeleteRepositoryFileRequest) validating.Schema) httpoption.Validator { + return httpoption.FuncValidator(func(value interface{}) error { + req := value.(*DeleteRepositoryFileRequest) + return httpoption.Validate(newSchema(req)) + }) +} + +type DeleteRepositoryFileResponse struct { + Err error `json:"-"` +} + +func (r *DeleteRepositoryFileResponse) Body() interface{} { return r } + +// Failed implements endpoint.Failer. +func (r *DeleteRepositoryFileResponse) Failed() error { return r.Err } + +// MakeEndpointOfDeleteRepositoryFile creates the endpoint for s.DeleteRepositoryFile. +func MakeEndpointOfDeleteRepositoryFile(s Mirror) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(*DeleteRepositoryFileRequest) + err := s.DeleteRepositoryFile( + ctx, + req.Repository, + req.File, + ) + return &DeleteRepositoryFileResponse{ + Err: err, + }, nil + } +} + +type GenerateRepositoryRequest struct { + Repository string `json:"repository"` +} + +// ValidateGenerateRepositoryRequest creates a validator for GenerateRepositoryRequest. +func ValidateGenerateRepositoryRequest(newSchema func(*GenerateRepositoryRequest) validating.Schema) httpoption.Validator { + return httpoption.FuncValidator(func(value interface{}) error { + req := value.(*GenerateRepositoryRequest) + return httpoption.Validate(newSchema(req)) + }) +} + +type GenerateRepositoryResponse struct { + Err error `json:"-"` +} + +func (r *GenerateRepositoryResponse) Body() interface{} { return r } + +// Failed implements endpoint.Failer. +func (r *GenerateRepositoryResponse) Failed() error { return r.Err } + +// MakeEndpointOfGenerateRepository creates the endpoint for s.GenerateRepository. +func MakeEndpointOfGenerateRepository(s Mirror) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(*GenerateRepositoryRequest) + err := s.GenerateRepository( + ctx, + req.Repository, + ) + return &GenerateRepositoryResponse{ + Err: err, + }, nil + } +} + type GetRepositoryRequest struct { Repository string `json:"repository"` } @@ -161,6 +233,80 @@ func MakeEndpointOfGetRepositoryFile(s Mirror) endpoint.Endpoint { } } +type GetRepositoryFileCountRequest struct { + Repository string `json:"repository"` +} + +// ValidateGetRepositoryFileCountRequest creates a validator for GetRepositoryFileCountRequest. +func ValidateGetRepositoryFileCountRequest(newSchema func(*GetRepositoryFileCountRequest) validating.Schema) httpoption.Validator { + return httpoption.FuncValidator(func(value interface{}) error { + req := value.(*GetRepositoryFileCountRequest) + return httpoption.Validate(newSchema(req)) + }) +} + +type GetRepositoryFileCountResponse struct { + Count int `json:"count"` + Err error `json:"-"` +} + +func (r *GetRepositoryFileCountResponse) Body() interface{} { return r } + +// Failed implements endpoint.Failer. +func (r *GetRepositoryFileCountResponse) Failed() error { return r.Err } + +// MakeEndpointOfGetRepositoryFileCount creates the endpoint for s.GetRepositoryFileCount. +func MakeEndpointOfGetRepositoryFileCount(s Mirror) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(*GetRepositoryFileCountRequest) + count, err := s.GetRepositoryFileCount( + ctx, + req.Repository, + ) + return &GetRepositoryFileCountResponse{ + Count: count, + Err: err, + }, nil + } +} + +type GetRepositorySyncPlanRequest struct { + Repository string `json:"repository"` +} + +// ValidateGetRepositorySyncPlanRequest creates a validator for GetRepositorySyncPlanRequest. +func ValidateGetRepositorySyncPlanRequest(newSchema func(*GetRepositorySyncPlanRequest) validating.Schema) httpoption.Validator { + return httpoption.FuncValidator(func(value interface{}) error { + req := value.(*GetRepositorySyncPlanRequest) + return httpoption.Validate(newSchema(req)) + }) +} + +type GetRepositorySyncPlanResponse struct { + SyncPlan *RepositorySyncPlan `json:"sync_plan"` + Err error `json:"-"` +} + +func (r *GetRepositorySyncPlanResponse) Body() interface{} { return r } + +// Failed implements endpoint.Failer. +func (r *GetRepositorySyncPlanResponse) Failed() error { return r.Err } + +// MakeEndpointOfGetRepositorySyncPlan creates the endpoint for s.GetRepositorySyncPlan. +func MakeEndpointOfGetRepositorySyncPlan(s Mirror) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(*GetRepositorySyncPlanRequest) + syncPlan, err := s.GetRepositorySyncPlan( + ctx, + req.Repository, + ) + return &GetRepositorySyncPlanResponse{ + SyncPlan: syncPlan, + Err: err, + }, nil + } +} + type GetRepositorySyncStatusRequest struct { Repository string `json:"repository"` } diff --git a/pkg/plugins/mirror/api/v1/http.go b/pkg/plugins/mirror/api/v1/http.go index a23d3a1..feefaf9 100644 --- a/pkg/plugins/mirror/api/v1/http.go +++ b/pkg/plugins/mirror/api/v1/http.go @@ -52,6 +52,34 @@ func NewHTTPRouter(svc Mirror, codecs httpcodec.Codecs, opts ...httpoption.Optio ), ) + codec = codecs.EncodeDecoder("DeleteRepositoryFile") + validator = options.RequestValidator("DeleteRepositoryFile") + r.Method( + "DELETE", "/repository/file", + kithttp.NewServer( + MakeEndpointOfDeleteRepositoryFile(svc), + decodeDeleteRepositoryFileRequest(codec, validator), + httpcodec.MakeResponseEncoder(codec, 200), + append(kitOptions, + kithttp.ServerErrorEncoder(httpcodec.MakeErrorEncoder(codec)), + )..., + ), + ) + + codec = codecs.EncodeDecoder("GenerateRepository") + validator = options.RequestValidator("GenerateRepository") + r.Method( + "GET", "/repository/generate:web", + kithttp.NewServer( + MakeEndpointOfGenerateRepository(svc), + decodeGenerateRepositoryRequest(codec, validator), + httpcodec.MakeResponseEncoder(codec, 200), + append(kitOptions, + kithttp.ServerErrorEncoder(httpcodec.MakeErrorEncoder(codec)), + )..., + ), + ) + codec = codecs.EncodeDecoder("GetRepository") validator = options.RequestValidator("GetRepository") r.Method( @@ -80,6 +108,34 @@ func NewHTTPRouter(svc Mirror, codecs httpcodec.Codecs, opts ...httpoption.Optio ), ) + codec = codecs.EncodeDecoder("GetRepositoryFileCount") + validator = options.RequestValidator("GetRepositoryFileCount") + r.Method( + "GET", "/repository/file:count", + kithttp.NewServer( + MakeEndpointOfGetRepositoryFileCount(svc), + decodeGetRepositoryFileCountRequest(codec, validator), + httpcodec.MakeResponseEncoder(codec, 200), + append(kitOptions, + kithttp.ServerErrorEncoder(httpcodec.MakeErrorEncoder(codec)), + )..., + ), + ) + + codec = codecs.EncodeDecoder("GetRepositorySyncPlan") + validator = options.RequestValidator("GetRepositorySyncPlan") + r.Method( + "GET", "/repository/sync:plan", + kithttp.NewServer( + MakeEndpointOfGetRepositorySyncPlan(svc), + decodeGetRepositorySyncPlanRequest(codec, validator), + httpcodec.MakeResponseEncoder(codec, 200), + append(kitOptions, + kithttp.ServerErrorEncoder(httpcodec.MakeErrorEncoder(codec)), + )..., + ), + ) + codec = codecs.EncodeDecoder("GetRepositorySyncStatus") validator = options.RequestValidator("GetRepositorySyncStatus") r.Method( @@ -185,6 +241,38 @@ func decodeDeleteRepositoryRequest(codec httpcodec.Codec, validator httpoption.V } } +func decodeDeleteRepositoryFileRequest(codec httpcodec.Codec, validator httpoption.Validator) kithttp.DecodeRequestFunc { + return func(_ context.Context, r *http.Request) (interface{}, error) { + var _req DeleteRepositoryFileRequest + + if err := codec.DecodeRequestBody(r, &_req); err != nil { + return nil, err + } + + if err := validator.Validate(&_req); err != nil { + return nil, err + } + + return &_req, nil + } +} + +func decodeGenerateRepositoryRequest(codec httpcodec.Codec, validator httpoption.Validator) kithttp.DecodeRequestFunc { + return func(_ context.Context, r *http.Request) (interface{}, error) { + var _req GenerateRepositoryRequest + + if err := codec.DecodeRequestBody(r, &_req); err != nil { + return nil, err + } + + if err := validator.Validate(&_req); err != nil { + return nil, err + } + + return &_req, nil + } +} + func decodeGetRepositoryRequest(codec httpcodec.Codec, validator httpoption.Validator) kithttp.DecodeRequestFunc { return func(_ context.Context, r *http.Request) (interface{}, error) { var _req GetRepositoryRequest @@ -217,6 +305,38 @@ func decodeGetRepositoryFileRequest(codec httpcodec.Codec, validator httpoption. } } +func decodeGetRepositoryFileCountRequest(codec httpcodec.Codec, validator httpoption.Validator) kithttp.DecodeRequestFunc { + return func(_ context.Context, r *http.Request) (interface{}, error) { + var _req GetRepositoryFileCountRequest + + if err := codec.DecodeRequestBody(r, &_req); err != nil { + return nil, err + } + + if err := validator.Validate(&_req); err != nil { + return nil, err + } + + return &_req, nil + } +} + +func decodeGetRepositorySyncPlanRequest(codec httpcodec.Codec, validator httpoption.Validator) kithttp.DecodeRequestFunc { + return func(_ context.Context, r *http.Request) (interface{}, error) { + var _req GetRepositorySyncPlanRequest + + if err := codec.DecodeRequestBody(r, &_req); err != nil { + return nil, err + } + + if err := validator.Validate(&_req); err != nil { + return nil, err + } + + return &_req, nil + } +} + func decodeGetRepositorySyncStatusRequest(codec httpcodec.Codec, validator httpoption.Validator) kithttp.DecodeRequestFunc { return func(_ context.Context, r *http.Request) (interface{}, error) { var _req GetRepositorySyncStatusRequest diff --git a/pkg/plugins/mirror/api/v1/http_client.go b/pkg/plugins/mirror/api/v1/http_client.go index 2671aa2..083213b 100644 --- a/pkg/plugins/mirror/api/v1/http_client.go +++ b/pkg/plugins/mirror/api/v1/http_client.go @@ -132,6 +132,102 @@ func (c *HTTPClient) DeleteRepository(ctx context.Context, repository string, de return nil } +func (c *HTTPClient) DeleteRepositoryFile(ctx context.Context, repository string, file string) (err error) { + codec := c.codecs.EncodeDecoder("DeleteRepositoryFile") + + path := "/repository/file" + u := &url.URL{ + Scheme: c.scheme, + Host: c.host, + Path: c.pathPrefix + path, + } + + reqBody := struct { + Repository string `json:"repository"` + File string `json:"file"` + }{ + Repository: repository, + File: file, + } + reqBodyReader, headers, err := codec.EncodeRequestBody(&reqBody) + if err != nil { + return err + } + + _req, err := http.NewRequestWithContext(ctx, "DELETE", u.String(), reqBodyReader) + if err != nil { + return err + } + + for k, v := range headers { + _req.Header.Set(k, v) + } + + _resp, err := c.httpClient.Do(_req) + if err != nil { + return err + } + defer _resp.Body.Close() + + if _resp.StatusCode < http.StatusOK || _resp.StatusCode > http.StatusNoContent { + var respErr error + err := codec.DecodeFailureResponse(_resp.Body, &respErr) + if err == nil { + err = respErr + } + return err + } + + return nil +} + +func (c *HTTPClient) GenerateRepository(ctx context.Context, repository string) (err error) { + codec := c.codecs.EncodeDecoder("GenerateRepository") + + path := "/repository/generate:web" + u := &url.URL{ + Scheme: c.scheme, + Host: c.host, + Path: c.pathPrefix + path, + } + + reqBody := struct { + Repository string `json:"repository"` + }{ + Repository: repository, + } + reqBodyReader, headers, err := codec.EncodeRequestBody(&reqBody) + if err != nil { + return err + } + + _req, err := http.NewRequestWithContext(ctx, "GET", u.String(), reqBodyReader) + if err != nil { + return err + } + + for k, v := range headers { + _req.Header.Set(k, v) + } + + _resp, err := c.httpClient.Do(_req) + if err != nil { + return err + } + defer _resp.Body.Close() + + if _resp.StatusCode < http.StatusOK || _resp.StatusCode > http.StatusNoContent { + var respErr error + err := codec.DecodeFailureResponse(_resp.Body, &respErr) + if err == nil { + err = respErr + } + return err + } + + return nil +} + func (c *HTTPClient) GetRepository(ctx context.Context, repository string) (properties *RepositoryProperties, err error) { codec := c.codecs.EncodeDecoder("GetRepository") @@ -238,6 +334,110 @@ func (c *HTTPClient) GetRepositoryFile(ctx context.Context, repository string, f return respBody.RepositoryFile, nil } +func (c *HTTPClient) GetRepositoryFileCount(ctx context.Context, repository string) (count int, err error) { + codec := c.codecs.EncodeDecoder("GetRepositoryFileCount") + + path := "/repository/file:count" + u := &url.URL{ + Scheme: c.scheme, + Host: c.host, + Path: c.pathPrefix + path, + } + + reqBody := struct { + Repository string `json:"repository"` + }{ + Repository: repository, + } + reqBodyReader, headers, err := codec.EncodeRequestBody(&reqBody) + if err != nil { + return 0, err + } + + _req, err := http.NewRequestWithContext(ctx, "GET", u.String(), reqBodyReader) + if err != nil { + return 0, err + } + + for k, v := range headers { + _req.Header.Set(k, v) + } + + _resp, err := c.httpClient.Do(_req) + if err != nil { + return 0, err + } + defer _resp.Body.Close() + + if _resp.StatusCode < http.StatusOK || _resp.StatusCode > http.StatusNoContent { + var respErr error + err := codec.DecodeFailureResponse(_resp.Body, &respErr) + if err == nil { + err = respErr + } + return 0, err + } + + respBody := &GetRepositoryFileCountResponse{} + err = codec.DecodeSuccessResponse(_resp.Body, respBody.Body()) + if err != nil { + return 0, err + } + return respBody.Count, nil +} + +func (c *HTTPClient) GetRepositorySyncPlan(ctx context.Context, repository string) (syncPlan *RepositorySyncPlan, err error) { + codec := c.codecs.EncodeDecoder("GetRepositorySyncPlan") + + path := "/repository/sync:plan" + u := &url.URL{ + Scheme: c.scheme, + Host: c.host, + Path: c.pathPrefix + path, + } + + reqBody := struct { + Repository string `json:"repository"` + }{ + Repository: repository, + } + reqBodyReader, headers, err := codec.EncodeRequestBody(&reqBody) + if err != nil { + return nil, err + } + + _req, err := http.NewRequestWithContext(ctx, "GET", u.String(), reqBodyReader) + if err != nil { + return nil, err + } + + for k, v := range headers { + _req.Header.Set(k, v) + } + + _resp, err := c.httpClient.Do(_req) + if err != nil { + return nil, err + } + defer _resp.Body.Close() + + if _resp.StatusCode < http.StatusOK || _resp.StatusCode > http.StatusNoContent { + var respErr error + err := codec.DecodeFailureResponse(_resp.Body, &respErr) + if err == nil { + err = respErr + } + return nil, err + } + + respBody := &GetRepositorySyncPlanResponse{} + err = codec.DecodeSuccessResponse(_resp.Body, respBody.Body()) + if err != nil { + return nil, err + } + return respBody.SyncPlan, nil +} + func (c *HTTPClient) GetRepositorySyncStatus(ctx context.Context, repository string) (syncStatus *SyncStatus, err error) { codec := c.codecs.EncodeDecoder("GetRepositorySyncStatus") diff --git a/pkg/plugins/mirror/api/v1/oas2.go b/pkg/plugins/mirror/api/v1/oas2.go index ad83476..15012b5 100644 --- a/pkg/plugins/mirror/api/v1/oas2.go +++ b/pkg/plugins/mirror/api/v1/oas2.go @@ -75,6 +75,17 @@ paths: $ref: "#/definitions/UpdateRepositoryRequestBody" %s /repository/file: + delete: + description: "Delete file for a Mirror repository." + operationId: "DeleteRepositoryFile" + tags: + - mirror + parameters: + - name: body + in: body + schema: + $ref: "#/definitions/DeleteRepositoryFileRequestBody" + %s get: description: "Get file for a Mirror repository." operationId: "GetRepositoryFile" @@ -86,6 +97,42 @@ paths: schema: $ref: "#/definitions/GetRepositoryFileRequestBody" %s + /repository/generate:web: + get: + description: "Generate Mirror web pages ." + operationId: "GenerateRepository" + tags: + - mirror + parameters: + - name: body + in: body + schema: + $ref: "#/definitions/GenerateRepositoryRequestBody" + %s + /repository/file:count: + get: + description: "Get file count for a Mirror repository." + operationId: "GetRepositoryFileCount" + tags: + - mirror + parameters: + - name: body + in: body + schema: + $ref: "#/definitions/GetRepositoryFileCountRequestBody" + %s + /repository/sync:plan: + get: + description: "Get Mirror repository sync plan." + operationId: "GetRepositorySyncPlan" + tags: + - mirror + parameters: + - name: body + in: body + schema: + $ref: "#/definitions/GetRepositorySyncPlanRequestBody" + %s /repository/sync:status: get: description: "Get Mirror repository sync status." @@ -143,7 +190,11 @@ func getResponses(schema oas2.Schema) []oas2.OASResponses { oas2.GetOASResponses(schema, "DeleteRepository", 200, &DeleteRepositoryResponse{}), oas2.GetOASResponses(schema, "GetRepository", 200, &GetRepositoryResponse{}), oas2.GetOASResponses(schema, "UpdateRepository", 200, &UpdateRepositoryResponse{}), + oas2.GetOASResponses(schema, "DeleteRepositoryFile", 200, &DeleteRepositoryFileResponse{}), oas2.GetOASResponses(schema, "GetRepositoryFile", 200, &GetRepositoryFileResponse{}), + oas2.GetOASResponses(schema, "GenerateRepository", 200, &GenerateRepositoryResponse{}), + oas2.GetOASResponses(schema, "GetRepositoryFileCount", 200, &GetRepositoryFileCountResponse{}), + oas2.GetOASResponses(schema, "GetRepositorySyncPlan", 200, &GetRepositorySyncPlanResponse{}), oas2.GetOASResponses(schema, "GetRepositorySyncStatus", 200, &GetRepositorySyncStatusResponse{}), oas2.GetOASResponses(schema, "ListRepositoryFiles", 200, &ListRepositoryFilesResponse{}), oas2.GetOASResponses(schema, "ListRepositoryLogs", 200, &ListRepositoryLogsResponse{}), @@ -166,6 +217,17 @@ func getDefinitions(schema oas2.Schema) map[string]oas2.Definition { }{})) oas2.AddResponseDefinitions(defs, schema, "DeleteRepository", 200, (&DeleteRepositoryResponse{}).Body()) + oas2.AddDefinition(defs, "DeleteRepositoryFileRequestBody", reflect.ValueOf(&struct { + Repository string `json:"repository"` + File string `json:"file"` + }{})) + oas2.AddResponseDefinitions(defs, schema, "DeleteRepositoryFile", 200, (&DeleteRepositoryFileResponse{}).Body()) + + oas2.AddDefinition(defs, "GenerateRepositoryRequestBody", reflect.ValueOf(&struct { + Repository string `json:"repository"` + }{})) + oas2.AddResponseDefinitions(defs, schema, "GenerateRepository", 200, (&GenerateRepositoryResponse{}).Body()) + oas2.AddDefinition(defs, "GetRepositoryRequestBody", reflect.ValueOf(&struct { Repository string `json:"repository"` }{})) @@ -177,6 +239,16 @@ func getDefinitions(schema oas2.Schema) map[string]oas2.Definition { }{})) oas2.AddResponseDefinitions(defs, schema, "GetRepositoryFile", 200, (&GetRepositoryFileResponse{}).Body()) + oas2.AddDefinition(defs, "GetRepositoryFileCountRequestBody", reflect.ValueOf(&struct { + Repository string `json:"repository"` + }{})) + oas2.AddResponseDefinitions(defs, schema, "GetRepositoryFileCount", 200, (&GetRepositoryFileCountResponse{}).Body()) + + oas2.AddDefinition(defs, "GetRepositorySyncPlanRequestBody", reflect.ValueOf(&struct { + Repository string `json:"repository"` + }{})) + oas2.AddResponseDefinitions(defs, schema, "GetRepositorySyncPlan", 200, (&GetRepositorySyncPlanResponse{}).Body()) + oas2.AddDefinition(defs, "GetRepositorySyncStatusRequestBody", reflect.ValueOf(&struct { Repository string `json:"repository"` }{}))