Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

billing errors and warnings, stripe and orb webhook, river worker for async background jobs #5440

Merged
merged 37 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
0068eb3
stripe webhook, river worker for async background jobs
pjain1 Aug 13, 2024
9196408
remove comments
pjain1 Aug 13, 2024
bb3f53a
handle more events, out of order webhook events, add apis
pjain1 Aug 15, 2024
5a1e6c7
lint
pjain1 Aug 15, 2024
e9255c8
trial end check
pjain1 Aug 20, 2024
1b9d204
workers for trial end checks, plan changes etc.
pjain1 Aug 22, 2024
997e8fd
org webhook
pjain1 Aug 23, 2024
65f2670
add orb webhook, refactor billing errors/warnings
pjain1 Aug 23, 2024
fc4eedf
allow subscription cancellation
pjain1 Aug 27, 2024
e51ee88
sub cancel support fix
pjain1 Sep 2, 2024
8dc8cc8
comments
pjain1 Sep 2, 2024
2158835
self review fixes
pjain1 Sep 2, 2024
e770246
self review
pjain1 Sep 2, 2024
c5b5365
self review
pjain1 Sep 2, 2024
e104a24
handle no billing address
pjain1 Sep 3, 2024
c14c558
cli print issue
pjain1 Sep 3, 2024
a8d8eeb
simplify handling payment method attach detach events
pjain1 Sep 3, 2024
fcde9e8
log
pjain1 Sep 3, 2024
b883769
clean up
pjain1 Sep 3, 2024
66f9517
merge with main
pjain1 Sep 3, 2024
738d9ed
refactor river jobs
pjain1 Sep 4, 2024
7c3e727
simplify, add time checks and retries
pjain1 Sep 4, 2024
ad77c44
demo review
pjain1 Sep 4, 2024
67ab952
cli for sub cancellation
pjain1 Sep 5, 2024
e093739
var name
pjain1 Sep 5, 2024
d9c7991
track trial jobs and cancel them
pjain1 Sep 5, 2024
1a714c5
fix proto
pjain1 Sep 5, 2024
443a992
purge org job
pjain1 Sep 5, 2024
a1fe41d
review comments
pjain1 Sep 11, 2024
be455da
cli org flag
pjain1 Sep 12, 2024
006879b
set jobs client in webhook handler
pjain1 Sep 12, 2024
4f2cbf6
merge with main
pjain1 Sep 12, 2024
b9294a5
merge billing errors and warnings
pjain1 Sep 12, 2024
3fe2c02
review comments
pjain1 Sep 16, 2024
a7e5a05
review comments
pjain1 Sep 16, 2024
2fe9b4c
convert scheduled jobs to cron jobs
pjain1 Sep 18, 2024
e867418
merge with main
pjain1 Sep 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 231 additions & 0 deletions admin/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/rilldata/rill/admin/billing"
"github.com/rilldata/rill/admin/database"
"go.uber.org/zap"
)

