Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Squished commit
Browse files Browse the repository at this point in the history
Shastick committed Sep 6, 2024
1 parent 32f570b commit 00a7e0c
Showing 1 changed file with 94 additions and 34 deletions.
128 changes: 94 additions & 34 deletions pkg/scd/operational_intents_handler.go
Original file line number Diff line number Diff line change
@@ -16,6 +16,37 @@ import (
"github.com/interuss/stacktrace"
)

// subscriptionIsImplicitAndOnlyAttachedToOIR will check if:
// - the subscription is defined and is implicit
// - the subscription is attached to the specified operational intent
// - the subscription is not attached to any other operational intent
//
// This is to be used in contexts where an implicit subscription may need to be cleaned up: if true is returned,
// the subscription can be safely removed after the operational intent is deleted or attached to another subscription.
//
// NOTE: this should eventually be pushed down to CRDB as part of the queries being executed in the callers of this method.
//
// See https://github.com/interuss/dss/issues/1059 for more details
func subscriptionIsImplicitAndOnlyAttachedToOIR(ctx context.Context, r repos.Repository, oirID *dssmodels.ID, subscription *scdmodels.Subscription) (bool, error) {
if subscription == nil {
return false, nil
}
if !subscription.ImplicitSubscription {
return false, nil
}
// Get the Subscription's dependent OperationalIntents
dependentOps, err := r.GetDependentOperationalIntents(ctx, subscription.ID)
if err != nil {
return false, stacktrace.Propagate(err, "Could not find dependent OperationalIntents")
}
if len(dependentOps) == 0 {
return false, stacktrace.NewError("An implicit Subscription had no dependent OperationalIntents")
} else if len(dependentOps) == 1 && dependentOps[0] == *oirID {
return true, nil
}
return false, nil
}

// DeleteOperationalIntentReference deletes a single operational intent ref for a given ID at
// the specified version.
func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *restapi.DeleteOperationalIntentReferenceRequest,
@@ -69,29 +100,20 @@ func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *rest
}

