Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

billing errors and warnings, stripe and orb webhook, river worker for async background jobs #5440

Merged
merged 37 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
0068eb3
stripe webhook, river worker for async background jobs
pjain1 Aug 13, 2024
9196408
remove comments
pjain1 Aug 13, 2024
bb3f53a
handle more events, out of order webhook events, add apis
pjain1 Aug 15, 2024
5a1e6c7
lint
pjain1 Aug 15, 2024
e9255c8
trial end check
pjain1 Aug 20, 2024
1b9d204
workers for trial end checks, plan changes etc.
pjain1 Aug 22, 2024
997e8fd
org webhook
pjain1 Aug 23, 2024
65f2670
add orb webhook, refactor billing errors/warnings
pjain1 Aug 23, 2024
fc4eedf
allow subscription cancellation
pjain1 Aug 27, 2024
e51ee88
sub cancel support fix
pjain1 Sep 2, 2024
8dc8cc8
comments
pjain1 Sep 2, 2024
2158835
self review fixes
pjain1 Sep 2, 2024
e770246
self review
pjain1 Sep 2, 2024
c5b5365
self review
pjain1 Sep 2, 2024
e104a24
handle no billing address
pjain1 Sep 3, 2024
c14c558
cli print issue
pjain1 Sep 3, 2024
a8d8eeb
simplify handling payment method attach detach events
pjain1 Sep 3, 2024
fcde9e8
log
pjain1 Sep 3, 2024
b883769
clean up
pjain1 Sep 3, 2024
66f9517
merge with main
pjain1 Sep 3, 2024
738d9ed
refactor river jobs
pjain1 Sep 4, 2024
7c3e727
simplify, add time checks and retries
pjain1 Sep 4, 2024
ad77c44
demo review
pjain1 Sep 4, 2024
67ab952
cli for sub cancellation
pjain1 Sep 5, 2024
e093739
var name
pjain1 Sep 5, 2024
d9c7991
track trial jobs and cancel them
pjain1 Sep 5, 2024
1a714c5
fix proto
pjain1 Sep 5, 2024
443a992
purge org job
pjain1 Sep 5, 2024
a1fe41d
review comments
pjain1 Sep 11, 2024
be455da
cli org flag
pjain1 Sep 12, 2024
006879b
set jobs client in webhook handler
pjain1 Sep 12, 2024
4f2cbf6
merge with main
pjain1 Sep 12, 2024
b9294a5
merge billing errors and warnings
pjain1 Sep 12, 2024
3fe2c02
review comments
pjain1 Sep 16, 2024
a7e5a05
review comments
pjain1 Sep 16, 2024
2fe9b4c
convert scheduled jobs to cron jobs
pjain1 Sep 18, 2024
e867418
merge with main
pjain1 Sep 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 171 additions & 0 deletions admin/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/rilldata/rill/admin/billing"
"github.com/rilldata/rill/admin/database"
"go.uber.org/zap"
)