func (s *Service) InitOrganizationBilling(ctx context.Context, org *database.Organization) (*database.Organization, *billing.Subscription, error) {
// TODO This can be moved to a background job and repair org billing job can be removed in the next version. We need repair job to fix existing orgs but afterwards background job wil ensure that all orgs are in sync with billing system
// create payment customer
pc, err := s.PaymentProvider.CreateCustomer(ctx, org)
if err != nil {
Expand Down Expand Up @@ -39,6 +41,34 @@ func (s *Service) InitOrganizationBilling(ctx context.Context, org *database.Org
}
s.Logger.Info("created subscription", zap.String("org", org.Name), zap.String("subscription_id", sub.ID))

// scheduling it before the org update as repair billing job can take care of it if update fails
err = s.ScheduleTrialEndCheckJobs(ctx, org.ID, sub.ID, plan.ID, sub.StartDate, sub.TrialEndDate)
if err != nil {
return nil, nil, err
}

// raise no payment method billing error
_, err = s.DB.UpsertBillingError(ctx, &database.UpsertBillingErrorOptions{
OrgID: org.ID,
Type: database.BillingErrorTypeNoPaymentMethod,
Metadata: &database.BillingErrorMetadataNoPaymentMethod{},
EventTime: org.CreatedOn,
})
if err != nil {
return nil, nil, fmt.Errorf("failed to upsert billing error: %w", err)
}

// raise no billable address billing error
_, err = s.DB.UpsertBillingError(ctx, &database.UpsertBillingErrorOptions{
OrgID: org.ID,
Type: database.BillingErrorTypeNoBillableAddress,
Metadata: &database.BillingErrorMetadataNoBillableAddress{},
EventTime: org.CreatedOn,
})
if err != nil {
return nil, nil, fmt.Errorf("failed to upsert billing error: %w", err)
}

org, err = s.DB.UpdateOrganization(ctx, org.ID, &database.UpdateOrganizationOptions{
Name: org.Name,
DisplayName: org.DisplayName,
Expand Down Expand Up @@ -81,6 +111,34 @@ func (s *Service) RepairOrgBilling(ctx context.Context, org *database.Organizati
}
s.Logger.Info("created subscription", zap.String("org_id", org.ID), zap.String("org_name", org.Name), zap.String("subscription_id", sub.ID))
subs = append(subs, sub)

// schedule trial end check job to the river queue, if it was already scheduled it will be ignored
err = s.ScheduleTrialEndCheckJobs(ctx, org.ID, sub.ID, plan.ID, sub.StartDate, sub.TrialEndDate)
if err != nil {
return nil, nil, err
}

// raise no payment method billing error
_, err = s.DB.UpsertBillingError(ctx, &database.UpsertBillingErrorOptions{
OrgID: org.ID,
Type: database.BillingErrorTypeNoPaymentMethod,
Metadata: &database.BillingErrorMetadataNoPaymentMethod{},
EventTime: org.CreatedOn,
})
if err != nil {
return nil, nil, fmt.Errorf("failed to upsert billing error: %w", err)
}

// raise no billable address billing error
_, err = s.DB.UpsertBillingError(ctx, &database.UpsertBillingErrorOptions{
OrgID: org.ID,
Type: database.BillingErrorTypeNoBillableAddress,
Metadata: &database.BillingErrorMetadataNoBillableAddress{},
EventTime: org.CreatedOn,
})
if err != nil {
return nil, nil, fmt.Errorf("failed to upsert billing error: %w", err)
}
}
if len(subs) > 1 {
s.Logger.Warn("multiple subscriptions found for the customer", zap.String("org_id", org.ID), zap.String("org_name", org.Name), zap.Int("num_subscriptions", len(subs)))
Expand Down Expand Up @@ -152,6 +210,34 @@ func (s *Service) RepairOrgBilling(ctx context.Context, org *database.Organizati
}
s.Logger.Info("created subscription", zap.String("org_id", org.ID), zap.String("org_name", org.Name), zap.String("subscription_id", sub.ID))
subs = append(subs, sub)

// schedule trial end check job to the river queue
err = s.ScheduleTrialEndCheckJobs(ctx, org.ID, sub.ID, plan.ID, sub.StartDate, sub.TrialEndDate)
if err != nil {
return nil, nil, err
}

// raise no payment method billing error
_, err = s.DB.UpsertBillingError(ctx, &database.UpsertBillingErrorOptions{
OrgID: org.ID,
Type: database.BillingErrorTypeNoPaymentMethod,
Metadata: &database.BillingErrorMetadataNoPaymentMethod{},
EventTime: org.CreatedOn,
})
if err != nil {
return nil, nil, fmt.Errorf("failed to upsert billing error: %w", err)
}

// raise no billable address billing error
_, err = s.DB.UpsertBillingError(ctx, &database.UpsertBillingErrorOptions{
OrgID: org.ID,
Type: database.BillingErrorTypeNoBillableAddress,
Metadata: &database.BillingErrorMetadataNoBillableAddress{},
EventTime: org.CreatedOn,
})
if err != nil {
return nil, nil, fmt.Errorf("failed to upsert billing error: %w", err)
}
} else if len(subs) > 1 {
s.Logger.Warn("multiple subscriptions found for the customer", zap.String("org_id", org.ID), zap.String("org_name", org.Name), zap.Int("num_subscriptions", len(subs)))
}
Expand Down Expand Up @@ -179,6 +265,151 @@ func (s *Service) RepairOrgBilling(ctx context.Context, org *database.Organizati
return org, subs, nil
}

func (s *Service) ScheduleTrialEndCheckJobs(ctx context.Context, orgID, subID, planID string, trialStartDate, trialEndDate time.Time) error {
if trialEndDate.Before(time.Now()) {
return nil
}

// schedule trial ending soon job
tes, err := s.Jobs.TrialEndingSoon(ctx, orgID, subID, planID, trialEndDate)
if err != nil {
return fmt.Errorf("failed to schedule trial ending soon job: %w", err)
}

// schedule trial end check job
tec, err := s.Jobs.TrialEndCheck(ctx, orgID, subID, planID, trialEndDate)
if err != nil {
return fmt.Errorf("failed to schedule trial end check job: %w", err)
}

// raise on-trial billing warning
_, err = s.DB.UpsertBillingWarning(ctx, &database.UpsertBillingWarningOptions{
OrgID: orgID,
Type: database.BillingWarningTypeOnTrial,
Metadata: &database.BillingWarningMetadataOnTrial{
EndDate: trialEndDate,
TrialEndingSoonJobID: tes.ID,
TrialEndCheckJobID: tec.ID,
},
EventTime: trialStartDate,
})
if err != nil {
return fmt.Errorf("failed to upsert billing warning: %w", err)
}

return nil
}

// CleanupTrialBillingErrorsAndWarnings removes trial related billing error and warning and cancel associated jobs
func (s *Service) CleanupTrialBillingErrorsAndWarnings(ctx context.Context, orgID string) error {
bett, err := s.DB.FindBillingErrorByType(ctx, orgID, database.BillingErrorTypeTrialEnded)
if err != nil {
if !errors.Is(err, database.ErrNotFound) {
return fmt.Errorf("failed to find billing errors: %w", err)
}
}

if bett != nil {
metadata, ok := bett.Metadata.(*database.BillingErrorMetadataTrialEnded)
if ok && metadata.GracePeriodEndJobID > 0 {
// cancel the trial end grace check job, ignore errors.
_ = s.Jobs.CancelJob(ctx, metadata.GracePeriodEndJobID)
}
err = s.DB.DeleteBillingError(ctx, bett.ID)
if err != nil {
return fmt.Errorf("failed to delete billing error: %w", err)
}
}

bwot, err := s.DB.FindBillingWarningByType(ctx, orgID, database.BillingWarningTypeOnTrial)
if err != nil {
if !errors.Is(err, database.ErrNotFound) {
return fmt.Errorf("failed to find billing warnings: %w", err)
}
}

if bwot != nil {
metadata, ok := bwot.Metadata.(*database.BillingWarningMetadataOnTrial)
if ok && metadata.TrialEndingSoonJobID > 0 {
// cancel the trial ending soon job, ignore errors.
_ = s.Jobs.CancelJob(ctx, metadata.TrialEndingSoonJobID)
}

if ok && metadata.TrialEndCheckJobID > 0 {
// cancel the trial end check job, ignore errors.
_ = s.Jobs.CancelJob(ctx, metadata.TrialEndCheckJobID)
}

err = s.DB.DeleteBillingWarning(ctx, bwot.ID)
if err != nil {
return fmt.Errorf("failed to delete billing warning: %w", err)
}
}

return nil
}

// CleanupBillingErrorSubCancellation removes subscription cancellation related billing error and cancel associated job
func (s *Service) CleanupBillingErrorSubCancellation(ctx context.Context, orgID string) error {
besc, err := s.DB.FindBillingErrorByType(ctx, orgID, database.BillingErrorTypeSubscriptionCancelled)
if err != nil {
if !errors.Is(err, database.ErrNotFound) {
return fmt.Errorf("failed to find billing errors: %w", err)
}
}

if besc != nil {
metadata, ok := besc.Metadata.(*database.BillingErrorMetadataSubscriptionCancelled)
if ok && metadata.SubEndJobID > 0 {
// cancel the subscription end check job, ignore errors.
_ = s.Jobs.CancelJob(ctx, metadata.SubEndJobID)
}
err = s.DB.DeleteBillingError(ctx, besc.ID)
if err != nil {
return fmt.Errorf("failed to delete billing error: %w", err)
}
}

return nil
}

func (s *Service) CheckBillingErrors(ctx context.Context, orgID string) error {
be, err := s.DB.FindBillingErrorByType(ctx, orgID, database.BillingErrorTypeTrialEnded)
if err != nil {
if !errors.Is(err, database.ErrNotFound) {
return err
}
}

if be != nil {
return fmt.Errorf("trial has ended")
}

be, err = s.DB.FindBillingErrorByType(ctx, orgID, database.BillingErrorTypeInvoicePaymentFailed)
if err != nil {
if !errors.Is(err, database.ErrNotFound) {
return err
}
}

if be != nil { // should we allow any grace period here?
return fmt.Errorf("invoice payment failed")
}

be, err = s.DB.FindBillingErrorByType(ctx, orgID, database.BillingErrorTypeSubscriptionCancelled)
if err != nil {
if !errors.Is(err, database.ErrNotFound) {
return err
}
}

if be != nil && be.Metadata.(*database.BillingErrorMetadataSubscriptionCancelled).EndDate.AddDate(0, 0, 1).After(time.Now()) {
return fmt.Errorf("subscription cancelled")
}

return nil
}

func valOrDefault[T any](ptr *T, def T) T {
if ptr != nil {
return *ptr
Expand Down
49 changes: 41 additions & 8 deletions admin/billing/biller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package billing
import (
"context"
"errors"
"net/http"
"time"

"github.com/rilldata/rill/admin/database"
"github.com/rilldata/rill/admin/jobs"
)

const (
Expand All @@ -32,21 +34,32 @@ type Biller interface {
UpdateCustomerPaymentID(ctx context.Context, customerID string, provider PaymentProvider, paymentProviderID string) error
UpdateCustomerEmail(ctx context.Context, customerID, email string) error

// CreateSubscription creates a subscription for the given organization.
// The subscription starts immediately.
// CreateSubscription creates a subscription for the given organization. Subscription starts immediately.
CreateSubscription(ctx context.Context, customerID string, plan *Plan) (*Subscription, error)
// CreateSubscriptionInFuture creates a subscription for the given organization with a start date in the future.
CreateSubscriptionInFuture(ctx context.Context, customerID string, plan *Plan, startDate time.Time) (*Subscription, error)
CancelSubscription(ctx context.Context, subscriptionID string, cancelOption SubscriptionCancellationOption) error
GetSubscriptionsForCustomer(ctx context.Context, customerID string) ([]*Subscription, error)
ChangeSubscriptionPlan(ctx context.Context, subscriptionID string, plan *Plan) (*Subscription, error)
ChangeSubscriptionPlan(ctx context.Context, subscriptionID string, plan *Plan, changeOption SubscriptionChangeOption) (*Subscription, error)
// CancelSubscriptionsForCustomer deletes the subscription for the given organization.
// cancellationDate only applicable if option is SubscriptionCancellationOptionRequestedDate
CancelSubscriptionsForCustomer(ctx context.Context, customerID string, cancelOption SubscriptionCancellationOption) error
FindSubscriptionsPastTrialPeriod(ctx context.Context) ([]*Subscription, error)

GetInvoice(ctx context.Context, invoiceID string) (*Invoice, error)
IsInvoiceValid(ctx context.Context, invoice *Invoice) bool
IsInvoicePaid(ctx context.Context, invoice *Invoice) bool

ReportUsage(ctx context.Context, usage []*Usage) error

GetReportingGranularity() UsageReportingGranularity
GetReportingWorkerCron() string

// WebhookHandlerFunc returns a http.HandlerFunc that can be used to handle incoming webhooks from the payment provider. Return nil if you don't want to register any webhook handlers.
WebhookHandlerFunc(ctx context.Context) func(w http.ResponseWriter, r *http.Request)
pjain1 marked this conversation as resolved.
Show resolved Hide resolved

// SetJobsClient needs to be explicitly set because of circular dependency, see admin start method
SetJobsClient(jobs jobs.Client)
pjain1 marked this conversation as resolved.
Show resolved Hide resolved
}

type Plan struct {
Expand Down Expand Up @@ -96,11 +109,12 @@ type Subscription struct {
}

type Customer struct {
ID string
Email string
Name string
PaymentProviderID string
PortalURL string
ID string
Email string
Name string
PaymentProviderID string
PortalURL string
HasBillableAddress bool
}

type Usage struct {
Expand All @@ -113,6 +127,18 @@ type Usage struct {
Metadata map[string]interface{}
}

type Invoice struct {
ID string
Status string
CustomerID string
Amount string
Currency string
DueDate time.Time
CreatedAt time.Time
SubscriptionID string
Metadata map[string]interface{}
}

type UsageReportingGranularity string

const (
Expand All @@ -127,6 +153,13 @@ const (
SubscriptionCancellationOptionImmediate
)

type SubscriptionChangeOption int

const (
SubscriptionChangeOptionEndOfSubscriptionTerm SubscriptionChangeOption = iota
SubscriptionChangeOptionImmediate
)

type PaymentProvider string

const (
Expand Down
Loading
Loading