Skip to content

Commit

Permalink
Refactor downloader code
Browse files Browse the repository at this point in the history
Implement proper parallel downloader #30
and partial transfers downloader #7
  • Loading branch information
anatol committed Jul 21, 2023
1 parent 944cab8 commit 2afc0da
Show file tree
Hide file tree
Showing 8 changed files with 695 additions and 266 deletions.
438 changes: 438 additions & 0 deletions downloader.go

Large diffs are not rendered by default.

106 changes: 106 additions & 0 deletions downloader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package main

import (
"context"
"fmt"
"io"
"math/rand"
"net"
"net/http"
"net/http/httptest"
"os"
"sync"
"testing"
"time"
)

func TestParallelDownload(t *testing.T) {
// Setup an upstream repo
handler := func(w http.ResponseWriter, r *http.Request) {
out := fmt.Sprintf("This is a sample content for %s", r.URL.Path)

w.Header().Add("Last-Modified", time.Now().Format(http.TimeFormat))
w.Header().Add("Content-Length", fmt.Sprintf("%d", len(out)))

time.Sleep(time.Second) // simulate a slow network
w.Write([]byte(out))
}
mux := http.NewServeMux()
mux.HandleFunc("/myrepo/", handler)

srv := &http.Server{Addr: ":0", Handler: mux}
ln, err := net.Listen("tcp", srv.Addr)
if err != nil {
t.Fatal(err)
}
go srv.Serve(ln)
defer srv.Shutdown(context.Background())

// setup a pacoloco proxy
testPacolocoDir, err := os.MkdirTemp(os.TempDir(), "*-pacoloco-repo")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(testPacolocoDir)

repo := &Repo{
URL: fmt.Sprintf("http://localhost:%d/myrepo", ln.Addr().(*net.TCPAddr).Port),
}

config = &Config{
CacheDir: testPacolocoDir,
Port: -1,
PurgeFilesAfter: -1,
DownloadTimeout: 999,
Repos: map[string]*Repo{"up": repo},
}

files := []string{
"foobar-3.3.6-7-x86_64.pkg.tar.zst",
"bar-222.pkg.tar.zst",
"linux-5.19.pkg.tar.zst",
"hello-5.19.pkg.tar.zst",
"gcc-3.pkg.tar.zst",
}

const num = 300
counter := sync.WaitGroup{}
counter.Add(num)

for i := 0; i < num; i++ {
go func() {
f := files[rand.Int()%len(files)]
content := "This is a sample content for /myrepo/" + f

req := httptest.NewRequest(http.MethodGet, "/repo/up/"+f, nil)

// half of requests will have a byte-range set
if rand.Int()%2 == 0 {
start := rand.Int() % (len(content) - 5)
end := start + rand.Int()%(len(content)-start-1) + 1

content = content[start : end+1]
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", start, end))
}

w := httptest.NewRecorder()
if err := handleRequest(w, req); err != nil {
t.Error(err)
}
res := w.Result()
defer res.Body.Close()
data, err := io.ReadAll(res.Body)
if err != nil {
t.Errorf("expected error to be nil got %v", err)
}
if string(data) != content {
t.Errorf("expected '%s' got '%s'", content, string(data))
}
counter.Done()
}()
}

// goroutine for randomly dropping cache files

counter.Wait()
}
86 changes: 83 additions & 3 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestPacolocoIntegrationWithPrefetching(t *testing.T) {
t.Run("testRequestNonExistingDb", testRequestNonExistingDb)
t.Run("testRequestExistingRepo", testRequestExistingRepo)
t.Run("testRequestExistingRepoWithDb", testRequestExistingRepoWithDb)
t.Run("testRequestDbMultipleTimes", testRequestDbMultipleTimes)
t.Run("testRequestPackageFile", testRequestPackageFile)
t.Run("testFailover", testFailover)
if _, err := os.Stat(path.Join(testPacolocoDir, DefaultDBName)); os.IsNotExist(err) {
Expand Down Expand Up @@ -104,6 +105,7 @@ func TestPacolocoIntegration(t *testing.T) {
t.Run("testRequestNonExistingDb", testRequestNonExistingDb)
t.Run("testRequestExistingRepo", testRequestExistingRepo)
t.Run("testRequestExistingRepoWithDb", testRequestExistingRepoWithDb)
t.Run("testRequestDbMultipleTimes", testRequestDbMultipleTimes)
t.Run("testRequestPackageFile", testRequestPackageFile)
t.Run("testFailover", testFailover)
if _, err := os.Stat(path.Join(testPacolocoDir, DefaultDBName)); !os.IsNotExist(err) {
Expand All @@ -117,7 +119,7 @@ func testInvalidURL(t *testing.T) {
pacolocoHandler(w, req)
resp := w.Result()
if resp.StatusCode != 404 {
t.Error("404 response expected")
t.Errorf("404 response expected, got %v", resp.StatusCode)
}
}

Expand All @@ -128,7 +130,7 @@ func testRequestNonExistingDb(t *testing.T) {
pacolocoHandler(w, req)
resp := w.Result()
if resp.StatusCode != 404 {
t.Error("404 response expected")
t.Errorf("404 response expected, got %v", resp.StatusCode)
}

// check that no repo cached
Expand Down Expand Up @@ -169,7 +171,7 @@ func testRequestExistingRepo(t *testing.T) {
pacolocoHandler(w, req)
resp := w.Result()
if resp.StatusCode != 404 {
t.Error("404 response expected")
t.Errorf("404 response expected, got %v", resp.StatusCode)
}

actualRequests := testutil.ToFloat64(requestCounter)
Expand Down Expand Up @@ -267,6 +269,10 @@ func testRequestExistingRepoWithDb(t *testing.T) {
w.Header().Get("Last-Modified"))
}

// copying a file to server cache is operation that runs asynchronously to downloading from server
// wait a bit until cache operations settle down
time.Sleep(10 * time.Millisecond)

actualRequests := testutil.ToFloat64(requestCounter)
actualServed := testutil.ToFloat64(servedCounter)
actualMissed := testutil.ToFloat64(missedCounter)
Expand Down Expand Up @@ -328,6 +334,10 @@ func testRequestExistingRepoWithDb(t *testing.T) {
t.Errorf("Pacoloco cached incorrect db content: %v", string(content))
}

// copying a file to server cache is operation that runs asynchronously to downloading from server
// wait a bit until cache operations settle down
time.Sleep(10 * time.Millisecond)

actualRequests = testutil.ToFloat64(requestCounter)
actualServed = testutil.ToFloat64(servedCounter)
actualMissed = testutil.ToFloat64(missedCounter)
Expand Down Expand Up @@ -453,6 +463,9 @@ func testRequestPackageFile(t *testing.T) {
expectedModTime,
w.Header().Get("Last-Modified"))
}
// copying a file to server cache is operation that runs asynchronously to downloading from server
// wait a bit until cache operations settle down
time.Sleep(10 * time.Millisecond)

actualRequests := testutil.ToFloat64(requestCounter)
actualServed := testutil.ToFloat64(servedCounter)
Expand Down Expand Up @@ -536,6 +549,73 @@ func testRequestPackageFile(t *testing.T) {
}
}

func testRequestDbMultipleTimes(t *testing.T) {
// Requesting existing repo
repo4 := &Repo{
URL: mirrorURL + "/mirror4",
}
config.Repos["repo4"] = repo4
defer delete(config.Repos, "repo4")

if err := os.Mkdir(path.Join(mirrorDir, "mirror4"), os.ModePerm); err != nil {
t.Fatal(err)
}
defer os.RemoveAll(path.Join(mirrorDir, "mirror4"))

dbAtMirror := path.Join(mirrorDir, "mirror4", "test.db")
dbFileContent := "pacoloco/mirror4.db"

if err := os.WriteFile(dbAtMirror, []byte(dbFileContent), os.ModePerm); err != nil {
t.Fatal(err)
}

req := httptest.NewRequest("GET", pacolocoURL+"/repo/repo4/test.db", nil)
w := httptest.NewRecorder()
pacolocoHandler(w, req)
resp := w.Result()
if resp.StatusCode != 200 {
t.Errorf("200 response expected, got %v", resp.StatusCode)
}
content, err := io.ReadAll(w.Body)
if err != nil {
t.Fatal(err)
}
if string(content) != dbFileContent {
t.Errorf("Pacoloco cached incorrect db content: %v", string(content))
}
if resp.ContentLength != int64(len(dbFileContent)) {
t.Errorf("Pacoloco returns incorrect length %v", resp.ContentLength)
}

// copying a file to server cache is operation that runs asynchronously to downloading from server
// wait a bit until cache operations settle down
// time.Sleep(10 * time.Millisecond)

// check that repo is cached
if _, err = os.Stat(path.Join(testPacolocoDir, "pkgs", "repo4")); os.IsNotExist(err) {
t.Error("repo4 repo should be cached")
}
defer os.RemoveAll(path.Join(testPacolocoDir, "pkgs", "repo4"))

req2 := httptest.NewRequest("GET", pacolocoURL+"/repo/repo4/test.db", nil)
w2 := httptest.NewRecorder()
pacolocoHandler(w2, req2)
resp2 := w2.Result()
if resp2.StatusCode != 200 {
t.Errorf("200 response expected, got %v", resp2.StatusCode)
}
content2, err := io.ReadAll(w2.Body)
if err != nil {
t.Fatal(err)
}
if string(content2) != dbFileContent {
t.Errorf("Pacoloco cached incorrect db content: %v", string(content2))
}
if resp2.ContentLength != int64(len(dbFileContent)) {
t.Errorf("Pacoloco returns incorrect length %v", resp2.ContentLength)
}
}

func testFailover(t *testing.T) {
failover := &Repo{
URLs: []string{
Expand Down
Loading

0 comments on commit 2afc0da

Please sign in to comment.