Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelise upload to get better upload speeds #6

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

cli:
@go build -v -o bin/winrmcp
@go build -v -o bin/winrmcp.exe

.PHONY: cli
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func runMain() error {
cacert := flags.String("cacert", "", "ca certificate to validate against")
opTimeout := flags.Duration("op-timeout", time.Second*60, "operation timeout")
maxOpsPerShell := flags.Int("max-ops-per-shell", 15, "max operations per shell")
maxShells := flags.Int("max-shells", 5, "max shells per user")
flags.Parse(os.Args[1:])

var certBytes []byte
Expand All @@ -72,6 +73,7 @@ func runMain() error {
CACertBytes: certBytes,
OperationTimeout: *opTimeout,
MaxOperationsPerShell: *maxOpsPerShell,
MaxShells: *maxShells,
})
if err != nil {
return err
Expand Down
154 changes: 107 additions & 47 deletions winrmcp/cp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func doCopy(client *winrm.Client, config *Config, in io.Reader, toPath string) e
log.Printf("Copying file to %s\n", tempPath)
}

err := uploadContent(client, config.MaxOperationsPerShell, "%TEMP%\\"+tempFile, in)
err := uploadContent(client, config.MaxShells, config.MaxOperationsPerShell, "%TEMP%\\"+tempFile, in)
if err != nil {
return errors.New(fmt.Sprintf("Error uploading file to %s: %v", tempPath, err))
}
Expand All @@ -30,7 +30,7 @@ func doCopy(client *winrm.Client, config *Config, in io.Reader, toPath string) e
log.Printf("Moving file from %s to %s", tempPath, toPath)
}

err = restoreContent(client, tempPath, toPath)
err = restoreContent(client, tempFile, toPath)
if err != nil {
return errors.New(fmt.Sprintf("Error restoring file from %s to %s: %v", tempPath, toPath, err))
}
Expand All @@ -39,34 +39,80 @@ func doCopy(client *winrm.Client, config *Config, in io.Reader, toPath string) e
log.Printf("Removing temporary file %s", tempPath)
}

err = cleanupContent(client, tempPath)
err = cleanupContent(client, fmt.Sprintf("%s.tmp", toPath))
if err != nil {
return errors.New(fmt.Sprintf("Error removing temporary file %s: %v", tempPath, err))
}

return nil
}

