Commit
Merge branch 'dev' into analyzer-manager-version-upgrade
eyalbe4 authored Nov 24, 2023
2 parents d4325f9 + 0e03b4b commit 43e840d
Showing 28 changed files with 147,811 additions and 10,939 deletions.
40 changes: 40 additions & 0 deletions artifactory/commands/transferfiles/filediff_test.go
@@ -32,3 +32,43 @@ func TestConvertResultsToFileRepresentation(t *testing.T) {
assert.Equal(t, []api.FileRepresentation{testCase.expectedOutput}, files)
}
}

var generateDiffAqlQueryTestCases = []struct {
paginationOffset int
disabledDistinctiveAql bool
expectedAql string
}{
{0, false, "items.find({\"$and\":[{\"modified\":{\"$gte\":\"1\"}},{\"modified\":{\"$lt\":\"2\"}},{\"repo\":\"repo1\",\"type\":\"any\"}]}).include(\"repo\",\"path\",\"name\",\"type\",\"modified\",\"size\").sort({\"$asc\":[\"name\",\"path\"]}).offset(0).limit(10000)"},
{0, true, "items.find({\"$and\":[{\"modified\":{\"$gte\":\"1\"}},{\"modified\":{\"$lt\":\"2\"}},{\"repo\":\"repo1\",\"type\":\"any\"}]}).include(\"repo\",\"path\",\"name\",\"type\",\"modified\",\"size\").sort({\"$asc\":[\"name\",\"path\"]}).offset(0).limit(10000).distinct(false)"},
{2, false, "items.find({\"$and\":[{\"modified\":{\"$gte\":\"1\"}},{\"modified\":{\"$lt\":\"2\"}},{\"repo\":\"repo1\",\"type\":\"any\"}]}).include(\"repo\",\"path\",\"name\",\"type\",\"modified\",\"size\").sort({\"$asc\":[\"name\",\"path\"]}).offset(20000).limit(10000)"},
{2, true, "items.find({\"$and\":[{\"modified\":{\"$gte\":\"1\"}},{\"modified\":{\"$lt\":\"2\"}},{\"repo\":\"repo1\",\"type\":\"any\"}]}).include(\"repo\",\"path\",\"name\",\"type\",\"modified\",\"size\").sort({\"$asc\":[\"name\",\"path\"]}).offset(20000).limit(10000).distinct(false)"},
}

func TestGenerateDiffAqlQuery(t *testing.T) {
for _, testCase := range generateDiffAqlQueryTestCases {
t.Run("", func(*testing.T) {
results := generateDiffAqlQuery(repo1Key, "1", "2", testCase.paginationOffset, testCase.disabledDistinctiveAql)
assert.Equal(t, testCase.expectedAql, results)
})
}
}

var generateDockerManifestAqlQueryTestCases = []struct {
paginationOffset int
disabledDistinctiveAql bool
expectedAql string
}{
{0, false, "items.find({\"$and\":[{\"repo\":\"repo1\"},{\"modified\":{\"$gte\":\"1\"}},{\"modified\":{\"$lt\":\"2\"}},{\"$or\":[{\"name\":\"manifest.json\"},{\"name\":\"list.manifest.json\"}]}]}).include(\"repo\",\"path\",\"name\",\"type\",\"modified\").sort({\"$asc\":[\"name\",\"path\"]}).offset(0).limit(10000)"},
{0, true, "items.find({\"$and\":[{\"repo\":\"repo1\"},{\"modified\":{\"$gte\":\"1\"}},{\"modified\":{\"$lt\":\"2\"}},{\"$or\":[{\"name\":\"manifest.json\"},{\"name\":\"list.manifest.json\"}]}]}).include(\"repo\",\"path\",\"name\",\"type\",\"modified\").sort({\"$asc\":[\"name\",\"path\"]}).offset(0).limit(10000).distinct(false)"},
{2, false, "items.find({\"$and\":[{\"repo\":\"repo1\"},{\"modified\":{\"$gte\":\"1\"}},{\"modified\":{\"$lt\":\"2\"}},{\"$or\":[{\"name\":\"manifest.json\"},{\"name\":\"list.manifest.json\"}]}]}).include(\"repo\",\"path\",\"name\",\"type\",\"modified\").sort({\"$asc\":[\"name\",\"path\"]}).offset(20000).limit(10000)"},
{2, true, "items.find({\"$and\":[{\"repo\":\"repo1\"},{\"modified\":{\"$gte\":\"1\"}},{\"modified\":{\"$lt\":\"2\"}},{\"$or\":[{\"name\":\"manifest.json\"},{\"name\":\"list.manifest.json\"}]}]}).include(\"repo\",\"path\",\"name\",\"type\",\"modified\").sort({\"$asc\":[\"name\",\"path\"]}).offset(20000).limit(10000).distinct(false)"},
}

func TestGenerateDockerManifestAqlQuery(t *testing.T) {
for _, testCase := range generateDockerManifestAqlQueryTestCases {
t.Run("", func(*testing.T) {
results := generateDockerManifestAqlQuery(repo1Key, "1", "2", testCase.paginationOffset, testCase.disabledDistinctiveAql)
assert.Equal(t, testCase.expectedAql, results)
})
}
}
22 changes: 14 additions & 8 deletions artifactory/commands/transferfiles/filesdiff.go
@@ -214,7 +214,7 @@ func (f *filesDiffPhase) getTimeFrameFilesDiff(fromTimestamp, toTimestamp string
}

func (f *filesDiffPhase) getNonDockerTimeFrameFilesDiff(fromTimestamp, toTimestamp string, paginationOffset int) (aqlResult *servicesUtils.AqlSearchResult, err error) {
query := generateDiffAqlQuery(f.repoKey, fromTimestamp, toTimestamp, paginationOffset)
query := generateDiffAqlQuery(f.repoKey, fromTimestamp, toTimestamp, paginationOffset, f.disabledDistinctiveAql)
return runAql(f.context, f.srcRtDetails, query)
}

@@ -225,7 +225,7 @@ func (f *filesDiffPhase) getNonDockerTimeFrameFilesDiff(fromTimestamp, toTimesta
// to get all artifacts in its path (that includes the "manifest.json" file itself and all its layers).
func (f *filesDiffPhase) getDockerTimeFrameFilesDiff(fromTimestamp, toTimestamp string, paginationOffset int) (aqlResult *servicesUtils.AqlSearchResult, err error) {
// Get all newly created or modified manifest files ("manifest.json" and "list.manifest.json" files)
query := generateDockerManifestAqlQuery(f.repoKey, fromTimestamp, toTimestamp, paginationOffset)
query := generateDockerManifestAqlQuery(f.repoKey, fromTimestamp, toTimestamp, paginationOffset, f.disabledDistinctiveAql)
manifestFilesResult, err := runAql(f.context, f.srcRtDetails, query)
if err != nil {
return
@@ -261,11 +261,10 @@ func (f *filesDiffPhase) getDockerTimeFrameFilesDiff(fromTimestamp, toTimestamp
return
}

func generateDiffAqlQuery(repoKey, fromTimestamp, toTimestamp string, paginationOffset int) string {
func generateDiffAqlQuery(repoKey, fromTimestamp, toTimestamp string, paginationOffset int, disabledDistinctiveAql bool) string {
query := fmt.Sprintf(`items.find({"$and":[{"modified":{"$gte":"%s"}},{"modified":{"$lt":"%s"}},{"repo":"%s","type":"any"}]})`, fromTimestamp, toTimestamp, repoKey)
query += `.include("repo","path","name","type","modified","size")`
query += fmt.Sprintf(`.sort({"$asc":["modified"]}).offset(%d).limit(%d)`, paginationOffset*AqlPaginationLimit, AqlPaginationLimit)
return query
return query + generateAqlSortingPart(paginationOffset, disabledDistinctiveAql)
}

// This function generates an AQL that searches for all the content in the list of provided Artifactory paths.
@@ -283,10 +282,17 @@ func generateGetDirContentAqlQuery(repoKey string, paths []string) string {
}

// This function generates an AQL that searches for all files named "manifest.json" and "list.manifest.json" in a specific repository.
func generateDockerManifestAqlQuery(repoKey, fromTimestamp, toTimestamp string, paginationOffset int) string {
func generateDockerManifestAqlQuery(repoKey, fromTimestamp, toTimestamp string, paginationOffset int, disabledDistinctiveAql bool) string {
query := `items.find({"$and":`
query += fmt.Sprintf(`[{"repo":"%s"},{"modified":{"$gte":"%s"}},{"modified":{"$lt":"%s"}},{"$or":[{"name":"manifest.json"},{"name":"list.manifest.json"}]}`, repoKey, fromTimestamp, toTimestamp)
query += `]}).include("repo","path","name","type","modified")`
query += fmt.Sprintf(`.sort({"$asc":["modified"]}).offset(%d).limit(%d)`, paginationOffset*AqlPaginationLimit, AqlPaginationLimit)
return query
return query + generateAqlSortingPart(paginationOffset, disabledDistinctiveAql)
}

func generateAqlSortingPart(paginationOffset int, disabledDistinctiveAql bool) string {
sortingPart := fmt.Sprintf(`.sort({"$asc":["name","path"]}).offset(%d).limit(%d)`, paginationOffset*AqlPaginationLimit, AqlPaginationLimit)
if disabledDistinctiveAql {
sortingPart += `.distinct(false)`
}
return sortingPart
}
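
For reference, a minimal sketch of what the new generateAqlSortingPart helper produces. This is a hypothetical extra test, not part of this commit, written in the same style as the table-driven tests above and assuming AqlPaginationLimit is 10000, as in those test cases:

func TestGenerateAqlSortingPartSketch(t *testing.T) {
	// Default behavior: DISTINCT left enabled.
	assert.Equal(t, `.sort({"$asc":["name","path"]}).offset(0).limit(10000)`, generateAqlSortingPart(0, false))
	// Pagination offset 2 with DISTINCT disabled (source Artifactory >= 7.37).
	assert.Equal(t, `.sort({"$asc":["name","path"]}).offset(20000).limit(10000).distinct(false)`, generateAqlSortingPart(2, true))
}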
7 changes: 5 additions & 2 deletions artifactory/commands/transferfiles/fulltransfer.go
@@ -273,7 +273,7 @@ func getFolderRelativePath(folderName, relativeLocation string) string {
}

func (m *fullTransferPhase) getDirectoryContentAql(relativePath string, paginationOffset int) (result []servicesUtils.ResultItem, lastPage bool, err error) {
query := generateFolderContentAqlQuery(m.repoKey, relativePath, paginationOffset)
query := generateFolderContentAqlQuery(m.repoKey, relativePath, paginationOffset, m.disabledDistinctiveAql)
aqlResults, err := runAql(m.context, m.srcRtDetails, query)
if err != nil {
return []servicesUtils.ResultItem{}, false, err
@@ -284,10 +284,13 @@ func (m *fullTransferPhase) getDirectoryContentAql(relativePath string, paginati
return
}

func generateFolderContentAqlQuery(repoKey, relativePath string, paginationOffset int) string {
func generateFolderContentAqlQuery(repoKey, relativePath string, paginationOffset int, disabledDistinctiveAql bool) string {
query := fmt.Sprintf(`items.find({"type":"any","$or":[{"$and":[{"repo":"%s","path":{"$match":"%s"},"name":{"$match":"*"}}]}]})`, repoKey, relativePath)
query += `.include("repo","path","name","type","size")`
query += fmt.Sprintf(`.sort({"$asc":["name"]}).offset(%d).limit(%d)`, paginationOffset*AqlPaginationLimit, AqlPaginationLimit)
if disabledDistinctiveAql {
query += `.distinct(false)`
}
return query
}
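
The folder-content query has no dedicated test in this diff, so as a hypothetical illustration (not part of the commit, assuming a repository key of "repo1", a relative path of "a/b", pagination offset 0, and AqlPaginationLimit of 10000 as in the filediff tests), the query generated with DISTINCT disabled would be:

items.find({"type":"any","$or":[{"$and":[{"repo":"repo1","path":{"$match":"a/b"},"name":{"$match":"*"}}]}]}).include("repo","path","name","type","size").sort({"$asc":["name"]}).offset(0).limit(10000).distinct(false)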

6 changes: 3 additions & 3 deletions artifactory/commands/transferfiles/manager.go
@@ -205,8 +205,8 @@ type producerConsumerWrapper struct {
}

func newProducerConsumerWrapper() producerConsumerWrapper {
chunkUploaderProducerConsumer := parallel.NewRunner(GetThreads(), tasksMaxCapacity, false)
chunkBuilderProducerConsumer := parallel.NewRunner(GetThreads(), tasksMaxCapacity, false)
chunkUploaderProducerConsumer := parallel.NewRunner(GetChunkUploaderThreads(), tasksMaxCapacity, false)
chunkBuilderProducerConsumer := parallel.NewRunner(GetChunkBuilderThreads(), tasksMaxCapacity, false)
chunkUploaderProducerConsumer.SetFinishedNotification(true)
chunkBuilderProducerConsumer.SetFinishedNotification(true)
errorsQueue := clientUtils.NewErrorsQueue(1)
@@ -310,7 +310,7 @@ func pollUploads(phaseBase *phaseBase, srcUpService *srcUserPluginService, uploa

// Fill chunk data batch till full. Return if no new chunk data is available.
func fillChunkDataBatch(chunksLifeCycleManager *ChunksLifeCycleManager, uploadChunkChan chan UploadedChunk) {
for chunksLifeCycleManager.totalChunks < GetThreads() {
for chunksLifeCycleManager.totalChunks < GetChunkUploaderThreads() {
select {
case data := <-uploadChunkChan:
currentNodeId := nodeId(data.NodeId)
7 changes: 7 additions & 0 deletions artifactory/commands/transferfiles/phase.go
@@ -36,6 +36,7 @@ type transferPhase interface {
setProxyKey(proxyKey string)
setBuildInfo(setBuildInfo bool)
setPackageType(packageType string)
setDisabledDistinctiveAql()
setStopSignal(stopSignal chan os.Signal)
StopGracefully()
}
@@ -59,6 +60,8 @@ type phaseBase struct {
stateManager *state.TransferStateManager
locallyGeneratedFilter *locallyGeneratedFilter
stopSignal chan os.Signal
// Optimization in Artifactory version 7.37 and above enables the exclusion of setting DISTINCT in SQL queries
disabledDistinctiveAql bool
}

func (pb *phaseBase) ShouldStop() bool {
Expand Down Expand Up @@ -140,6 +143,10 @@ func (pb *phaseBase) setPackageType(packageType string) {
pb.packageType = packageType
}

func (pb *phaseBase) setDisabledDistinctiveAql() {
pb.disabledDistinctiveAql = true
}

func (pb *phaseBase) setStopSignal(stopSignal chan os.Signal) {
pb.stopSignal = stopSignal
}
42 changes: 38 additions & 4 deletions artifactory/commands/transferfiles/transfer.go
@@ -12,6 +12,7 @@ import (
"syscall"
"time"

"github.com/jfrog/gofrog/version"
"github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/state"
"github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/utils/precheckrunner"
"github.com/jfrog/jfrog-cli-core/v2/artifactory/utils"
@@ -33,6 +34,7 @@ const (
retries = 600
retriesWaitMilliSecs = 5000
dataTransferPluginMinVersion = "1.7.0"
disableDistinctAqlMinVersion = "7.37"
)

type TransferFilesCommand struct {
@@ -55,6 +57,8 @@ type TransferFilesCommand struct {
stateManager *state.TransferStateManager
preChecks bool
locallyGeneratedFilter *locallyGeneratedFilter
// Optimization in Artifactory version 7.37 and above enables the exclusion of setting DISTINCT in SQL queries
disabledDistinctiveAql bool
}

func NewTransferFilesCommand(sourceServer, targetServer *config.ServerDetails) (*TransferFilesCommand, error) {
@@ -182,6 +186,10 @@ func (tdc *TransferFilesCommand) Run() (err error) {
return err
}

if err = tdc.initDistinctAql(); err != nil {
return err
}

if err = tdc.initStateManager(allSourceLocalRepos, sourceBuildInfoRepos); err != nil {
return err
}
@@ -291,6 +299,28 @@ func (tdc *TransferFilesCommand) initStorageInfoManagers() error {
return storageInfoManager.CalculateStorageInfo()
}

func (tdc *TransferFilesCommand) initDistinctAql() error {
// Init source storage services manager
servicesManager, err := createTransferServiceManager(tdc.context, tdc.sourceServerDetails)
if err != nil {
return err
}

// Getting source Artifactory version
sourceArtifactoryVersion, err := servicesManager.GetVersion()
if err != nil {
return err
}

// If version is at least 7.37, add .distinct(false) to AQL queries
if version.NewVersion(sourceArtifactoryVersion).AtLeast(disableDistinctAqlMinVersion) {
tdc.disabledDistinctiveAql = true
log.Debug(fmt.Sprintf("The source Artifactory version is above %s (%s). Adding .distinct(false) to AQL requests.",
disableDistinctAqlMinVersion, sourceArtifactoryVersion))
}
return nil
}

// Creates the Pre-checks runner for the data transfer command
func (tdc *TransferFilesCommand) NewTransferDataPreChecksRunner() (runner *precheckrunner.PreCheckRunner, err error) {
// Get relevant repos
@@ -458,6 +488,9 @@ func (tdc *TransferFilesCommand) startPhase(newPhase *transferPhase, repo string
if err != nil {
return err
}
if tdc.disabledDistinctiveAql {
(*newPhase).setDisabledDistinctiveAql()
}
printPhaseChange("Running '" + (*newPhase).getPhaseName() + "' for repo '" + repo + "'...")
err = (*newPhase).run()
if err != nil {
@@ -555,19 +588,20 @@ func (tdc *TransferFilesCommand) getAllLocalRepos(serverDetails *config.ServerDe

func (tdc *TransferFilesCommand) initCurThreads(buildInfoRepo bool) error {
// Use default threads if settings file doesn't exist or an error occurred.
curThreads = utils.DefaultThreads
curChunkUploaderThreads = utils.DefaultThreads
curChunkBuilderThreads = utils.DefaultThreads
settings, err := utils.LoadTransferSettings()
if err != nil {
return err
}
if settings != nil {
curThreads = settings.CalcNumberOfThreads(buildInfoRepo)
if buildInfoRepo && curThreads < settings.ThreadsNumber {
curChunkBuilderThreads, curChunkUploaderThreads = settings.CalcNumberOfThreads(buildInfoRepo)
if buildInfoRepo && curChunkUploaderThreads < settings.ThreadsNumber {
log.Info("Build info transferring - using reduced number of threads")
}
}

log.Info("Running with maximum", strconv.Itoa(curThreads), "working threads...")
log.Info("Running with maximum", strconv.Itoa(curChunkUploaderThreads), "working threads...")
return nil
}
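
The version gate in initDistinctAql above relies on gofrog's version package, which this commit adds to the file's imports. A standalone sketch of the comparison semantics, using made-up version strings that are not taken from this commit:

package main

import (
	"fmt"

	"github.com/jfrog/gofrog/version"
)

func main() {
	// Below the threshold: keep Artifactory's default DISTINCT behavior.
	fmt.Println(version.NewVersion("7.36.2").AtLeast("7.37")) // false
	// At or above the threshold: append .distinct(false) to the generated AQL.
	fmt.Println(version.NewVersion("7.49.0").AtLeast("7.37")) // true
}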

3 changes: 2 additions & 1 deletion artifactory/commands/transferfiles/transfer_test.go
@@ -203,7 +203,8 @@ func TestUploadChunkAndPollUploads(t *testing.T) {

// Sends chunk to upload, polls on chunk three times - once when it is still in progress, once after done received and once to notify back to the source.
func uploadChunkAndPollTwice(t *testing.T, phaseBase *phaseBase, fileSample api.FileRepresentation) {
curThreads = 8
curChunkUploaderThreads = coreUtils.DefaultThreads
curChunkBuilderThreads = coreUtils.DefaultThreads
uploadChunksChan := make(chan UploadedChunk, 3)
doneChan := make(chan bool, 1)
var runWaitGroup sync.WaitGroup
31 changes: 20 additions & 11 deletions artifactory/commands/transferfiles/utils.go
@@ -48,7 +48,8 @@ type (
)

var AqlPaginationLimit = DefaultAqlPaginationLimit
var curThreads int
var curChunkBuilderThreads int
var curChunkUploaderThreads int

type UploadedChunk struct {
api.UploadChunkResponse
@@ -190,7 +191,7 @@ var processedUploadChunksMutex sync.Mutex
func incrCurProcessedChunksWhenPossible() bool {
processedUploadChunksMutex.Lock()
defer processedUploadChunksMutex.Unlock()
if curProcessedUploadChunks < GetThreads() {
if curProcessedUploadChunks < GetChunkUploaderThreads() {
curProcessedUploadChunks++
return true
}
@@ -318,8 +319,12 @@ func newUploadedChunkStruct(uploadChunkResponse api.UploadChunkResponse, chunk a
}
}

func GetThreads() int {
return curThreads
func GetChunkBuilderThreads() int {
return curChunkBuilderThreads
}

func GetChunkUploaderThreads() int {
return curChunkUploaderThreads
}

// Periodically reads settings file and updates the number of threads.
@@ -349,16 +354,20 @@ func updateThreads(pcWrapper *producerConsumerWrapper, buildInfoRepo bool) error
if err != nil || settings == nil {
return err
}
calculatedNumberOfThreads := settings.CalcNumberOfThreads(buildInfoRepo)
if curThreads != calculatedNumberOfThreads {
calculatedChunkBuilderThreads, calculatedChunkUploaderThreads := settings.CalcNumberOfThreads(buildInfoRepo)
if curChunkUploaderThreads != calculatedChunkUploaderThreads {
if pcWrapper != nil {
updateProducerConsumerMaxParallel(pcWrapper.chunkBuilderProducerConsumer, calculatedNumberOfThreads)
updateProducerConsumerMaxParallel(pcWrapper.chunkUploaderProducerConsumer, calculatedNumberOfThreads)
if curChunkBuilderThreads != calculatedChunkBuilderThreads {
updateProducerConsumerMaxParallel(pcWrapper.chunkBuilderProducerConsumer, calculatedChunkBuilderThreads)
}
updateProducerConsumerMaxParallel(pcWrapper.chunkUploaderProducerConsumer, calculatedChunkUploaderThreads)
}
log.Info(fmt.Sprintf("Number of threads have been updated to %s (was %s).", strconv.Itoa(calculatedNumberOfThreads), strconv.Itoa(curThreads)))
curThreads = calculatedNumberOfThreads
log.Info(fmt.Sprintf("Number of threads has been updated to %s (was %s).", strconv.Itoa(calculatedChunkUploaderThreads), strconv.Itoa(curChunkUploaderThreads)))
curChunkBuilderThreads = calculatedChunkBuilderThreads
curChunkUploaderThreads = calculatedChunkUploaderThreads
} else {
log.Debug("No change to the number of threads have been detected.")
log.Debug(fmt.Sprintf("No change to the number of threads has been detected. Max chunks builder threads: %d. Max chunks uploader threads: %d.",
calculatedChunkBuilderThreads, calculatedChunkUploaderThreads))
}
return nil
}
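
To make the new split easier to follow, here is a standalone sketch of the update logic in updateThreads, with hard-coded thread counts standing in for values read from the transfer settings file and prints standing in for the runner updates (an illustration only, not code from this commit). Note that the uploader count gates the whole update, and the builder runner is resized only when its own count changed:

package main

import "fmt"

func main() {
	curBuilderThreads, curUploaderThreads := 8, 8
	calcBuilderThreads, calcUploaderThreads := 8, 16 // assumed settings values

	if curUploaderThreads != calcUploaderThreads {
		if curBuilderThreads != calcBuilderThreads {
			fmt.Println("resizing chunk-builder runner to", calcBuilderThreads)
		}
		fmt.Println("resizing chunk-uploader runner to", calcUploaderThreads)
		curBuilderThreads, curUploaderThreads = calcBuilderThreads, calcUploaderThreads
	}
	fmt.Println("builder threads:", curBuilderThreads, "uploader threads:", curUploaderThreads)
}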
(The diffs for the remaining changed files are not shown here.)
