diff --git a/internal/plugins/mirror/api.go b/internal/plugins/mirror/api.go index 9ec3c8e..aaf9d4b 100644 --- a/internal/plugins/mirror/api.go +++ b/internal/plugins/mirror/api.go @@ -53,6 +53,13 @@ func (p *Plugin) SyncRepository(ctx context.Context, repository string, wait boo return p.repositoryManager.Get(ctx, repository).SyncRepository(ctx, wait) } +func (p *Plugin) SyncRepositoryWithConfig(ctx context.Context, repository string, mirrorConfigs []apiv1.MirrorConfig, webConfig *apiv1.WebConfig, wait bool) (err error) { + if err := checkRepository(repository); err != nil { + return err + } + return p.repositoryManager.Get(ctx, repository).SyncRepositoryWithConfig(ctx, mirrorConfigs, webConfig, wait) +} + func (p *Plugin) GenerateRepository(ctx context.Context, repository string) (err error) { if err := checkRepository(repository); err != nil { return err diff --git a/internal/plugins/mirror/pkg/mirrorrepository/api.go b/internal/plugins/mirror/pkg/mirrorrepository/api.go index c2d61ad..459e4cf 100644 --- a/internal/plugins/mirror/pkg/mirrorrepository/api.go +++ b/internal/plugins/mirror/pkg/mirrorrepository/api.go @@ -282,6 +282,53 @@ func (h *Handler) SyncRepository(_ context.Context, wait bool) (err error) { return nil } +func (h *Handler) SyncRepositoryWithConfig(_ context.Context, mirrorConfigs []apiv1.MirrorConfig, webConfig *apiv1.WebConfig, wait bool) (err error) { + if !h.Started() { + return werror.Wrap(gcode.ErrUnavailable, err) + } else if !h.getMirror() { + return werror.Wrap(gcode.ErrFailedPrecondition, errors.New("repository not setup as a mirror")) + } + + // Set mirror configs if supplied. + if mirrorConfigs != nil { + if err := h.setMirrorConfigs(mirrorConfigs); err != nil { + return werror.Wrap(gcode.ErrInternal, err) + } + } + + // Set web config if supplied. + if webConfig != nil { + if err := h.setWebConfig(webConfig); err != nil { + return werror.Wrap(gcode.ErrInternal, err) + } + } + + if h.delete.Load() { + return werror.Wrap(gcode.ErrAlreadyExists, fmt.Errorf("repository %s is being deleted", h.Repository)) + } else if h.syncing.Swap(true) { + return werror.Wrap(gcode.ErrAlreadyExists, errors.New("a repository sync is already running")) + } + + var waitErrCh chan error + + if wait { + waitErrCh = make(chan error, 1) + } + + select { + case h.syncCh <- waitErrCh: + if waitErrCh != nil { + if err := <-waitErrCh; err != nil { + return werror.Wrap(gcode.ErrInternal, fmt.Errorf("synchronization failed: %w", err)) + } + } + default: + return werror.Wrap(gcode.ErrUnavailable, errors.New("something goes wrong")) + } + + return nil +} + func (h *Handler) GenerateRepository(_ context.Context) (err error) { if !h.Started() { return werror.Wrap(gcode.ErrUnavailable, err) @@ -535,3 +582,17 @@ func toRepositoryFileAPI(file *mirrordb.RepositoryFile) *apiv1.RepositoryFile { ConfigID: file.ConfigID, } } + +func toRepositoryFileDB(file *apiv1.RepositoryFile) *mirrordb.RepositoryFile { + return &mirrordb.RepositoryFile{ + Tag: file.Tag, + Name: file.Name, + Reference: file.Reference, + Parent: file.Parent, + Link: file.Link, + ModifiedTime: utils.StringToTime(file.ModifiedTime), + Mode: file.Mode, + Size: file.Size, + ConfigID: file.ConfigID, + } +} diff --git a/internal/plugins/mirror/pkg/mirrorrepository/mirrorsync.go b/internal/plugins/mirror/pkg/mirrorrepository/mirrorsync.go new file mode 100644 index 0000000..43d9636 --- /dev/null +++ b/internal/plugins/mirror/pkg/mirrorrepository/mirrorsync.go @@ -0,0 +1,382 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024, CIQ, Inc. All rights reserved +// SPDX-License-Identifier: Apache-2.0 + +package mirrorrepository + +import ( + "bytes" + "context" + "crypto/md5" //nolint:gosec + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "os" + "path" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/cenkalti/backoff" + "go.ciq.dev/beskar/internal/plugins/mirror/pkg/mirrordb" + "go.ciq.dev/beskar/pkg/oras" + "go.ciq.dev/beskar/pkg/orasmirror" + apiv1 "go.ciq.dev/beskar/pkg/plugins/mirror/api/v1" + "go.ciq.dev/go-rsync/rsync" +) + +type MirrorSyncerPlan struct { + AddRemoteFiles []*mirrordb.RepositoryFile + DeleteLocalFiles []*mirrordb.RepositoryFile +} + +type MirrorSyncer struct { + h *Handler + config mirrorConfig + configID uint64 + parallelism int + upstreamRepository string +} + +func NewMirrorSyncer(h *Handler, config mirrorConfig, configID uint64, parallelism int) (*MirrorSyncer, error) { + repository := path.Base(config.URL.Path) + + return &MirrorSyncer{ + h: h, + config: config, + configID: configID, + parallelism: parallelism, + upstreamRepository: path.Join("artifacts/mirror", repository), + }, nil +} + +func (s *MirrorSyncer) Plan() (*MirrorSyncerPlan, error) { + // Fetch remote files + remoteAPIFiles, err := s.ListRepositoryFiles() + if err != nil { + s.h.logger.Error("Failed to list remote files", "error", err) + return nil, err + } + + // Convert to db file structure + remoteFiles := make([]*mirrordb.RepositoryFile, 0, len(remoteAPIFiles)) + for _, f := range remoteAPIFiles { + remoteFiles = append(remoteFiles, toRepositoryFileDB(f)) + } + + // Fetch local files + localFiles, err := s.h.listRepositoryFilesByConfigID(context.Background(), s.configID) + if err != nil { + s.h.logger.Error("Failed to list local files", "error", err) + return nil, err + } + + add, del := diff(localFiles, remoteFiles) + + return &MirrorSyncerPlan{ + AddRemoteFiles: add, + DeleteLocalFiles: del, + }, nil +} + +func diff(local, remote []*mirrordb.RepositoryFile) (add, del []*mirrordb.RepositoryFile) { + mLocal := make(map[string]*mirrordb.RepositoryFile, len(local)) + for _, l := range local { + mLocal[l.Name] = l + } + + // Find items in remote that are not in local + for _, r := range remote { + if _, found := mLocal[r.Name]; !found { + add = append(add, r) + } + } + + mRemote := make(map[string]*mirrordb.RepositoryFile, len(remote)) + for _, r := range remote { + mRemote[r.Name] = r + } + + // Find items in local that are not in remote + for _, l := range local { + if _, found := mRemote[l.Name]; !found { + del = append(del, l) + } + } + + return add, del +} + +func (s *MirrorSyncer) filePush(remoteFile *mirrordb.RepositoryFile) error { + fileReference := filepath.Clean(s.h.generateFileReference(strings.ToLower(remoteFile.Name))) + + // Generate GET URL + u := &url.URL{ + Scheme: s.config.URL.Scheme, + Host: s.config.URL.Host, + User: s.config.URL.User, + Path: path.Join(s.config.URL.Path, remoteFile.Name), + } + + // Fetch file from remote + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, u.String(), nil) + if err != nil { + s.h.logger.Error("Failed to create request", "file", remoteFile.Name, "error", err) + return err + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + s.h.logger.Error("Failed to fetch file", "file", remoteFile.Name, "error", err) + return err + } + + f, err := os.CreateTemp(s.h.downloadDir(), "") + if err != nil { + s.h.logger.Error("Failed to create temp file", "file", remoteFile.Name, "error", err) + resp.Body.Close() + return err + } + defer os.Remove(f.Name()) + + s.h.logger.Debug("Downloading", "file", remoteFile.Name, "temp", f.Name()) + _, err = io.Copy(f, resp.Body) + if err != nil { + s.h.logger.Error("Failed to download file", "file", remoteFile.Name, "error", err) + resp.Body.Close() + return err + } + resp.Body.Close() + + // Commit content to storage + if err := f.Sync(); err != nil { + return err + } + + cb := backoff.WithMaxRetries( + backoff.NewConstantBackOff(5*time.Second), + 3, + ) + err = backoff.Retry(func() error { + // Seek to start of file + if _, err := f.Seek(0, 0); err != nil { + return err + } + + // Push file to storage + repoPath := filepath.Join(s.h.Repository, filepath.Dir(remoteFile.Name)) + s.h.logger.Debug("Pushing", "file", remoteFile.Name, "repo", repoPath) + pusher, err := orasmirror.NewStaticFileStreamPusher(f, strings.ToLower(filepath.Base(remoteFile.Name)), strings.ToLower(repoPath), s.h.Params.NameOptions...) + if err != nil { + s.h.logger.Error("Failed to create pusher", "file", remoteFile.Name, "error", err) + return err + } + + err = oras.Push(pusher, s.h.Params.RemoteOptions...) + if err != nil { + s.h.logger.Error("Failed to push file", "file", remoteFile.Name, "error", err) + return err + } + + return nil + }, cb) + if err != nil { + return err + } + + // Add entry to DB + //nolint:gosec + sum := md5.Sum([]byte(fileReference)) + tag := hex.EncodeToString(sum[:]) + + err = s.h.addFileToRepositoryDatabase(context.Background(), &mirrordb.RepositoryFile{ + Tag: tag, + Name: remoteFile.Name, + Reference: fileReference, + Parent: filepath.Dir(remoteFile.Name), + Link: "", + ModifiedTime: remoteFile.ModifiedTime, + Mode: remoteFile.Mode, + Size: remoteFile.Size, + ConfigID: s.configID, + }) + if err != nil { + s.h.logger.Error("Failed to add file to repository database", "file", remoteFile.Name, "error", err) + return err + } + + return nil +} + +func (s *MirrorSyncer) fileWorker(c chan *mirrordb.RepositoryFile, wg *sync.WaitGroup) { + wg.Add(1) + defer wg.Done() + + for remoteFile := range c { + if err := s.filePush(remoteFile); err != nil { + s.h.logger.Error("Failed to push file", "file", remoteFile.Name, "error", err) + } + } +} + +func (s *MirrorSyncer) Sync() error { + // Generate plan + plan, err := s.Plan() + if err != nil { + s.h.logger.Error("Failed to generate sync plan", "error", err) + return err + } + + // Create push channel and wait group + pushChan := make(chan *mirrordb.RepositoryFile) + wg := new(sync.WaitGroup) + + // Ensure download directory exists + if err := os.MkdirAll(s.h.downloadDir(), 0o755); err != nil { + return err + } + + // Start worker pool + for i := 0; i < s.parallelism; i++ { + go s.fileWorker(pushChan, wg) + } + + // Fetch/Update remote files + for _, remoteFile := range plan.AddRemoteFiles { + s.h.logger.Debug("Processing", "file", remoteFile.Name) + + fileReference := filepath.Clean(s.h.generateFileReference(strings.ToLower(remoteFile.Name))) + + if rsync.FileMode(remoteFile.Mode).IsREG() { + // Process file in worker pool + pushChan <- remoteFile + } else if rsync.FileMode(remoteFile.Mode).IsDIR() { + // Add entry to DB + //nolint:gosec + sum := md5.Sum([]byte(fileReference)) + tag := hex.EncodeToString(sum[:]) + + err := s.h.addFileToRepositoryDatabase(context.Background(), &mirrordb.RepositoryFile{ + Tag: tag, + Name: remoteFile.Name, + Reference: fileReference, + Parent: filepath.Dir(remoteFile.Name), + Link: "", + ModifiedTime: remoteFile.ModifiedTime, + Mode: remoteFile.Mode, + Size: remoteFile.Size, + ConfigID: s.configID, + }) + if err != nil { + return err + } + } else if rsync.FileMode(remoteFile.Mode).IsLNK() { + s.h.logger.Debug("Processing Link", "content", remoteFile.Link) + + // Split first 3 path elements from link and take the rest as the reference. + split := strings.SplitN(remoteFile.Link, "/", 4) + if len(split) < 4 { + s.h.logger.Error("Failed to split link", "link", remoteFile.Link) + return fmt.Errorf("failed to split link: %q", remoteFile.Link) + } + + link := s.h.generateFileReference(strings.ToLower(split[3])) + + // Add entry to DB + //nolint:gosec + sum := md5.Sum([]byte(remoteFile.Name)) + tag := hex.EncodeToString(sum[:]) + + err := s.h.addFileToRepositoryDatabase(context.Background(), &mirrordb.RepositoryFile{ + Tag: tag, + Name: remoteFile.Name, + Reference: remoteFile.Name, + Parent: filepath.Dir(remoteFile.Name), + Link: link, + ModifiedTime: remoteFile.ModifiedTime, + Mode: remoteFile.Mode, + Size: remoteFile.Size, + ConfigID: s.configID, + }) + if err != nil { + return err + } + } + } + + close(pushChan) + + // Wait for all files to be processed + wg.Wait() + + // Remove local files + for _, localFile := range plan.DeleteLocalFiles { + s.h.logger.Debug("Removing", "file", localFile.Name) + + fileReference := filepath.Clean(s.h.generateFileReference(strings.ToLower(localFile.Name))) + + // Remove entry from DB + err := s.h.removeFileFromRepositoryDatabase(context.Background(), fileReference) + if err != nil { + s.h.logger.Error("Failed to remove file from repository database", "file", localFile.Name, "error", err) + return err + } + } + + return nil +} + +// Custom list method since generated client doesn't supply user credentials in the URL. +func (s *MirrorSyncer) ListRepositoryFiles() ([]*apiv1.RepositoryFile, error) { + var u *url.URL + + if s.config.HTTPURL != nil { + u = s.config.HTTPURL + } else { + path := "/repository/file:list" + u = &url.URL{ + Scheme: s.config.URL.Scheme, + Host: s.config.URL.Host, + User: s.config.URL.User, + Path: apiv1.URLPath + path, + } + } + + reqBody := struct { + Repository string `json:"repository"` + }{ + Repository: s.upstreamRepository, + } + + reqBodyBytes, err := json.Marshal(reqBody) + if err != nil { + return nil, err + } + + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, u.String(), bytes.NewBuffer(reqBodyBytes)) + if err != nil { + return nil, err + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + respBody := struct { + RepositoryFiles []*apiv1.RepositoryFile `json:"repository_files"` + }{ + RepositoryFiles: make([]*apiv1.RepositoryFile, 0), + } + + err = json.NewDecoder(resp.Body).Decode(&respBody) + if err != nil { + return nil, err + } + + return respBody.RepositoryFiles, nil +} diff --git a/internal/plugins/mirror/pkg/mirrorrepository/sync.go b/internal/plugins/mirror/pkg/mirrorrepository/sync.go index 277891f..6d1aa3a 100644 --- a/internal/plugins/mirror/pkg/mirrorrepository/sync.go +++ b/internal/plugins/mirror/pkg/mirrorrepository/sync.go @@ -5,6 +5,7 @@ package mirrorrepository import ( "context" + "fmt" "io" "os" @@ -47,51 +48,18 @@ func (h *Handler) repositorySync(_ context.Context) (errFn error) { } for i, config := range h.mirrorConfigs { - addr, module, path, err := rsync.SplitURL(config.URL) - if err != nil { - return err - } - - cOpts := []rsync.ClientOption{rsync.WithLogger(h.logger)} - if len(config.Exclusions) > 0 { - cOpts = append(cOpts, rsync.WithExclusionList(config.Exclusions)) - } - - if config.URL.User != nil { - password, _ := config.URL.User.Password() - cOpts = append(cOpts, rsync.WithClientAuth(config.URL.User.Username(), password)) - } - - s := NewStorage(h, config, uint64(i)) - - ppath := rsync.TrimPrepath(path) - client, err := rsync.SocketClient(s, addr, module, ppath, cOpts...) - if err != nil { - s.Close() - return err - } - - if config.HTTPURL != nil { - sp, err := client.GetSyncPlan() - if err != nil { - s.Close() + switch config.URL.Scheme { + case "rsync": + if err := h.rsync(config, i); err != nil { return err } - - ps := NewPlanSyncer(h, config, uint64(i), h.Params.Sync.MaxWorkerCount, sp) - - if err := ps.Sync(); err != nil { - s.Close() - return err - } - } else { - if err := client.Sync(); err != nil { - s.Close() + case "http", "https": + if err := h.mirrorSync(config, i); err != nil { return err } + default: + return fmt.Errorf("unsupported scheme: %s", config.URL.Scheme) } - - s.Close() } h.logger.Debug("generating index.html files") @@ -122,52 +90,141 @@ func copyTo(src io.Reader, dest string) error { return nil } -func (h *Handler) getSyncPlan() (*apiv1.RepositorySyncPlan, error) { - plan := &apiv1.RepositorySyncPlan{ - Add: []string{}, - Remove: []string{}, - } - +func (h *Handler) getSyncPlan() (plan *apiv1.RepositorySyncPlan, err error) { for i, config := range h.mirrorConfigs { - addr, module, path, err := rsync.SplitURL(config.URL) - if err != nil { - return nil, err + switch config.URL.Scheme { + case "rsync": + plan, err = h.rsyncPlan(config, i) + if err != nil { + return nil, err + } + case "http", "https": + plan, err = h.mirrorSyncPlan(config, i) + if err != nil { + return nil, err + } + default: + return nil, fmt.Errorf("unsupported scheme: %s", config.URL.Scheme) } + } - cOpts := []rsync.ClientOption{rsync.WithLogger(h.logger)} - if len(config.Exclusions) > 0 { - cOpts = append(cOpts, rsync.WithExclusionList(config.Exclusions)) - } + return plan, nil +} - if config.URL.User != nil { - password, _ := config.URL.User.Password() - cOpts = append(cOpts, rsync.WithClientAuth(config.URL.User.Username(), password)) - } +func (h *Handler) rsync(config mirrorConfig, configIndex int) error { + addr, module, path, err := rsync.SplitURL(config.URL) + if err != nil { + return err + } - s := NewStorage(h, config, uint64(i)) - defer s.Close() + cOpts := []rsync.ClientOption{rsync.WithLogger(h.logger)} + if len(config.Exclusions) > 0 { + cOpts = append(cOpts, rsync.WithExclusionList(config.Exclusions)) + } - ppath := rsync.TrimPrepath(path) - client, err := rsync.SocketClient(s, addr, module, ppath, cOpts...) - if err != nil { - s.Close() - return nil, err - } + if config.URL.User != nil { + password, _ := config.URL.User.Password() + cOpts = append(cOpts, rsync.WithClientAuth(config.URL.User.Username(), password)) + } + + s := NewStorage(h, config, uint64(configIndex)) + defer s.Close() + + ppath := rsync.TrimPrepath(path) + client, err := rsync.SocketClient(s, addr, module, ppath, cOpts...) + if err != nil { + return err + } + if config.HTTPURL != nil { sp, err := client.GetSyncPlan() if err != nil { - s.Close() - return nil, err + return err } - for _, f := range sp.AddRemoteFiles { - plan.Add = append(plan.Add, string(sp.RemoteFiles[f].Path)) + ps := NewPlanSyncer(h, config, uint64(configIndex), h.Params.Sync.MaxWorkerCount, sp) + if err := ps.Sync(); err != nil { + return err } - - for _, f := range sp.DeleteLocalFiles { - plan.Remove = append(plan.Remove, string(sp.LocalFiles[f].Path)) + } else { + if err := client.Sync(); err != nil { + return err } } - return plan, nil + return nil +} + +func (h *Handler) rsyncPlan(config mirrorConfig, configIndex int) (*apiv1.RepositorySyncPlan, error) { + addr, module, path, err := rsync.SplitURL(config.URL) + if err != nil { + return nil, err + } + + cOpts := []rsync.ClientOption{rsync.WithLogger(h.logger)} + if len(config.Exclusions) > 0 { + cOpts = append(cOpts, rsync.WithExclusionList(config.Exclusions)) + } + + if config.URL.User != nil { + password, _ := config.URL.User.Password() + cOpts = append(cOpts, rsync.WithClientAuth(config.URL.User.Username(), password)) + } + + s := NewStorage(h, config, uint64(configIndex)) + defer s.Close() + + ppath := rsync.TrimPrepath(path) + client, err := rsync.SocketClient(s, addr, module, ppath, cOpts...) + if err != nil { + return nil, err + } + + sp, err := client.GetSyncPlan() + if err != nil { + return nil, err + } + + var plan apiv1.RepositorySyncPlan + for _, f := range sp.AddRemoteFiles { + plan.Add = append(plan.Add, string(sp.RemoteFiles[f].Path)) + } + + for _, f := range sp.DeleteLocalFiles { + plan.Remove = append(plan.Remove, string(sp.LocalFiles[f].Path)) + } + + return &plan, nil +} + +func (h *Handler) mirrorSync(config mirrorConfig, configIndex int) error { + syncer, err := NewMirrorSyncer(h, config, uint64(configIndex), h.Params.Sync.MaxWorkerCount) + if err != nil { + return err + } + + return syncer.Sync() +} + +func (h *Handler) mirrorSyncPlan(config mirrorConfig, configIndex int) (*apiv1.RepositorySyncPlan, error) { + syncer, err := NewMirrorSyncer(h, config, uint64(configIndex), h.Params.Sync.MaxWorkerCount) + if err != nil { + return nil, err + } + + p, err := syncer.Plan() + if err != nil { + return nil, err + } + + var plan apiv1.RepositorySyncPlan + for _, f := range p.AddRemoteFiles { + plan.Add = append(plan.Add, f.Name) + } + + for _, f := range p.DeleteLocalFiles { + plan.Remove = append(plan.Remove, f.Name) + } + + return &plan, nil } diff --git a/pkg/plugins/mirror/api/v1/api.go b/pkg/plugins/mirror/api/v1/api.go index 743c1a2..54811a3 100644 --- a/pkg/plugins/mirror/api/v1/api.go +++ b/pkg/plugins/mirror/api/v1/api.go @@ -115,6 +115,11 @@ type Mirror interface { //nolint:interfacebloat //kun:success statusCode=200 SyncRepository(ctx context.Context, repository string, wait bool) (err error) + // Sync Mirror repository with an upstream repository using a specified config. + //kun:op GET /repository/sync:config + //kun:success statusCode=200 + SyncRepositoryWithConfig(ctx context.Context, repository string, mirrorConfigs []MirrorConfig, webConfig *WebConfig, wait bool) (err error) + // Generate Mirror web pages . //kun:op GET /repository/generate:web //kun:success statusCode=200 diff --git a/pkg/plugins/mirror/api/v1/endpoint.go b/pkg/plugins/mirror/api/v1/endpoint.go index 0498785..acf2301 100644 --- a/pkg/plugins/mirror/api/v1/endpoint.go +++ b/pkg/plugins/mirror/api/v1/endpoint.go @@ -459,6 +459,47 @@ func MakeEndpointOfSyncRepository(s Mirror) endpoint.Endpoint { } } +type SyncRepositoryWithConfigRequest struct { + Repository string `json:"repository"` + MirrorConfigs []MirrorConfig `json:"mirror_configs"` + WebConfig *WebConfig `json:"web_config"` + Wait bool `json:"wait"` +} + +// ValidateSyncRepositoryWithConfigRequest creates a validator for SyncRepositoryWithConfigRequest. +func ValidateSyncRepositoryWithConfigRequest(newSchema func(*SyncRepositoryWithConfigRequest) validating.Schema) httpoption.Validator { + return httpoption.FuncValidator(func(value interface{}) error { + req := value.(*SyncRepositoryWithConfigRequest) + return httpoption.Validate(newSchema(req)) + }) +} + +type SyncRepositoryWithConfigResponse struct { + Err error `json:"-"` +} + +func (r *SyncRepositoryWithConfigResponse) Body() interface{} { return r } + +// Failed implements endpoint.Failer. +func (r *SyncRepositoryWithConfigResponse) Failed() error { return r.Err } + +// MakeEndpointOfSyncRepositoryWithConfig creates the endpoint for s.SyncRepositoryWithConfig. +func MakeEndpointOfSyncRepositoryWithConfig(s Mirror) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(*SyncRepositoryWithConfigRequest) + err := s.SyncRepositoryWithConfig( + ctx, + req.Repository, + req.MirrorConfigs, + req.WebConfig, + req.Wait, + ) + return &SyncRepositoryWithConfigResponse{ + Err: err, + }, nil + } +} + type UpdateRepositoryRequest struct { Repository string `json:"repository"` Properties *RepositoryProperties `json:"properties"` diff --git a/pkg/plugins/mirror/api/v1/http.go b/pkg/plugins/mirror/api/v1/http.go index feefaf9..9c1058a 100644 --- a/pkg/plugins/mirror/api/v1/http.go +++ b/pkg/plugins/mirror/api/v1/http.go @@ -192,6 +192,20 @@ func NewHTTPRouter(svc Mirror, codecs httpcodec.Codecs, opts ...httpoption.Optio ), ) + codec = codecs.EncodeDecoder("SyncRepositoryWithConfig") + validator = options.RequestValidator("SyncRepositoryWithConfig") + r.Method( + "GET", "/repository/sync:config", + kithttp.NewServer( + MakeEndpointOfSyncRepositoryWithConfig(svc), + decodeSyncRepositoryWithConfigRequest(codec, validator), + httpcodec.MakeResponseEncoder(codec, 200), + append(kitOptions, + kithttp.ServerErrorEncoder(httpcodec.MakeErrorEncoder(codec)), + )..., + ), + ) + codec = codecs.EncodeDecoder("UpdateRepository") validator = options.RequestValidator("UpdateRepository") r.Method( @@ -401,6 +415,22 @@ func decodeSyncRepositoryRequest(codec httpcodec.Codec, validator httpoption.Val } } +func decodeSyncRepositoryWithConfigRequest(codec httpcodec.Codec, validator httpoption.Validator) kithttp.DecodeRequestFunc { + return func(_ context.Context, r *http.Request) (interface{}, error) { + var _req SyncRepositoryWithConfigRequest + + if err := codec.DecodeRequestBody(r, &_req); err != nil { + return nil, err + } + + if err := validator.Validate(&_req); err != nil { + return nil, err + } + + return &_req, nil + } +} + func decodeUpdateRepositoryRequest(codec httpcodec.Codec, validator httpoption.Validator) kithttp.DecodeRequestFunc { return func(_ context.Context, r *http.Request) (interface{}, error) { var _req UpdateRepositoryRequest diff --git a/pkg/plugins/mirror/api/v1/http_client.go b/pkg/plugins/mirror/api/v1/http_client.go index 083213b..2ea32b2 100644 --- a/pkg/plugins/mirror/api/v1/http_client.go +++ b/pkg/plugins/mirror/api/v1/http_client.go @@ -647,6 +647,59 @@ func (c *HTTPClient) SyncRepository(ctx context.Context, repository string, wait return nil } +func (c *HTTPClient) SyncRepositoryWithConfig(ctx context.Context, repository string, mirrorConfigs []MirrorConfig, webConfig *WebConfig, wait bool) (err error) { + codec := c.codecs.EncodeDecoder("SyncRepositoryWithConfig") + + path := "/repository/sync:config" + u := &url.URL{ + Scheme: c.scheme, + Host: c.host, + Path: c.pathPrefix + path, + } + + reqBody := struct { + Repository string `json:"repository"` + MirrorConfigs []MirrorConfig `json:"mirror_configs"` + WebConfig *WebConfig `json:"web_config"` + Wait bool `json:"wait"` + }{ + Repository: repository, + MirrorConfigs: mirrorConfigs, + WebConfig: webConfig, + Wait: wait, + } + reqBodyReader, headers, err := codec.EncodeRequestBody(&reqBody) + if err != nil { + return err + } + + _req, err := http.NewRequestWithContext(ctx, "GET", u.String(), reqBodyReader) + if err != nil { + return err + } + + for k, v := range headers { + _req.Header.Set(k, v) + } + + _resp, err := c.httpClient.Do(_req) + if err != nil { + return err + } + defer _resp.Body.Close() + + if _resp.StatusCode < http.StatusOK || _resp.StatusCode > http.StatusNoContent { + var respErr error + err := codec.DecodeFailureResponse(_resp.Body, &respErr) + if err == nil { + err = respErr + } + return err + } + + return nil +} + func (c *HTTPClient) UpdateRepository(ctx context.Context, repository string, properties *RepositoryProperties) (err error) { codec := c.codecs.EncodeDecoder("UpdateRepository") diff --git a/pkg/plugins/mirror/api/v1/oas2.go b/pkg/plugins/mirror/api/v1/oas2.go index 15012b5..de9516c 100644 --- a/pkg/plugins/mirror/api/v1/oas2.go +++ b/pkg/plugins/mirror/api/v1/oas2.go @@ -181,6 +181,18 @@ paths: schema: $ref: "#/definitions/SyncRepositoryRequestBody" %s + /repository/sync:config: + get: + description: "Sync Mirror repository with an upstream repository using a specified config." + operationId: "SyncRepositoryWithConfig" + tags: + - mirror + parameters: + - name: body + in: body + schema: + $ref: "#/definitions/SyncRepositoryWithConfigRequestBody" + %s ` ) @@ -199,6 +211,7 @@ func getResponses(schema oas2.Schema) []oas2.OASResponses { oas2.GetOASResponses(schema, "ListRepositoryFiles", 200, &ListRepositoryFilesResponse{}), oas2.GetOASResponses(schema, "ListRepositoryLogs", 200, &ListRepositoryLogsResponse{}), oas2.GetOASResponses(schema, "SyncRepository", 200, &SyncRepositoryResponse{}), + oas2.GetOASResponses(schema, "SyncRepositoryWithConfig", 200, &SyncRepositoryWithConfigResponse{}), } } @@ -272,6 +285,14 @@ func getDefinitions(schema oas2.Schema) map[string]oas2.Definition { }{})) oas2.AddResponseDefinitions(defs, schema, "SyncRepository", 200, (&SyncRepositoryResponse{}).Body()) + oas2.AddDefinition(defs, "SyncRepositoryWithConfigRequestBody", reflect.ValueOf(&struct { + Repository string `json:"repository"` + MirrorConfigs []MirrorConfig `json:"mirror_configs"` + WebConfig *WebConfig `json:"web_config"` + Wait bool `json:"wait"` + }{})) + oas2.AddResponseDefinitions(defs, schema, "SyncRepositoryWithConfig", 200, (&SyncRepositoryWithConfigResponse{}).Body()) + oas2.AddDefinition(defs, "UpdateRepositoryRequestBody", reflect.ValueOf(&struct { Repository string `json:"repository"` Properties *RepositoryProperties `json:"properties"` diff --git a/pkg/utils/time.go b/pkg/utils/time.go index 16981bd..c83e3bb 100644 --- a/pkg/utils/time.go +++ b/pkg/utils/time.go @@ -13,3 +13,11 @@ func TimeToString(t int64) string { } return time.Unix(t, 0).Format(timeFormat) } + +func StringToTime(s string) int64 { + if s == "" { + return 0 + } + t, _ := time.Parse(timeFormat, s) + return t.Unix() +}