Skip to content

Commit

Permalink
Use scratch space for non-archive uploads (with host assisted from bl…
Browse files Browse the repository at this point in the history
…ock exception)

Basically a follow up to 3219 for upload sources,
which suffer from the same issue of losing sparseness.

Signed-off-by: Alex Kalenyuk <[email protected]>
  • Loading branch information
akalenyu committed May 2, 2024
1 parent 50bcbb5 commit ad75665
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 31 deletions.
9 changes: 5 additions & 4 deletions cmd/cdi-cloner/clone-source.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/x509"
"errors"
"flag"
"fmt"
"io"
"net/http"
"os"
Expand Down Expand Up @@ -55,7 +56,7 @@ func (er *execReader) Close() error {
}

func init() {
flag.StringVar(&contentType, "content-type", "", "filesystem-clone|blockdevice-clone")
flag.StringVar(&contentType, "content-type", "", fmt.Sprintf("%s|%s", common.FilesystemCloneContentType, common.BlockdeviceClone))
flag.StringVar(&mountPoint, "mount", "", "pvc mount point")
flag.Uint64Var(&uploadBytes, "upload-bytes", 0, "approx number of bytes in input")
klog.InitFlags(nil)
Expand Down Expand Up @@ -138,7 +139,7 @@ func pipeToSnappy(reader io.ReadCloser) io.ReadCloser {

func validateContentType() {
switch contentType {
case "filesystem-clone", "blockdevice-clone":
case common.FilesystemCloneContentType, common.BlockdeviceClone:
default:
klog.Fatalf("Invalid content-type %q", contentType)
}
Expand Down Expand Up @@ -202,13 +203,13 @@ func newTarReader(preallocation bool) (io.ReadCloser, error) {

func getInputStream(preallocation bool) io.ReadCloser {
switch contentType {
case "filesystem-clone":
case common.FilesystemCloneContentType:
rc, err := newTarReader(preallocation)
if err != nil {
klog.Fatalf("Error creating tar reader for %q: %+v", mountPoint, err)
}
return rc
case "blockdevice-clone":
case common.BlockdeviceClone:
rc, err := os.Open(mountPoint)
if err != nil {
klog.Fatalf("Error opening block device %q: %+v", mountPoint, err)
Expand Down
12 changes: 7 additions & 5 deletions pkg/importer/upload-datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@ type UploadDataSource struct {
url *url.URL
// contentType expected from the upload content
contentType cdiv1.DataVolumeContentType
// directWriteException indicates an exception to write directly without scratch space
directWriteException bool
}

// NewUploadDataSource creates a new instance of an UploadDataSource
func NewUploadDataSource(stream io.ReadCloser, contentType cdiv1.DataVolumeContentType) *UploadDataSource {
func NewUploadDataSource(stream io.ReadCloser, contentType cdiv1.DataVolumeContentType, directWriteException bool) *UploadDataSource {
return &UploadDataSource{
stream: stream,
contentType: contentType,
stream: stream,
contentType: contentType,
directWriteException: directWriteException,
}
}

Expand All @@ -51,8 +54,7 @@ func (ud *UploadDataSource) Info() (ProcessingPhase, error) {
if ud.contentType == cdiv1.DataVolumeArchive {
return ProcessingPhaseTransferDataDir, nil
}
if !ud.readers.Convert {
// Uploading a raw file, we can write that directly to the target.
if ud.directWriteException {
return ProcessingPhaseTransferDataFile, nil
}
return ProcessingPhaseTransferScratch, nil
Expand Down
34 changes: 17 additions & 17 deletions pkg/importer/upload-datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var _ = Describe("Upload data source", func() {
Expect(err).NotTo(HaveOccurred())
err = file.Close()
Expect(err).NotTo(HaveOccurred())
ud = NewUploadDataSource(file, dvKubevirt)
ud = NewUploadDataSource(file, dvKubevirt, false)
result, err := ud.Info()
Expect(err).To(HaveOccurred())
Expect(ProcessingPhaseError).To(Equal(result))
Expand All @@ -53,7 +53,7 @@ var _ = Describe("Upload data source", func() {
// Don't need to defer close, since ud.Close will close the reader
file, err := os.Open(cirrosFilePath)
Expect(err).NotTo(HaveOccurred())
ud = NewUploadDataSource(file, dvKubevirt)
ud = NewUploadDataSource(file, dvKubevirt, false)
result, err := ud.Info()

Expect(err).NotTo(HaveOccurred())
Expand All @@ -64,21 +64,21 @@ var _ = Describe("Upload data source", func() {
// Don't need to defer close, since ud.Close will close the reader
file, err := os.Open(tinyCoreTarFilePath)
Expect(err).NotTo(HaveOccurred())
ud = NewUploadDataSource(file, dvArchive)
ud = NewUploadDataSource(file, dvArchive, false)
result, err := ud.Info()

Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferDataDir).To(Equal(result))
})

It("Info should return TransferData, when passed in a valid raw image", func() {
It("Info should return TransferScratch, when passed in a valid raw image", func() {
// Don't need to defer close, since ud.Close will close the reader
file, err := os.Open(tinyCoreFilePath)
Expect(err).NotTo(HaveOccurred())
ud = NewUploadDataSource(file, dvKubevirt)
ud = NewUploadDataSource(file, dvKubevirt, false)
result, err := ud.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferDataFile).To(Equal(result))
Expect(ProcessingPhaseTransferScratch).To(Equal(result))
})

DescribeTable("calling transfer should", func(fileName string, dvContentType cdiv1.DataVolumeContentType, expectedPhase ProcessingPhase, scratchPath string, want []byte, wantErr bool) {
Expand All @@ -88,7 +88,7 @@ var _ = Describe("Upload data source", func() {
sourceFile, err := os.Open(fileName)
Expect(err).NotTo(HaveOccurred())

ud = NewUploadDataSource(sourceFile, dvContentType)
ud = NewUploadDataSource(sourceFile, dvContentType, false)
_, err = ud.Info()
Expect(err).NotTo(HaveOccurred())
nextPhase, err := ud.Transfer(scratchPath)
Expand Down Expand Up @@ -118,7 +118,7 @@ var _ = Describe("Upload data source", func() {
sourceFile, err := os.Open(cirrosFilePath)
Expect(err).NotTo(HaveOccurred())

ud = NewUploadDataSource(sourceFile, dvKubevirt)
ud = NewUploadDataSource(sourceFile, dvKubevirt, false)
nextPhase, err := ud.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferScratch).To(Equal(nextPhase))
Expand All @@ -133,10 +133,10 @@ var _ = Describe("Upload data source", func() {
// Don't need to defer close, since ud.Close will close the reader
sourceFile, err := os.Open(tinyCoreFilePath)
Expect(err).NotTo(HaveOccurred())
ud = NewUploadDataSource(sourceFile, dvKubevirt)
ud = NewUploadDataSource(sourceFile, dvKubevirt, false)
result, err := ud.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferDataFile).To(Equal(result))
Expect(ProcessingPhaseTransferScratch).To(Equal(result))
result, err = ud.TransferFile(filepath.Join(tmpDir, "file"))
Expect(err).ToNot(HaveOccurred())
Expect(ProcessingPhaseResize).To(Equal(result))
Expand All @@ -146,17 +146,17 @@ var _ = Describe("Upload data source", func() {
// Don't need to defer close, since ud.Close will close the reader
sourceFile, err := os.Open(tinyCoreFilePath)
Expect(err).NotTo(HaveOccurred())
ud = NewUploadDataSource(sourceFile, dvKubevirt)
ud = NewUploadDataSource(sourceFile, dvKubevirt, false)
result, err := ud.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferDataFile).To(Equal(result))
Expect(ProcessingPhaseTransferScratch).To(Equal(result))
result, err = ud.TransferFile("/invalidpath/invalidfile")
Expect(err).To(HaveOccurred())
Expect(ProcessingPhaseError).To(Equal(result))
})

It("Close with nil stream should not fail", func() {
ud = NewUploadDataSource(nil, dvKubevirt)
ud = NewUploadDataSource(nil, dvKubevirt, false)
err := ud.Close()
Expect(err).NotTo(HaveOccurred())
})
Expand Down Expand Up @@ -204,14 +204,14 @@ var _ = Describe("Async Upload data source", func() {
Expect(ProcessingPhaseTransferScratch).To(Equal(result))
})

It("Info should return TransferData, when passed in a valid raw image", func() {
It("Info should return TransferScratch, when passed in a valid raw image", func() {
// Don't need to defer close, since ud.Close will close the reader
file, err := os.Open(tinyCoreFilePath)
Expect(err).NotTo(HaveOccurred())
aud = NewAsyncUploadDataSource(file)
result, err := aud.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferDataFile).To(Equal(result))
Expect(ProcessingPhaseTransferScratch).To(Equal(result))
})

DescribeTable("calling transfer should", func(fileName, scratchPath string, want []byte, wantErr bool) {
Expand Down Expand Up @@ -260,7 +260,7 @@ var _ = Describe("Async Upload data source", func() {
aud = NewAsyncUploadDataSource(sourceFile)
result, err := aud.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferDataFile).To(Equal(result))
Expect(ProcessingPhaseTransferScratch).To(Equal(result))
result, err = aud.TransferFile(filepath.Join(tmpDir, "file"))
Expect(err).ToNot(HaveOccurred())
Expect(ProcessingPhaseValidatePause).To(Equal(result))
Expand All @@ -274,7 +274,7 @@ var _ = Describe("Async Upload data source", func() {
aud = NewAsyncUploadDataSource(sourceFile)
result, err := aud.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferDataFile).To(Equal(result))
Expect(ProcessingPhaseTransferScratch).To(Equal(result))
result, err = aud.TransferFile("/invalidpath/invalidfile")
Expect(err).To(HaveOccurred())
Expect(ProcessingPhaseError).To(Equal(result))
Expand Down
3 changes: 2 additions & 1 deletion pkg/uploadserver/uploadserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,8 @@ func newUploadStreamProcessor(stream io.ReadCloser, dest, imageSize string, file
}

// Clone block device to block device or file system
uds := importer.NewUploadDataSource(newContentReader(stream, sourceContentType), dvContentType)
directWriteException := sourceContentType == common.BlockdeviceClone
uds := importer.NewUploadDataSource(newContentReader(stream, sourceContentType), dvContentType, directWriteException)
processor := importer.NewDataProcessor(uds, dest, common.ImporterVolumePath, common.ScratchDataDir, imageSize, filesystemOverhead, preallocation)
err := processor.ProcessData()
return processor.PreallocationApplied(), err
Expand Down
4 changes: 0 additions & 4 deletions tests/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,8 +410,6 @@ var _ = Describe("[rfe_id:138][crit:high][vendor:[email protected]][level:compon
same, err := f.VerifyTargetPVCContentMD5(f.Namespace, archivePVC, pathInPvc, expectedMd5)
Expect(err).ToNot(HaveOccurred())
Expect(same).To(BeTrue())
By("Verifying the image is sparse")
Expect(f.VerifySparse(f.Namespace, archivePVC, pathInPvc, utils.UploadFileSize)).To(BeTrue())
}
} else {
checkFailureNoValidToken(archivePVC)
Expand Down Expand Up @@ -729,8 +727,6 @@ var _ = Describe("[rfe_id:138][crit:high][vendor:[email protected]][level:compon
same, err := f.VerifyTargetPVCContentMD5(f.Namespace, pvc, pathInPvc, expectedMd5)
Expect(err).ToNot(HaveOccurred())
Expect(same).To(BeTrue())
By("Verifying the image is sparse")
Expect(f.VerifySparse(f.Namespace, pvc, pathInPvc, utils.UploadFileSize)).To(BeTrue())
}
} else {
checkFailureNoValidToken(pvcPrime)
Expand Down

0 comments on commit ad75665

Please sign in to comment.