Skip to content

Commit

Permalink
Updates to the stripe invoice generator:
Browse files Browse the repository at this point in the history
* For tenants that do not have a credit card:
   * Skip generating invoices for tenants that have no data flowing in the billed month
   * Skip generating invoices for tenants that didn't have both an active capture _and_ an active materialization in the billed month
* Add stub stripe customers table to allow sqlx to properly infer query shapes
* Automatically remove existing draft invoices if a new rule has been introduced that would have skipped that invoice
  • Loading branch information
jshearer committed Dec 20, 2023
1 parent f8fe482 commit c84db38
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 38 deletions.
186 changes: 148 additions & 38 deletions crates/billing-integrations/src/stripe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};
use sqlx::Postgres;
use sqlx::{postgres::PgPoolOptions, types::chrono::NaiveDate, Pool};
use std::collections::HashMap;
use stripe::List;
use stripe::{InvoiceStatus, List};

const TENANT_METADATA_KEY: &str = "estuary.dev/tenant_name";
const CREATED_BY_BILLING_AUTOMATION: &str = "estuary.dev/created_by_automation";
Expand Down Expand Up @@ -109,9 +109,10 @@ enum InvoiceResult {
Created,
Updated,
LessThanMinimum,
NoUsage,
FreeTier,
FutureTrialStart,
NoDataMoved,
NoFullPipeline,
Error,
}

Expand All @@ -123,11 +124,14 @@ impl InvoiceResult {
InvoiceResult::LessThanMinimum => {
"Skipping invoice for less than the minimum chargable amount ($0.50)"
}
InvoiceResult::NoUsage => "Skipping invoice with no usage",
InvoiceResult::FreeTier => "Skipping usage invoice for tenant in free tier",
InvoiceResult::FutureTrialStart => {
"Skipping invoice ending before free trial start date"
}
InvoiceResult::NoDataMoved => "Skipping invoice for tenant with no data movement",
InvoiceResult::NoFullPipeline => {
"Skipping invoice for tenant without an active pipeline"
}
InvoiceResult::Error => "Error publishing invoices",
}
}
Expand All @@ -142,9 +146,48 @@ struct Invoice {
billed_prefix: String,
invoice_type: InvoiceType,
extra: Option<sqlx::types::Json<Option<Extra>>>,
has_payment_method: Option<bool>,
capture_hours: Option<f64>,
materialization_hours: Option<f64>,
}

