Skip to content

Commit

Permalink
Add extra nodes
Browse files Browse the repository at this point in the history
Signed-off-by: Laura Couto <[email protected]>
  • Loading branch information
lrcouto committed Sep 11, 2024
1 parent eafe4c5 commit c5a1ac3
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 11 deletions.
30 changes: 27 additions & 3 deletions performance-test/conf/base/catalog.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ congress_expenses:

expenses_per_party:
type: spark.SparkDataset
filepath: data/output/expenses_per_party.parquet
file_format: parquet
filepath: data/output/expenses_per_party.csv
file_format: csv
save_args:
sep: ','
header: True
mode: overwrite
load_args:
header: True
inferSchema: True

largest_expense_source:
type: spark.SparkDataset
Expand All @@ -26,7 +29,28 @@ largest_expense_source:

top_spender_per_party:
type: spark.SparkDataset
filepath: data/output/top_spender_per_party.parquet
filepath: data/output/top_spender_per_party.csv
file_format: csv
save_args:
sep: ','
header: True
mode: overwrite
load_args:
header: True
inferSchema: True

top_overall_spender:
type: spark.SparkDataset
filepath: data/output/top_overall_spender.parquet
file_format: parquet
save_args:
sep: ','
header: True
mode: overwrite

top_spending_party:
type: spark.SparkDataset
filepath: data/output/top_spending_party.parquet
file_format: parquet
save_args:
sep: ','
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,45 @@

def analyze_expenses_per_party(congress_expenses, parameters):
"""Calculate total expense per party."""
sleep( parameters["dataset_load_delay"])
sleep(parameters["dataset_load_delay"])
sleep(parameters["file_save_delay"])

return congress_expenses.groupBy("sgpartido").agg(
return congress_expenses.groupBy("sgpartido", "txnomeparlamentar").agg(
F.sum("vlrliquido").alias("total_expense")
).orderBy(F.desc("total_expense"))

def find_largest_expense_source(congress_expenses, parameters):
"""Find the largest source of expense."""
sleep( parameters["dataset_load_delay"])
sleep(parameters["dataset_load_delay"])
sleep(parameters["file_save_delay"])

return congress_expenses.groupBy("txtdescricao").agg(
F.sum("vlrliquido").alias("total_expense")
).orderBy(F.desc("total_expense")).limit(1)

def find_top_spender_per_party(congress_expenses, parameters):
def find_top_spender_per_party(expenses_per_party, parameters):
"""Find the top-spending congressman for each party."""
sleep( parameters["dataset_load_delay"])
sleep(parameters["dataset_load_delay"])
sleep(parameters["file_save_delay"])

return congress_expenses.groupBy("sgpartido", "txnomeparlamentar").agg(
F.sum("vlrliquido").alias("total_spent")
return expenses_per_party.groupBy("sgpartido", "txnomeparlamentar").agg(
F.sum("total_expense").alias("total_spent")
).withColumn(
"rank", F.row_number().over(Window.partitionBy("sgpartido").orderBy(F.desc("total_spent")))
).filter(F.col("rank") == 1).drop("rank")

def find_top_overall_spender(top_spender_per_party, parameters):
"""Find the overall top spender across all parties."""
sleep(parameters["dataset_load_delay"])
sleep(parameters["file_save_delay"])

return top_spender_per_party.orderBy(F.desc("total_spent")).limit(1)

def find_top_spending_party(expenses_per_party, parameters):
"""Find the party with the highest total expense."""
sleep(parameters["dataset_load_delay"])
sleep(parameters["file_save_delay"])

return expenses_per_party.groupBy("sgpartido").agg(
F.sum("total_expense").alias("total_party_expense")
).orderBy(F.desc("total_party_expense")).limit(1)
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
from .nodes import (
analyze_expenses_per_party,
find_largest_expense_source,
find_top_overall_spender,
find_top_spender_per_party,
find_top_spending_party,
)


Expand All @@ -29,9 +31,21 @@ def create_pipeline(**kwargs) -> Pipeline:
),
node(
func=find_top_spender_per_party,
inputs=["congress_expenses", "parameters"],
inputs=["expenses_per_party", "parameters"],
outputs="top_spender_per_party",
name="find_top_spender_per_party_node",
),
node(
func=find_top_overall_spender,
inputs=["top_spender_per_party", "parameters"],
outputs="top_overall_spender",
name="find_top_overall_spender_node",
),
node(
func=find_top_spending_party,
inputs=["expenses_per_party", "parameters"],
outputs="top_spending_party",
name="find_top_spending_party_node",
),
]
)

0 comments on commit c5a1ac3

Please sign in to comment.