Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for syncing block volumes #121

Merged
merged 1 commit into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@ require (
github.com/Luzifer/go-dhparam v1.1.0
github.com/evanphx/json-patch v4.11.0+incompatible
github.com/evanphx/json-patch/v5 v5.5.0
github.com/go-logr/logr v0.4.0
github.com/openshift/api v0.0.0-20210625082935-ad54d363d274
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.8.1
k8s.io/api v0.21.2
k8s.io/apimachinery v0.21.3
k8s.io/client-go v0.21.2
k8s.io/klog/v2 v2.8.0
k8s.io/utils v0.0.0-20210527160623-6fdb442a123b
sigs.k8s.io/controller-runtime v0.9.2
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v0.4.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.5 // indirect
Expand All @@ -38,7 +39,6 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/klog/v2 v2.8.0 // indirect
k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.1.2 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
Expand Down
3 changes: 2 additions & 1 deletion state_transfer/endpoint/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/konveyor/crane-lib/state_transfer/endpoint"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
Expand Down Expand Up @@ -137,7 +138,7 @@ func (s *ServiceEndpoint) createService(c client.Client) error {
}

err := c.Create(context.TODO(), &service, &client.CreateOptions{})
if err != nil {
if err != nil && !k8serrors.IsAlreadyExists(err) {
return err
}

Expand Down
136 changes: 88 additions & 48 deletions state_transfer/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,31 @@ package state_transfer_test

import (
"context"
"fmt"
"log"
"testing"
"time"

"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2/klogr"

"github.com/konveyor/crane-lib/state_transfer"
"github.com/konveyor/crane-lib/state_transfer/endpoint"
"github.com/konveyor/crane-lib/state_transfer/endpoint/route"
"github.com/konveyor/crane-lib/state_transfer/meta"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

statetransfermeta "github.com/konveyor/crane-lib/state_transfer/meta"
"github.com/konveyor/crane-lib/state_transfer/transfer"
"github.com/konveyor/crane-lib/state_transfer/transfer/rclone"
"github.com/konveyor/crane-lib/state_transfer/transfer/rsync"
"github.com/konveyor/crane-lib/state_transfer/transport"
"github.com/konveyor/crane-lib/state_transfer/transport/stunnel"
routev1 "github.com/openshift/api/route/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand All @@ -33,28 +40,15 @@ var (

// This example shows how to wire up the components of the lib to
// transfer data from one PVC to another
func Example_basicTransfer() {
srcClient, err := client.New(srcCfg, client.Options{Scheme: runtime.NewScheme()})
if err != nil {
log.Fatal(err, "unable to create source client")
}

destClient, err := client.New(destCfg, client.Options{Scheme: runtime.NewScheme()})
if err != nil {
log.Fatal(err, "unable to create destination client")
}

// quiesce the applications if needed on the source side
err = state_transfer.QuiesceApplications(srcCfg, srcNamespace)
if err != nil {
log.Fatal(err, "unable to quiesce application on source cluster")
}
func TestExample_basicTransfer(t *testing.T) {
srcClient := buildTestClient(createPvc(srcPVC, srcNamespace))
destClient := buildTestClient()

// set up the PVC on destination to receive the data
pvc := &corev1.PersistentVolumeClaim{}
err = srcClient.Get(context.TODO(), client.ObjectKey{Namespace: srcNamespace, Name: srcPVC}, pvc)
err := srcClient.Get(context.TODO(), client.ObjectKey{Namespace: srcNamespace, Name: srcPVC}, pvc)
if err != nil {
log.Fatal(err, "unable to get source PVC")
t.Fatalf("unable to get source PVC: %v", err)
}

destPVC := pvc.DeepCopy()
Expand All @@ -64,35 +58,57 @@ func Example_basicTransfer() {
pvc.Annotations = map[string]string{}
err = destClient.Create(context.TODO(), destPVC, &client.CreateOptions{})
if err != nil {
log.Fatal(err, "unable to create destination PVC")
t.Fatalf("unable to create destination PVC: %v", err)
}

pvcList, err := transfer.NewPVCPairList(
pvcList, err := transfer.NewFilesystemPVCPairList(
transfer.NewPVCPair(pvc, destPVC),
)
if err != nil {
log.Fatal(err, "invalid pvc list")
t.Fatalf("invalid pvc list: %v", err)
}

// create a route for data transfer
r := route.NewEndpoint(
types.NamespacedName{
Namespace: pvc.Name,
Name: pvc.Namespace,
}, route.EndpointTypePassthrough, statetransfermeta.Labels, "")
Namespace: pvc.Namespace,
Name: pvc.Name,
}, route.EndpointTypePassthrough, statetransfermeta.Labels, "test.domain")
e, err := endpoint.Create(r, destClient)
if err != nil {
log.Fatal(err, "unable to create route endpoint")
t.Fatalf("unable to create route endpoint: %v", err)
}

_ = wait.PollUntil(time.Second*5, func() (done bool, err error) {
ready, err := e.IsHealthy(destClient)
if err != nil {
log.Println(err, "unable to check route health, retrying...")
return false, nil
}
return ready, nil
}, make(<-chan struct{}))
route := &routev1.Route{}
// Mark the route as admitted.
err = destClient.Get(context.TODO(), client.ObjectKey{Namespace: destPVC.Namespace, Name: destPVC.Name}, route)
if err != nil {
t.Fatalf("unable to get route: %v, %s/%s", err, destPVC.Namespace, destPVC.Name)
}
route.Status = routev1.RouteStatus{
Ingress: []routev1.RouteIngress{
{
Conditions: []routev1.RouteIngressCondition{
{
Type: routev1.RouteAdmitted,
Status: corev1.ConditionTrue,
},
},
},
},
}
err = destClient.Status().Update(context.TODO(), route)
if err != nil {
t.Fatalf("unable to update route status: %v", err)
}

ready, err := e.IsHealthy(destClient)
if err != nil {
t.Fatalf("unable to check route health: %v", err)
}
if !ready {
t.Fatalf("route is not ready")
}

// create an stunnel transport to carry the data over the route
s := stunnel.NewTransport(statetransfermeta.NewNamespacedPair(
Expand All @@ -101,25 +117,25 @@ func Example_basicTransfer() {
types.NamespacedName{
Name: destPVC.Name, Namespace: destPVC.Namespace},
), &transport.Options{})
_, err = transport.CreateServer(s, destClient, e)
_, err = transport.CreateServer(s, destClient, "fs", e)
if err != nil {
log.Fatal(err, "error creating stunnel server")
t.Fatalf("error creating stunnel server: %v", err)
}

_, err = transport.CreateClient(s, srcClient, e)
s, err = transport.CreateClient(s, srcClient, "fs", e)
if err != nil {
log.Fatal(err, "error creating stunnel client")
t.Fatalf("error creating stunnel client: %v", err)
}

// Create Rclone Transfer Pod
t, err := rclone.NewTransfer(s, r, srcCfg, destCfg, pvcList)
tr, err := rclone.NewTransfer(s, r, srcClient, destClient, pvcList)
if err != nil {
log.Fatal(err, "errror creating rclone transfer")
t.Fatalf("errror creating rclone transfer: %v", err)
}

err = transfer.CreateServer(t)
err = transfer.CreateServer(tr)
if err != nil {
log.Fatal(err, "error creating rclone server")
t.Fatalf("error creating rclone server: %v", err)
}

// Rsync Example
Expand All @@ -130,15 +146,15 @@ func Example_basicTransfer() {
}
rsyncTransferOptions = append(rsyncTransferOptions, customTransferOptions...)

rsyncTransfer, err := rsync.NewTransfer(s, r, srcCfg, destCfg, pvcList, rsyncTransferOptions...)
rsyncTransfer, err := rsync.NewTransfer(s, r, srcClient, destClient, pvcList, klogr.New(), rsyncTransferOptions...)
if err != nil {
log.Fatal(err, "error creating rsync transfer")
} else {
log.Printf("rsync transfer created for pvc %s\n", rsyncTransfer.PVCs()[0].Source().Claim().Name)
}

// Create Rclone Client Pod
err = transfer.CreateClient(t)
err = transfer.CreateClient(tr)
if err != nil {
log.Fatal(err, "error creating rclone client")
}
Expand Down Expand Up @@ -168,7 +184,7 @@ func Example_getFromCreatedObjects() {

destPVC := pvc.DeepCopy()

pvcList, err := transfer.NewPVCPairList(
pvcList, err := transfer.NewFilesystemPVCPairList(
transfer.NewPVCPair(pvc, destPVC),
)
if err != nil {
Expand All @@ -184,20 +200,20 @@ func Example_getFromCreatedObjects() {
types.NamespacedName{Namespace: srcNamespace, Name: srcPVC},
types.NamespacedName{Namespace: srcNamespace, Name: srcPVC},
)
s, err := stunnel.GetTransportFromKubeObjects(srcClient, destClient, nnPair, e, &transport.Options{})
s, err := stunnel.GetTransportFromKubeObjects(srcClient, destClient, "fs", nnPair, e, &transport.Options{})
if err != nil {
log.Fatal(err, "error getting stunnel transport")
}

pvcList, err = transfer.NewPVCPairList(
pvcList, err = transfer.NewFilesystemPVCPairList(
transfer.NewPVCPair(pvc, nil),
)
if err != nil {
log.Fatal(err, "invalid pvc list")
}

// Create Rclone Transfer Pod
t, err := rclone.NewTransfer(s, e, srcCfg, destCfg, pvcList)
t, err := rclone.NewTransfer(s, e, srcClient, destClient, pvcList)
if err != nil {
log.Fatal(err, "errror creating rclone transfer")
}
Expand All @@ -224,3 +240,27 @@ func Example_getFromCreatedObjects() {

// TODO: check if the client is completed
}

func buildTestClient(objects ...runtime.Object) client.Client {
s := scheme.Scheme
schemeInitFuncs := []func(*runtime.Scheme) error{
corev1.AddToScheme,
routev1.AddToScheme,
}
for _, f := range schemeInitFuncs {
if err := f(s); err != nil {
panic(fmt.Errorf("failed to initiate the scheme %w", err))
}
}

return fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build()
}

func createPvc(name, namespace string) *corev1.PersistentVolumeClaim {
return &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
}
}
74 changes: 74 additions & 0 deletions state_transfer/transfer/blockrsync/blockrsync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package blockrsync

import (
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/konveyor/crane-lib/state_transfer/endpoint"
"github.com/konveyor/crane-lib/state_transfer/transfer"
"github.com/konveyor/crane-lib/state_transfer/transport"
)

const (
blockrsyncImage = "quay.io/awels/blockrsync:latest"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you intend to keep the image in your account/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I would love for this to be part of the konveyor organization. I just started the development in my local repo so I could make progress.

volumeName = "volume"
BlockRsyncContainer = "blockrsync"
Proxy = "proxy"
)

type BlockrsyncTransfer struct {
log logr.Logger
username string
password string
source client.Client
destination client.Client
pvcList transfer.PVCPairList
transport transport.Transport
endpoint endpoint.Endpoint
transferOptions *TransferOptions
}

func NewTransfer(t transport.Transport, e endpoint.Endpoint, src client.Client,
dest client.Client, pvcList transfer.PVCPairList, log logr.Logger, options *TransferOptions) (transfer.Transfer, error) {
err := validatePVCList(pvcList)
if err != nil {
return nil, err
}
return &BlockrsyncTransfer{
log: log,
transport: t,
endpoint: e,
source: src,
destination: dest,
pvcList: pvcList,
transferOptions: options,
}, nil
}

func (r *BlockrsyncTransfer) PVCs() transfer.PVCPairList {
return r.pvcList
}

func (r *BlockrsyncTransfer) Endpoint() endpoint.Endpoint {
return r.endpoint
}

func (r *BlockrsyncTransfer) Transport() transport.Transport {
return r.transport
}

func (r *BlockrsyncTransfer) Source() client.Client {
return r.source
}

func (r *BlockrsyncTransfer) Destination() client.Client {
return r.destination
}

func (r *BlockrsyncTransfer) Username() string {
return r.username
}

func (r *BlockrsyncTransfer) Password() string {
return r.password
}
Loading
Loading