impl Invoice {
pub async fn get_stripe_invoice(
&self,
client: &stripe::Client,
customer_id: &str,
) -> anyhow::Result<Option<stripe::Invoice>> {
let date_start_repr = self.date_start.format("%F").to_string();
let date_end_repr = self.date_end.format("%F").to_string();

let invoice_type_val =
serde_json::to_value(self.invoice_type.clone()).expect("InvoiceType is serializable");
let invoice_type_str = invoice_type_val
.as_str()
.expect("InvoiceType is serializable");

let invoice_search = stripe_search::<stripe::Invoice>(
&client,
"invoices",
SearchParams {
query: format!(
r#"
-status:"deleted" AND
customer:"{customer_id}" AND
metadata["{INVOICE_TYPE_KEY}"]:"{invoice_type_str}" AND
metadata["{BILLING_PERIOD_START_KEY}"]:"{date_start_repr}" AND
metadata["{BILLING_PERIOD_END_KEY}"]:"{date_end_repr}"
"#
),
..Default::default()
},
)
.await
.context("Searching for an invoice")?;

Ok(invoice_search.data.into_iter().next())
}

#[tracing::instrument(skip(self, client, db_client), fields(tenant=self.billed_prefix, invoice_type=format!("{:?}",self.invoice_type), subtotal=format!("${:.2}", self.subtotal as f64 / 100.0)))]
pub async fn upsert_invoice(
&self,
Expand All @@ -157,16 +200,18 @@ impl Invoice {
(InvoiceType::Preview, _) => {
bail!("Should not create Stripe invoices for preview invoices")
}
(InvoiceType::Final, Some(extra)) => {
(InvoiceType::Final, Some(extra)) if !self.has_payment_method.unwrap_or(false) => {
let unwrapped_extra = extra.clone().0.expect(
"This is just a sqlx quirk, if the outer Option is Some then this will be Some",
);
if !(unwrapped_extra.recurring_fee.unwrap_or(0) > 0
|| unwrapped_extra.processed_data_gb.unwrap_or(0.0) > 0.0
|| unwrapped_extra.task_usage_hours.unwrap_or(0.0) > 0.0
|| self.subtotal > 0)
if unwrapped_extra.processed_data_gb.unwrap_or_default() == 0.0 {
return Ok(InvoiceResult::NoDataMoved);
}

if self.capture_hours.unwrap_or_default() == 0.0
|| self.materialization_hours.unwrap_or_default() == 0.0
{
return Ok(InvoiceResult::NoUsage);
return Ok(InvoiceResult::NoFullPipeline);
}
}
(InvoiceType::Final, None) => {
Expand Down Expand Up @@ -226,31 +271,20 @@ impl Invoice {
.as_str()
.expect("InvoiceType is serializable");

let customer =
get_or_create_customer_for_tenant(client, db_client, self.billed_prefix.to_owned())
.await?;
let customer_id = customer.id.to_string();

let invoice_search = stripe_search::<stripe::Invoice>(
let customer = get_or_create_customer_for_tenant(
client,
"invoices",
SearchParams {
query: format!(
r#"
-status:"deleted" AND
customer:"{customer_id}" AND
metadata["{INVOICE_TYPE_KEY}"]:"{invoice_type_str}" AND
metadata["{BILLING_PERIOD_START_KEY}"]:"{date_start_repr}" AND
metadata["{BILLING_PERIOD_END_KEY}"]:"{date_end_repr}"
"#
),
..Default::default()
},
db_client,
self.billed_prefix.to_owned(),
true,
)
.await
.context("Searching for an invoice")?;
.await?
.expect("Should never return None");
let customer_id = customer.id.to_string();

let maybe_invoice = if let Some(invoice) = invoice_search.data.into_iter().next() {
let maybe_invoice = if let Some(invoice) = self
.get_stripe_invoice(&client, customer_id.as_str())
.await?
{
match invoice.status {
Some(state @ (stripe::InvoiceStatus::Open | stripe::InvoiceStatus::Draft))
if recreate_finalized =>
Expand Down Expand Up @@ -435,8 +469,28 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> {
invoice_type as "invoice_type!: InvoiceType",
line_items as "line_items!: sqlx::types::Json<Vec<LineItem>>",
subtotal::bigint as "subtotal!",
extra as "extra: sqlx::types::Json<Option<Extra>>"
extra as "extra: sqlx::types::Json<Option<Extra>>",
customer.has_payment_method as has_payment_method,
dataflow_hours.capture_hours::float as capture_hours,
dataflow_hours.materialization_hours::float as materialization_hours
from invoices_ext
inner join lateral(
select bool_or("invoice_settings/default_payment_method" is not null) as has_payment_method
from stripe.customers
where customers.metadata->>'estuary.dev/tenant_name' = billed_prefix
group by billed_prefix
) as customer on true
inner join lateral(
select
sum(catalog_stats.usage_seconds) filter (where live_specs.spec_type = 'capture') / (60.0 * 60) as capture_hours,
sum(catalog_stats.usage_seconds) filter (where live_specs.spec_type = 'materialization') / (60.0 * 60) as materialization_hours
from catalog_stats
join live_specs on live_specs.catalog_name = catalog_stats.catalog_name
where
catalog_stats.catalog_name ^@ billed_prefix
and grain = 'monthly'
and tstzrange(date_trunc('day', $1::date), date_trunc('day', ($1::date)) + interval '1 month' - interval '1 day') @> catalog_stats.ts
) as dataflow_hours on true
where ((
date_start >= date_trunc('day', $1::date)
and date_end <= date_trunc('day', ($1::date)) + interval '1 month' - interval '1 day'
Expand All @@ -462,8 +516,28 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> {
invoice_type as "invoice_type!: InvoiceType",
line_items as "line_items!: sqlx::types::Json<Vec<LineItem>>",
subtotal::bigint as "subtotal!",
extra as "extra: sqlx::types::Json<Option<Extra>>"
extra as "extra: sqlx::types::Json<Option<Extra>>",
customer.has_payment_method as has_payment_method,
dataflow_hours.capture_hours::float as capture_hours,
dataflow_hours.materialization_hours::float as materialization_hours
from invoices_ext
inner join lateral(
select bool_or("invoice_settings/default_payment_method" is not null) as has_payment_method
from stripe.customers
where customers.metadata->>'estuary.dev/tenant_name' = billed_prefix
group by billed_prefix
) as customer on true
inner join lateral(
select
sum(catalog_stats.usage_seconds) filter (where live_specs.spec_type = 'capture') / (60.0 * 60) as capture_hours,
sum(catalog_stats.usage_seconds) filter (where live_specs.spec_type = 'materialization') / (60.0 * 60) as materialization_hours
from catalog_stats
join live_specs on live_specs.catalog_name = catalog_stats.catalog_name
where
catalog_stats.catalog_name ^@ billed_prefix
and grain = 'monthly'
and tstzrange(date_trunc('day', $1::date), date_trunc('day', ($1::date)) + interval '1 month' - interval '1 day') @> catalog_stats.ts
) as dataflow_hours on true
where (
date_start >= date_trunc('day', $1::date)
and date_end <= date_trunc('day', ($1::date)) + interval '1 month' - interval '1 day'
Expand Down Expand Up @@ -520,6 +594,39 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> {
"{}",
res.message()
);
match res {
InvoiceResult::Created
| InvoiceResult::Updated
| InvoiceResult::Error => {}
// Remove any incorrectly created invoices that are now skipped for whatever reason
_ => {
let customer = match get_or_create_customer_for_tenant(
&client,
&db_pool,
response.billed_prefix.to_owned(),
false,
)
.await?
{
Some(c) => c,
None => return Ok((res, response.subtotal)),
};

let customer_id = customer.id.to_string();

if let Some(invoice) =
response.get_stripe_invoice(&client, &customer_id).await?
{
if let Some(InvoiceStatus::Draft) = invoice.status {
tracing::warn!(
tenant = response.billed_prefix.to_string(),
"Deleting draft invoice!"
);
stripe::Invoice::delete(&client, &invoice.id).await?;
}
}
}
}
Ok((res, response.subtotal))
}
}
Expand Down Expand Up @@ -578,12 +685,13 @@ async fn get_tenant_trial_date(
Ok(query_result.trial_start)
}

#[tracing::instrument(skip_all)]
#[tracing::instrument(skip(client, db_client))]
async fn get_or_create_customer_for_tenant(
client: &stripe::Client,
db_client: &Pool<Postgres>,
tenant: String,
) -> anyhow::Result<stripe::Customer> {
create: bool,
) -> anyhow::Result<Option<stripe::Customer>> {
let customers = stripe_search::<stripe::Customer>(
client,
"customers",
Expand All @@ -598,7 +706,7 @@ async fn get_or_create_customer_for_tenant(
let customer = if let Some(customer) = customers.data.into_iter().next() {
tracing::debug!("Found existing customer {id}", id = customer.id.to_string());
customer
} else {
} else if create {
tracing::debug!("Creating new customer");
let new_customer = stripe::Customer::create(
client,
Expand All @@ -620,6 +728,8 @@ async fn get_or_create_customer_for_tenant(
.await?;

new_customer
} else {
return Ok(None);
};

if customer.email.is_none() {
Expand Down Expand Up @@ -652,8 +762,8 @@ async fn get_or_create_customer_for_tenant(
)
.await?;
} else {
bail!("Stripe customer object is missing an email. No admins found for that tenant, unable to create invoice without email. Found users: {found:?} Skipping", found=responses);
bail!("Stripe customer object is missing an email. No admins found for tenant {tenant}, unable to create invoice without email. Found users: {found:?} Skipping", found=responses, tenant=tenant);
}
}
Ok(customer)
Ok(Some(customer))
}
33 changes: 33 additions & 0 deletions supabase/migrations/42_stripe_customers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
begin;

create schema stripe;
grant usage on schema stripe to postgres;

-- Included here to match the shape of production database so that sqlx can infer queries properly.
create table stripe.customers (
id text PRIMARY KEY,
address json,
"address/city" text,
"address/country" text,
"address/line1" text,
"address/line2" text,
"address/postal_code" text,
"address/state" text,
balance bigint,
created bigint,
currency text,
default_source text,
delinquent boolean,
description text,
email text,
invoice_prefix text,
invoice_settings json,
"invoice_settings/custom_fields" json,
"invoice_settings/default_payment_method" text,
metadata json,
name text,
phone text,
flow_document json NOT NULL
);

end;

0 comments on commit c84db38

Please sign in to comment.