func uploadContent(client *winrm.Client, maxChunks int, filePath string, reader io.Reader) error {
func uploadContent(client *winrm.Client, maxShell int, maxChunks int, filePath string, reader io.Reader) error {
var err error
done := false
for !done {
done, err = uploadChunks(client, filePath, maxChunks, reader)
if err != nil {
return err
}
}
var piece = 0
var wg sync.WaitGroup

return nil
}
if maxChunks == 0 {
maxChunks = 1
}

func uploadChunks(client *winrm.Client, filePath string, maxChunks int, reader io.Reader) (bool, error) {
shell, err := client.CreateShell()
if err != nil {
return false, errors.New(fmt.Sprintf("Couldn't create shell: %v", err))
if maxShell == 0 {
maxShell = 5
}
defer shell.Close()

// Create 4 Parallel workers
for p := 0; p < maxShell; p++ {
done := make(chan bool, 1)
// Add worker to the WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
Loop:
for {
select {
case <-done:
break Loop
default:
// Create a shell
shell, gerr := client.CreateShell()
if gerr != nil {
err = gerr
break Loop
}
defer shell.Close()

// Each shell can do X amount of chunks per session
for c := 0; c < maxChunks; c++ {
// Read a chunk
piece++
content, finished, gerr := getChunk(reader, filePath)
if gerr != nil {
err = gerr
done <- true
shell.Close()
break
}
if finished {
done <- true
shell.Close()
break
}

gerr = uploadChunks(shell, fmt.Sprintf("%v.%v", filePath, piece), content)
if gerr != nil {
err = gerr
done <- true
}
}
shell.Close()
}
}
}()
}
wg.Wait()
return err
}
func getChunk(reader io.Reader, filePath string) (string, bool, error) {
// Upload the file in chunks to get around the Windows command line size limit.
// Base64 encodes each set of three bytes into four bytes. In addition the output
// is padded to always be a multiple of four.
Expand All @@ -81,39 +127,37 @@ func uploadChunks(client *winrm.Client, filePath string, maxChunks int, reader i
chunkSize := ((8000 - len(filePath)) / 4) * 3
chunk := make([]byte, chunkSize)

if maxChunks == 0 {
maxChunks = 1
n, err := reader.Read(chunk)
if err != nil && err != io.EOF {
return "", false, err
}
if n == 0 {
return "", true, nil
}

for i := 0; i < maxChunks; i++ {
n, err := reader.Read(chunk)
content := base64.StdEncoding.EncodeToString(chunk[:n])

if err != nil && err != io.EOF {
return false, err
}
if n == 0 {
return true, nil
}
return content, false, nil

content := base64.StdEncoding.EncodeToString(chunk[:n])
if err = appendContent(shell, filePath, content); err != nil {
return false, err
}
}
}

return false, nil
func uploadChunks(shell *winrm.Shell, filePath string, content string) error {
// Upload chunk
err := appendContent(shell, filePath, content)
return err
}

func restoreContent(client *winrm.Client, fromPath, toPath string) error {
func restoreContent(client *winrm.Client, fileLike, toPath string) error {
shell, err := client.CreateShell()
if err != nil {
return err
}

defer shell.Close()
script := fmt.Sprintf(`
$tmp_file_path = [System.IO.Path]::GetFullPath("%s")
Write-Host ""
$dest_file_path = [System.IO.Path]::GetFullPath("%s")
$dest_file_path_temp = [System.IO.Path]::GetFullPath("$dest_file_path.tmp")
if (Test-Path $dest_file_path) {
rm $dest_file_path
}
Expand All @@ -122,15 +166,32 @@ func restoreContent(client *winrm.Client, fromPath, toPath string) error {
New-Item -ItemType directory -Force -ErrorAction SilentlyContinue -Path $dest_dir | Out-Null
}

if (Test-Path $tmp_file_path) {
$base64_lines = Get-Content $tmp_file_path
$base64_string = [string]::join("",$base64_lines)
$bytes = [System.Convert]::FromBase64String($base64_string)
[System.IO.File]::WriteAllBytes($dest_file_path, $bytes)
} else {
echo $null > $dest_file_path
$file_list = Get-ChildItem $env:TEMP |
where {$_.Name -like "%s*"}

# Get the number from the last part of the file and sort
$file_list | foreach {
$_ | Add-Member -Name Number -MemberType NoteProperty -Value -1
$_.Number = [int]$_.Name.Substring($_.Name.IndexOf("tmp.")+4)
}
`, fromPath, toPath)
$file_list = $file_list | sort { $_.Number }

if (Test-Path $dest_file_path_temp) {
rm $dest_file_path_temp
}
# For each file in the list, add it to a main file
$file_list | foreach {
$tmp_file_path = [System.IO.Path]::GetFullPath($_.FullName)
$content = Get-Content $tmp_file_path
Add-Content -Path $dest_file_path_temp -Value $content
rm $tmp_file_path
}

$base64_lines = Get-Content $dest_file_path_temp
$base64_string = [string]::join("",$base64_lines)
$bytes = [System.Convert]::FromBase64String($base64_string)
[System.IO.File]::WriteAllBytes($dest_file_path, $bytes)
`, toPath, fileLike)

cmd, err := shell.Execute(winrm.Powershell(script))
if err != nil {
Expand All @@ -157,15 +218,14 @@ func restoreContent(client *winrm.Client, fromPath, toPath string) error {
return nil
}

func cleanupContent(client *winrm.Client, filePath string) error {
func cleanupContent(client *winrm.Client, toPath string) error {
shell, err := client.CreateShell()
if err != nil {
return err
}

defer shell.Close()
cmd, _ := shell.Execute("powershell", "Remove-Item", filePath, "-ErrorAction SilentlyContinue")

cmd, _ := shell.Execute("powershell", "Remove-Item", toPath, "-ErrorAction SilentlyContinue")
cmd.Wait()
cmd.Close()
return nil
Expand Down
16 changes: 8 additions & 8 deletions winrmcp/winrmcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Config struct {
CACertBytes []byte
OperationTimeout time.Duration
MaxOperationsPerShell int
MaxShells int
}

type Auth struct {
Expand Down Expand Up @@ -63,15 +64,14 @@ func (fs *Winrmcp) Copy(fromPath, toPath string) error {

if !fi.IsDir() {
return fs.Write(toPath, f)
} else {
fw := fileWalker{
client: fs.client,
config: fs.config,
toDir: toPath,
fromDir: fromPath,
}
return filepath.Walk(fromPath, fw.copyFile)
}
fw := fileWalker{
client: fs.client,
config: fs.config,
toDir: toPath,
fromDir: fromPath,
}
return filepath.Walk(fromPath, fw.copyFile)
}

func (fs *Winrmcp) Write(toPath string, src io.Reader) error {
Expand Down