func (s *Service) InitOrganizationBilling(ctx context.Context, org *database.Organization) (*database.Organization, *billing.Subscription, error) {
// TODO This can be moved to a background job and repair org billing job can be removed in the next version. We need repair job to fix existing orgs but afterwards background job wil ensure that all orgs are in sync with billing system
// create payment customer
pc, err := s.PaymentProvider.CreateCustomer(ctx, org)
if err != nil {
Expand Down Expand Up @@ -39,6 +41,11 @@ func (s *Service) InitOrganizationBilling(ctx context.Context, org *database.Org
}
s.Logger.Info("created subscription", zap.String("org", org.Name), zap.String("subscription_id", sub.ID))

err = s.RaiseNewOrgBillingIssues(ctx, org.ID, sub.ID, plan.ID, org.CreatedOn, sub.StartDate, sub.TrialEndDate)
if err != nil {
return nil, nil, err
}

org, err = s.DB.UpdateOrganization(ctx, org.ID, &database.UpdateOrganizationOptions{
Name: org.Name,
DisplayName: org.DisplayName,
Expand Down Expand Up @@ -81,6 +88,12 @@ func (s *Service) RepairOrgBilling(ctx context.Context, org *database.Organizati
}
s.Logger.Info("created subscription", zap.String("org_id", org.ID), zap.String("org_name", org.Name), zap.String("subscription_id", sub.ID))
subs = append(subs, sub)

// raise initial billing issues
err = s.RaiseNewOrgBillingIssues(ctx, org.ID, sub.ID, plan.ID, org.CreatedOn, sub.StartDate, sub.TrialEndDate)
if err != nil {
return nil, nil, err
}
}
if len(subs) > 1 {
s.Logger.Warn("multiple subscriptions found for the customer", zap.String("org_id", org.ID), zap.String("org_name", org.Name), zap.Int("num_subscriptions", len(subs)))
Expand Down Expand Up @@ -152,6 +165,33 @@ func (s *Service) RepairOrgBilling(ctx context.Context, org *database.Organizati
}
s.Logger.Info("created subscription", zap.String("org_id", org.ID), zap.String("org_name", org.Name), zap.String("subscription_id", sub.ID))
subs = append(subs, sub)

err = s.RaiseNewOrgBillingIssues(ctx, org.ID, sub.ID, plan.ID, org.CreatedOn, sub.StartDate, sub.TrialEndDate)
if err != nil {
return nil, nil, err
}

// raise no payment method billing issue
_, err = s.DB.UpsertBillingIssue(ctx, &database.UpsertBillingIssueOptions{
OrgID: org.ID,
Type: database.BillingIssueTypeNoPaymentMethod,
Metadata: &database.BillingIssueMetadataNoPaymentMethod{},
EventTime: org.CreatedOn,
})
if err != nil {
return nil, nil, fmt.Errorf("failed to upsert billing error: %w", err)
}

// raise no billable address billing issue
_, err = s.DB.UpsertBillingIssue(ctx, &database.UpsertBillingIssueOptions{
OrgID: org.ID,
Type: database.BillingIssueTypeNoBillableAddress,
Metadata: &database.BillingIssueMetadataNoBillableAddress{},
EventTime: org.CreatedOn,
})
if err != nil {
return nil, nil, fmt.Errorf("failed to upsert billing error: %w", err)
}
} else if len(subs) > 1 {
s.Logger.Warn("multiple subscriptions found for the customer", zap.String("org_id", org.ID), zap.String("org_name", org.Name), zap.Int("num_subscriptions", len(subs)))
}
Expand Down Expand Up @@ -179,6 +219,137 @@ func (s *Service) RepairOrgBilling(ctx context.Context, org *database.Organizati
return org, subs, nil
}

// RaiseNewOrgBillingIssues raises billing issues for a new organization
func (s *Service) RaiseNewOrgBillingIssues(ctx context.Context, orgID, subID, planID string, creationTime, trialStartDate, trialEndDate time.Time) error {
// raise no payment method billing issue
_, err := s.DB.UpsertBillingIssue(ctx, &database.UpsertBillingIssueOptions{
OrgID: orgID,
Type: database.BillingIssueTypeNoPaymentMethod,
Metadata: &database.BillingIssueMetadataNoPaymentMethod{},
EventTime: creationTime,
})
if err != nil {
return fmt.Errorf("failed to upsert billing error: %w", err)
}

// raise no billable address billing issue
_, err = s.DB.UpsertBillingIssue(ctx, &database.UpsertBillingIssueOptions{
OrgID: orgID,
Type: database.BillingIssueTypeNoBillableAddress,
Metadata: &database.BillingIssueMetadataNoBillableAddress{},
EventTime: creationTime,
})
if err != nil {
return fmt.Errorf("failed to upsert billing error: %w", err)
}

// raise on-trial billing warning
_, err = s.DB.UpsertBillingIssue(ctx, &database.UpsertBillingIssueOptions{
OrgID: orgID,
Type: database.BillingIssueTypeOnTrial,
Metadata: &database.BillingIssueMetadataOnTrial{
SubID: subID,
PlanID: planID,
EndDate: trialEndDate,
},
EventTime: trialStartDate,
})
if err != nil {
return fmt.Errorf("failed to upsert billing warning: %w", err)
}

return nil
}

// CleanupTrialBillingIssues removes trial related billing issues and cancel associated jobs
func (s *Service) CleanupTrialBillingIssues(ctx context.Context, orgID string) error {
bite, err := s.DB.FindBillingIssueByType(ctx, orgID, database.BillingIssueTypeTrialEnded)
if err != nil {
if !errors.Is(err, database.ErrNotFound) {
return fmt.Errorf("failed to find billing issue: %w", err)
}
}

if bite != nil {
err = s.DB.DeleteBillingIssue(ctx, bite.ID)
if err != nil {
return fmt.Errorf("failed to delete billing issue: %w", err)
}
}

biot, err := s.DB.FindBillingIssueByType(ctx, orgID, database.BillingIssueTypeOnTrial)
if err != nil {
if !errors.Is(err, database.ErrNotFound) {
return fmt.Errorf("failed to find billing issue: %w", err)
}
}

if biot != nil {
err = s.DB.DeleteBillingIssue(ctx, biot.ID)
if err != nil {
return fmt.Errorf("failed to delete billing issue: %w", err)
}
}

return nil
}

// CleanupBillingErrorSubCancellation removes subscription cancellation related billing error and cancel associated job
func (s *Service) CleanupBillingErrorSubCancellation(ctx context.Context, orgID string) error {
bisc, err := s.DB.FindBillingIssueByType(ctx, orgID, database.BillingIssueTypeSubscriptionCancelled)
if err != nil {
if !errors.Is(err, database.ErrNotFound) {
return fmt.Errorf("failed to find billing errors: %w", err)
}
}

if bisc != nil {
err = s.DB.DeleteBillingIssue(ctx, bisc.ID)
if err != nil {
return fmt.Errorf("failed to delete billing error: %w", err)
}
}

return nil
}

func (s *Service) CheckBillingErrors(ctx context.Context, orgID string) error {
be, err := s.DB.FindBillingIssueByType(ctx, orgID, database.BillingIssueTypeTrialEnded)
if err != nil {
if !errors.Is(err, database.ErrNotFound) {
return err
}
}

if be != nil {
return fmt.Errorf("trial has ended")
}

be, err = s.DB.FindBillingIssueByType(ctx, orgID, database.BillingIssueTypePaymentFailed)
if err != nil {
if !errors.Is(err, database.ErrNotFound) {
return err
}
}

if be != nil { // should we allow any grace period here?
return fmt.Errorf("invoice payment failed")
}

be, err = s.DB.FindBillingIssueByType(ctx, orgID, database.BillingIssueTypeSubscriptionCancelled)
if err != nil {
if !errors.Is(err, database.ErrNotFound) {
return err
}
}

if be != nil && be.Metadata.(*database.BillingIssueMetadataSubscriptionCancelled).EndDate.AddDate(0, 0, 1).After(time.Now()) {
return fmt.Errorf("subscription cancelled")
}

return nil
}

func valOrDefault[T any](ptr *T, def T) T {
if ptr != nil {
return *ptr
Expand Down
46 changes: 38 additions & 8 deletions admin/billing/biller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"time"

"github.com/rilldata/rill/admin/database"
"github.com/rilldata/rill/admin/jobs"
"github.com/rilldata/rill/runtime/pkg/httputil"
)

const (
Expand All @@ -32,21 +34,29 @@ type Biller interface {
UpdateCustomerPaymentID(ctx context.Context, customerID string, provider PaymentProvider, paymentProviderID string) error
UpdateCustomerEmail(ctx context.Context, customerID, email string) error

// CreateSubscription creates a subscription for the given organization.
// The subscription starts immediately.
// CreateSubscription creates a subscription for the given organization. Subscription starts immediately.
CreateSubscription(ctx context.Context, customerID string, plan *Plan) (*Subscription, error)
// CreateSubscriptionInFuture creates a subscription for the given organization with a start date in the future.
CreateSubscriptionInFuture(ctx context.Context, customerID string, plan *Plan, startDate time.Time) (*Subscription, error)
CancelSubscription(ctx context.Context, subscriptionID string, cancelOption SubscriptionCancellationOption) error
GetSubscriptionsForCustomer(ctx context.Context, customerID string) ([]*Subscription, error)
ChangeSubscriptionPlan(ctx context.Context, subscriptionID string, plan *Plan) (*Subscription, error)
ChangeSubscriptionPlan(ctx context.Context, subscriptionID string, plan *Plan, changeOption SubscriptionChangeOption) (*Subscription, error)
// CancelSubscriptionsForCustomer deletes the subscription for the given organization.
// cancellationDate only applicable if option is SubscriptionCancellationOptionRequestedDate
CancelSubscriptionsForCustomer(ctx context.Context, customerID string, cancelOption SubscriptionCancellationOption) error
FindSubscriptionsPastTrialPeriod(ctx context.Context) ([]*Subscription, error)

GetInvoice(ctx context.Context, invoiceID string) (*Invoice, error)
IsInvoiceValid(ctx context.Context, invoice *Invoice) bool
IsInvoicePaid(ctx context.Context, invoice *Invoice) bool

ReportUsage(ctx context.Context, usage []*Usage) error

GetReportingGranularity() UsageReportingGranularity
GetReportingWorkerCron() string

// WebhookHandlerFunc returns a http.HandlerFunc that can be used to handle incoming webhooks from the payment provider. Return nil if you don't want to register any webhook handlers. jobs is used to enqueue jobs for processing the webhook events.
WebhookHandlerFunc(ctx context.Context, jobs jobs.Client) httputil.Handler
}

type Plan struct {
Expand Down Expand Up @@ -96,11 +106,12 @@ type Subscription struct {
}

type Customer struct {
ID string
Email string
Name string
PaymentProviderID string
PortalURL string
ID string
Email string
Name string
PaymentProviderID string
PortalURL string
HasBillableAddress bool
}

type Usage struct {
Expand All @@ -113,6 +124,18 @@ type Usage struct {
Metadata map[string]interface{}
}

type Invoice struct {
ID string
Status string
CustomerID string
Amount string
Currency string
DueDate time.Time
CreatedAt time.Time
SubscriptionID string
Metadata map[string]interface{}
}

type UsageReportingGranularity string

const (
Expand All @@ -127,6 +150,13 @@ const (
SubscriptionCancellationOptionImmediate
)

type SubscriptionChangeOption int

const (
SubscriptionChangeOptionEndOfSubscriptionTerm SubscriptionChangeOption = iota
SubscriptionChangeOptionImmediate
)

type PaymentProvider string

const (
Expand Down
25 changes: 24 additions & 1 deletion admin/billing/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package billing

import (
"context"
"time"

"github.com/rilldata/rill/admin/database"
"github.com/rilldata/rill/admin/jobs"
"github.com/rilldata/rill/runtime/pkg/httputil"
)

var _ Biller = &noop{}
Expand Down Expand Up @@ -58,6 +61,10 @@ func (n noop) CreateSubscription(ctx context.Context, customerID string, plan *P
return &Subscription{Customer: &Customer{}, Plan: &Plan{Quotas: Quotas{}}}, nil
}

func (n noop) CreateSubscriptionInFuture(ctx context.Context, customerID string, plan *Plan, startDate time.Time) (*Subscription, error) {
return &Subscription{Customer: &Customer{}, Plan: &Plan{Quotas: Quotas{}}}, nil
}

func (n noop) CancelSubscription(ctx context.Context, subscriptionID string, cancelOption SubscriptionCancellationOption) error {
return nil
}
Expand All @@ -66,7 +73,7 @@ func (n noop) GetSubscriptionsForCustomer(ctx context.Context, customerID string
return []*Subscription{{Customer: &Customer{}, Plan: &Plan{Quotas: Quotas{}}}}, nil
}

func (n noop) ChangeSubscriptionPlan(ctx context.Context, subscriptionID string, plan *Plan) (*Subscription, error) {
func (n noop) ChangeSubscriptionPlan(ctx context.Context, subscriptionID string, plan *Plan, changeOption SubscriptionChangeOption) (*Subscription, error) {
return &Subscription{Customer: &Customer{}, Plan: &Plan{Quotas: Quotas{}}}, nil
}

Expand All @@ -78,6 +85,18 @@ func (n noop) FindSubscriptionsPastTrialPeriod(ctx context.Context) ([]*Subscrip
return []*Subscription{}, nil
}

func (n noop) GetInvoice(ctx context.Context, invoiceID string) (*Invoice, error) {
return nil, nil
}

func (n noop) IsInvoiceValid(ctx context.Context, invoice *Invoice) bool {
return true
}

func (n noop) IsInvoicePaid(ctx context.Context, invoice *Invoice) bool {
return true
}

func (n noop) ReportUsage(ctx context.Context, usage []*Usage) error {
return nil
}
Expand All @@ -89,3 +108,7 @@ func (n noop) GetReportingGranularity() UsageReportingGranularity {
func (n noop) GetReportingWorkerCron() string {
return ""
}

func (n noop) WebhookHandlerFunc(ctx context.Context, jc jobs.Client) httputil.Handler {
return nil
}
Loading
Loading