// Get the Subscription supporting the OperationalIntent, if one is defined
var sub *scdmodels.Subscription
removeImplicitSubscription := false
var previousSubscription *scdmodels.Subscription
if old.SubscriptionID != nil {
sub, err = r.GetSubscription(ctx, *old.SubscriptionID)
previousSubscription, err = r.GetSubscription(ctx, *old.SubscriptionID)
if err != nil {
return stacktrace.Propagate(err, "Unable to get OperationalIntent's Subscription from repo")
}
if sub == nil {
if previousSubscription == nil {
return stacktrace.NewError("OperationalIntent's Subscription missing from repo")
}
}

if sub.ImplicitSubscription {
// Get the Subscription's dependent OperationalIntents
dependentOps, err := r.GetDependentOperationalIntents(ctx, sub.ID)
if err != nil {
return stacktrace.Propagate(err, "Could not find dependent OperationalIntents")
}
if len(dependentOps) == 0 {
return stacktrace.NewError("An implicit Subscription had no dependent OperationalIntents")
} else if len(dependentOps) == 1 {
removeImplicitSubscription = true
}
}
removeImplicitSubscription, err := subscriptionIsImplicitAndOnlyAttachedToOIR(ctx, r, &id, previousSubscription)
if err != nil {
return stacktrace.Propagate(err, "Could not determine if Subscription can be removed")
}

// Gather the subscriptions that need to be notified
@@ -119,7 +141,7 @@ func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *rest
// removeImplicitSubscription is only true if the OIR had a subscription defined
if removeImplicitSubscription {
// Automatically remove a now-unused implicit Subscription
err = r.DeleteSubscription(ctx, sub.ID)
err = r.DeleteSubscription(ctx, previousSubscription.ID)
if err != nil {
return stacktrace.Propagate(err, "Unable to delete associated implicit Subscription")
}
@@ -810,33 +832,63 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize

// For an OIR being created, version starts at 0
version := int32(0)
var previousSub *scdmodels.Subscription
if old != nil {
version = int32(old.Version)
// Fetch the previous OIR's subscription if it exists
if old.SubscriptionID != nil {
previousSub, err = r.GetSubscription(ctx, *old.SubscriptionID)
if err != nil {
return stacktrace.Propagate(err, "Unable to get OperationalIntent's Subscription from repo")
}
}
}

var sub *scdmodels.Subscription
// Determine if the previous subscription is being replaced and if it will need to be cleaned up
previousSubIsBeingReplaced := previousSub != nil && validParams.subscriptionID != previousSub.ID
removePreviousImplicitSubscription := false
if previousSubIsBeingReplaced {
removePreviousImplicitSubscription, err = subscriptionIsImplicitAndOnlyAttachedToOIR(ctx, r, &validParams.id, previousSub)
if err != nil {
return stacktrace.Propagate(err, "Could not determine if previous Subscription can be removed")
}
}

// attachedSub is the subscription that will end up being attached to the OIR
// it defaults to the previous subscription (which may be nil), and may be updated if required by the parameters
attachedSub := previousSub
if validParams.subscriptionID.Empty() {
// Create an implicit subscription if one has been requested.
// Requesting neither an explicit nor an implicit subscription is allowed for ACCEPTED states:
// for other states, an error will have been returned earlier.
// if no implicit subscription is requested and we reached this point, we will proceed without subscription
// No subscription ID was provided:
// check if an implicit subscription should be created, otherwise do nothing
if validParams.implicitSubscription.requested {
if sub, err = createAndStoreNewImplicitSubscription(ctx, r, manager, validParams); err != nil {
// Parameters for a new implicit subscription have been passed: we will create
// a new implicit subscription even if another subscription was attaches to this OIR before,
// regardless of whether it was an implicit subscription or not.
if attachedSub, err = createAndStoreNewImplicitSubscription(ctx, r, manager, validParams); err != nil {
return stacktrace.Propagate(err, "Failed to create implicit subscription")
}
} else {
// If no subscription ID is provided and no implicit subscription is requested,
// the OIR should have no attached subscription
attachedSub = nil
}
} else {
// Attempt to rely on the specified subscription
sub, err = r.GetSubscription(ctx, validParams.subscriptionID)
if err != nil {
return stacktrace.Propagate(err, "Unable to get depended-upon Subscription")
}
if sub == nil {
return stacktrace.NewErrorWithCode(dsserr.BadRequest, "Depended-upon Subscription %s does not exist", validParams.subscriptionID)
// If it is different from the previous subscription, we need to fetch it from the store
// in order to ensure it correctly covers the OIR.
// We do the check below in order to avoid re-fetching the subscription if it has not changed
if attachedSub == nil || previousSubIsBeingReplaced {
attachedSub, err = r.GetSubscription(ctx, validParams.subscriptionID)
if err != nil {
return stacktrace.Propagate(err, "Unable to get requested Subscription from store")
}
if attachedSub == nil {
return stacktrace.NewErrorWithCode(dsserr.BadRequest, "Specified Subscription %s does not exist", validParams.subscriptionID)
}
}

// We need to confirm that it is owned by the calling manager
if sub.Manager != manager {
if attachedSub.Manager != manager {
return stacktrace.Propagate(
// We do a bit of wrapping gymnastics because the root error message will be sent in the response,
// and we don't want to include the effective manager in there.
@@ -845,34 +897,42 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize
// The propagation message will end in the logs and help with debugging.
"Subscription %s owned by %s, but %s attempted to use it for an OperationalIntent",
validParams.subscriptionID,
sub.Manager,
attachedSub.Manager,
manager,
)
}

// We need to ensure the subscription covers the OIR's geo-temporal extent
sub, err = ensureSubscriptionCoversOIR(ctx, r, sub, validParams)
attachedSub, err = ensureSubscriptionCoversOIR(ctx, r, attachedSub, validParams)
if err != nil {
return stacktrace.Propagate(err, "Failed to ensure subscription covers OIR")
}
}

if validParams.state.RequiresKey() {
responseConflict, err = validateKeyAndProvideConflictResponse(ctx, r, manager, validParams, sub)
responseConflict, err = validateKeyAndProvideConflictResponse(ctx, r, manager, validParams, attachedSub)
if err != nil {
return stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), "Failed to validate key")
}
}

// Construct the new OperationalIntent
op := validParams.toOIR(manager, sub, version+1)
op := validParams.toOIR(manager, attachedSub, version+1)

// Upsert the OperationalIntent
op, err = r.UpsertOperationalIntent(ctx, op)
if err != nil {
return stacktrace.Propagate(err, "Failed to upsert OperationalIntent in repo")
}

// Check if the previously attached subscription should be removed
if removePreviousImplicitSubscription {
err = r.DeleteSubscription(ctx, previousSub.ID)
if err != nil {
return stacktrace.Propagate(err, "Unable to delete previous implicit Subscription")
}
}

notifyVolume, err := computeNotificationVolume(old, validParams.uExtent)
if err != nil {
return stacktrace.Propagate(err, "Failed to compute notification volume")

0 comments on commit 00a7e0c

Please sign in to comment.