Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: test the snowflake query for populate_product_catalog_command #4451

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 186 additions & 0 deletions course_discovery/apps/course_metadata/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,189 @@ class PathwayType(Enum):
from
prod.enterprise.course_reviews
'''

COURSE_TYPES = {
'ocm_course' : ['audit', 'verified-audit', 'verified', 'credit-verified-audit', 'spoc-verified-audit', 'professional'],
'executive_education' : ['executive-education-2u'],
'bootcamp' : ['bootcamp-2u'],
}

SNOWFLAKE_POPULATE_PRODUCT_COURSES_CATALOG_QUERY = """
WITH course_data AS (
SELECT
c.id, c.uuid as COURSE_UUID,
c.key as COURSE_KEY,
cr.key as COURSERUN_KEY,
c.title AS COURSE_TITLE,
coursetype.name AS COURSE_TYPE,
COUNT(DISTINCT org.id) AS ORGANIZATIONS_COUNT,
LISTAGG(DISTINCT org.key, ', ') AS ORGANISATION_ABBR,
LISTAGG(DISTINCT org.name, ', ') AS ORGANIZATION_NAME,
LISTAGG(DISTINCT CONCAT('{DISCOVERY_CDN_URL}', org.logo_image), ', ') AS ORGANIZATION_LOGO,
COUNT(DISTINCT cr.language_id) AS languagesCount,
LISTAGG(DISTINCT cr.language_id, ', ') AS Languages,
LISTAGG(DISTINCT CASE WHEN st.language_code <> 'es' THEN st.name ELSE NULL END, ', ') AS Subjects,
LISTAGG(DISTINCT CASE WHEN st.language_code = 'es' THEN st.name ELSE NULL END, ', ') AS Subject_Spanish,
LISTAGG(DISTINCT s.type, ', ') AS SEAT_TYPE,
CONCAT(p.marketing_site_url_root, cslug.url_Slug) AS MARKETING_URL,
CASE
WHEN c.image IS NOT NULL THEN CONCAT('{DISCOVERY_CDN_URL}', c.image)
ELSE CONCAT('{DISCOVERY_CDN_URL}', c.card_image_url)
END AS MARKETING_IMAGE,
CASE
WHEN cr.RUN_START IS NOT NULL AND cr.RUN_START >= CURRENT_TIMESTAMP() THEN 'True'
ELSE 'False'
END AS is_upcoming,
CASE
WHEN (cr.enrollment_end IS NULL OR cr.enrollment_end >= CURRENT_TIMESTAMP()) AND (cr.enrollment_start IS NULL OR cr.enrollment_start <= CURRENT_TIMESTAMP()) THEN 'True'
ELSE 'False'
END AS is_enrollable,
CASE
WHEN crt.is_marketable = TRUE
AND cr.draft = FALSE
AND cr.status = 'published'
AND s.id IS NOT NULL -- Checking if there are any seats
AND (cr.slug IS NOT NULL AND cr.slug != '' AND crt.is_marketable = TRUE) -- is_marketable
THEN 'True'
ELSE 'False'
END AS is_marketable,
CASE
WHEN cr.RUN_END IS NOT NULL AND cr.RUN_END < CURRENT_TIMESTAMP() THEN 'True'
ELSE 'False'
END AS has_ended,
LISTAGG(DISTINCT
CASE
WHEN cr.RUN_END < CURRENT_TIMESTAMP() THEN 'Archived'
WHEN cr.RUN_START <= CURRENT_TIMESTAMP() THEN 'Current'
WHEN cr.RUN_START < DATEADD(DAY, 60, CURRENT_TIMESTAMP()) THEN 'Starting Soon'
ELSE 'Upcoming'
END, ', ') AS availability_status,
LISTAGG(cr.pacing_type, ', ') AS pacing
FROM
discovery.course_metadata_courserun AS cr
JOIN
discovery.course_metadata_course AS c ON c.id = cr.course_id
JOIN
discovery.core_partner AS p ON c.partner_id = p.id
JOIN
discovery.course_metadata_courseruntype AS crt ON crt.id = cr.type_id
JOIN
discovery.course_metadata_seat AS s ON cr.id = s.course_run_id
JOIN
discovery.course_metadata_coursetype AS coursetype ON coursetype.id = c.type_id
JOIN
discovery.course_metadata_course_authoring_organizations AS cao ON c.id = cao.course_id
JOIN
discovery.course_metadata_organization AS org ON cao.organization_id = org.id
JOIN
discovery.course_metadata_course_subjects AS cs ON c.id = cs.course_id
JOIN
discovery.course_metadata_subject AS sb ON sb.id = cs.subject_id
JOIN
discovery.course_metadata_subjecttranslation AS st ON st.master_id = sb.id
JOIN
discovery.course_metadata_courseurlslug AS cslug ON c.id = cslug.course_id
JOIN
discovery.course_metadata_source as product_source on c.product_source_id = product_source.id
WHERE
c.draft != 1 AND cr.hidden != 1 AND cr.status = 'published'
AND coursetype.slug IN ( {course_types} )
AND cslug.is_active = 1 {product_source_filter}
GROUP BY
c.uuid, c.id, c.key, cr.key, c.title, coursetype.name, p.marketing_site_url_root, cslug.url_Slug, c.image, c.card_image_url, cr.RUN_START, cr.enrollment_end, cr.enrollment_start, cr.RUN_END, crt.is_marketable, cr.draft, cr.status, s.id, cr.slug
ORDER BY
c.id
)
SELECT DISTINCT COURSE_UUID, COURSE_TITLE, ORGANIZATION_NAME, ORGANIZATION_LOGO, ORGANISATION_ABBR, Languages, Subjects, Subject_Spanish, MARKETING_URL, MARKETING_IMAGE, COURSE_TYPE
FROM course_data
WHERE
(is_upcoming = 'True')
OR (is_enrollable = 'True' AND has_ended != 'True' AND is_marketable = 'True');
"""

SNOWFLAKE_POPULATE_PRODUCT_DEGREES_CATALOG_QUERY = """
SELECT
cmp.uuid,
cmp.title,
COALESCE(
LISTAGG(DISTINCT cmo.name, ', '),
''
) AS authoring_organizations,
COALESCE(
LISTAGG(DISTINCT CONCAT('{DISCOVERY_CDN_URL}', cmo.logo_image), ', '), ''
) AS authoring_organizations_logo,
COALESCE(
LISTAGG(DISTINCT cmo.key, ', '), ''
) AS authoring_organizations_abbr,
CASE
-- First, check if there is a language override and it's not empty
WHEN cmp.language_override_id IS NOT NULL AND cmp.language_override_id <> ''
THEN cmp.language_override_id
-- If no override, check if there are course run languages
WHEN COUNT(DISTINCT cr.language_id) > 0
THEN LISTAGG(DISTINCT cr.language_id, ', ')
-- Default value if no language is found
ELSE 'en-us'
END AS languages,
LISTAGG(DISTINCT CASE WHEN st.language_code <> 'es' THEN st.name ELSE NULL END, ', ') AS Primary_Subject,
LISTAGG(DISTINCT CASE WHEN st.language_code = 'es' THEN st.name ELSE NULL END, ', ') AS Primary_Subject_Spanish,
CASE
WHEN POSITION('/', cmp.marketing_slug) = 0 THEN CONCAT(
cp.marketing_site_url_root,
cmpt.slug,
'/',
cmp.marketing_slug
)
ELSE CONCAT(cp.marketing_site_url_root, cmp.marketing_slug)
END AS marketing_url,
CASE
WHEN cmp.card_image = '' OR cmp.card_image IS NULL
THEN ''
ELSE CONCAT('{DISCOVERY_CDN_URL}', cmp.card_image)
END AS MARKETING_IMAGE,
CONCAT_WS(
', ',
cms_primary.slug,
LISTAGG(DISTINCT cms.slug, ', ')
) AS subjects,
CASE
WHEN cmd.program_ptr_id IS NULL THEN 0
ELSE 1
END AS is_degree,
cmp.status,

FROM discovery.course_metadata_program cmp
INNER JOIN discovery.core_partner cp ON cp.id = cmp.partner_id
INNER JOIN discovery.course_metadata_programtype cmpt ON cmpt.id = cmp.type_id
LEFT JOIN discovery.course_metadata_degree cmd ON cmp.id = cmd.program_ptr_id
LEFT JOIN discovery.course_metadata_degreeadditionalmetadata cmdam ON cmd.program_ptr_id = cmdam.degree_id
LEFT JOIN discovery.course_metadata_program_authoring_organizations cmpao ON cmpao.program_id = cmp.id
LEFT JOIN discovery.course_metadata_organization cmo ON cmo.id = cmpao.organization_id
LEFT JOIN discovery.course_metadata_program_courses cmpc ON cmpc.program_id = cmp.id
LEFT JOIN discovery.course_metadata_course cmc ON cmc.id = cmpc.course_id
LEFT JOIN COURSE_METADATA_courserun cr ON cr.course_id = cmc.id
LEFT JOIN discovery.course_metadata_course_subjects cmcs ON cmcs.course_id = cmc.id
LEFT JOIN discovery.course_metadata_subject cms ON cms.id = cmcs.subject_id
LEFT JOIN discovery.course_metadata_subject cms_primary ON cms_primary.id = cmp.primary_subject_override_id
LEFT JOIN discovery.course_metadata_subjecttranslation AS st ON st.master_id = cmp.primary_subject_override_id or st.master_id = cmcs.subject_id
LEFT JOIN discovery.course_metadata_source as product_source on product_source.id = cmp.product_source_id
WHERE cmp.partner_id = 1 and cmp.status = 'active' {product_source_filter}
GROUP BY
cmp.uuid,
cmp.title,
cmp.status,
cmp.marketing_slug,
cmpt.slug,
cp.name,
cmd.program_ptr_id,
cp.marketing_site_url_root,
cms_primary.slug, -- Added cms_primary.slug to the GROUP BY clause
cmp.language_override_id,
cmp.card_image
HAVING
IS_DEGREE = 1;
"""
SNOWFLAKE_POPULATE_PRODUCT_CATALOG_QUERY = {
'course' : SNOWFLAKE_POPULATE_PRODUCT_COURSES_CATALOG_QUERY,
'degree': SNOWFLAKE_POPULATE_PRODUCT_DEGREES_CATALOG_QUERY,
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
import datetime
import logging

import snowflake.connector
from django.conf import settings
from django.core.management import BaseCommand, CommandError
from django.db.models import Count, Prefetch, Q

from course_discovery.apps.course_metadata.constants import (
SNOWFLAKE_POPULATE_COURSE_LENGTH_QUERY, SNOWFLAKE_POPULATE_PRODUCT_CATALOG_QUERY, COURSE_TYPES
)
from course_discovery.apps.course_metadata.gspread_client import GspreadClient
from course_discovery.apps.course_metadata.models import Course, CourseType, Program, SubjectTranslation

Expand Down Expand Up @@ -63,6 +67,65 @@ def add_arguments(self, parser):
required=False,
help='Flag to overwrite the existing data in Google Sheet tab'
)
parser.add_argument(
'--use_snowflake',
dest='use_snowflake_flag',
type=bool,
default=False,
required=False,
help='Flag to use Snowflake for fetching data'
)
def get_products_via_snowflake(self, product_type='ocm_course', product_source=None):
"""
Fetch products from Snowflake for product catalog
"""
snowflake_client = snowflake.connector.connect(
user=settings.SNOWFLAKE_SERVICE_USER,
password=settings.SNOWFLAKE_SERVICE_USER_PASSWORD,
account='edx.us-east-1',
database='prod'
)
cs = snowflake_client.cursor()
course_types = ', '.join(f"'{ct}'" for ct in COURSE_TYPES.get(product_type, []))
product_source_list = product_source.split(',') if product_source else []
product_source_filter = (
f"AND product_source.slug in ({', '.join(map(repr, product_source_list))})"
if product_source_list else ''
)
query_type = 'course' if product_type in ['executive_education', 'bootcamp', 'ocm_course'] else 'degree'
rows = []
try:
query = SNOWFLAKE_POPULATE_PRODUCT_CATALOG_QUERY[query_type].format(
course_types=course_types,
product_source_filter=product_source_filter,
DISCOVERY_CDN_URL=settings.DISCOVERY_CDN_URL
)
cs.execute(query)
rows = cs.fetchall()
except Exception as e:
logger.error('Error while fetching products from Snowflake for product catalog: %s', str(e))
finally:
cs.close()
snowflake_client.close()
return rows

def get_transformed_data_from_snowflake(self, product):
"""
Transform data fetched from Snowflake for product catalog
"""
transformed_data = {
'UUID': product[0],
'Title': product[1],
'Organizations Name': product[2],
'Organizations Logo': product[3],
'Organizations Abbr': product[4],
'Languages': product[5],
'Subjects': product[6],
'Subjects Spanish': product[7],
'Marketing URL': product[8],
'Marketing Image': product[9],
}
return transformed_data

def get_products(self, product_type, product_source):
"""
Expand Down Expand Up @@ -182,38 +245,58 @@ def handle(self, *args, **options):
product_source = options.get('product_source')
gspread_client_flag = options.get('gspread_client_flag')
overwrite = options.get('overwrite_flag')
snowflake_flag = options.get('use_snowflake_flag')
PRODUCT_CATALOG_CONFIG = {
'SHEET_ID': settings.PRODUCT_CATALOG_SHEET_ID,
'OUTPUT_TAB_ID': (
product_type.upper() + ('_' + datetime.datetime.now().strftime("%Y%m%d") if not overwrite else '')
if product_type else 'All'
product_type.upper() + ('_' + datetime.datetime.now().strftime("%Y%m%d") if not overwrite else '') +
('__SNOWFLAKE' if snowflake_flag else '') if product_type else 'All'
),
}

gspread_client = GspreadClient()

try:
product_type = product_type.lower()
products = self.get_products(product_type, product_source)
if not products.exists():
raise CommandError('No products found for the given criteria.')
products_count = products.count()

if snowflake_flag:
products = self.get_products_via_snowflake(product_type, product_source)
if not products:
raise CommandError('No products found for the given criteria.')
products_count = len(products)

else:
products = self.get_products(product_type, product_source)
if not products.exists():
raise CommandError('No products found for the given criteria.')
products_count = products.count()

logger.info(f'Fetched {products_count} {product_type}s from the database')

if output_csv:
with open(output_csv, 'w', newline='') as output_file:
output_writer = self.write_csv_header(output_file)
for product in products:
try:
output_writer.writerow(self.get_transformed_data(product, product_type))
except Exception as e: # pylint: disable=broad-exception-caught
logger.error(f"Error writing product {product.uuid} to CSV: {str(e)}")
continue

if snowflake_flag:
for row in products:
transformed_data = self.get_transformed_data_from_snowflake(row)
output_writer.writerow(transformed_data)

else:
for product in products:
try:
output_writer.writerow(self.get_transformed_data(product, product_type))
except Exception as e: # pylint: disable=broad-exception-caught
logger.error(f"Error writing product {product.uuid} to CSV: {str(e)}")
continue

logger.info(f'Populated {products_count} {product_type}s to {output_csv}')

elif gspread_client_flag:
csv_data = [self.get_transformed_data(product, product_type) for product in products]
if snowflake_flag:
csv_data = [self.get_transformed_data_from_snowflake(row) for row in products]
else:
csv_data = [self.get_transformed_data(product, product_type) for product in products]
gspread_client.write_data(
PRODUCT_CATALOG_CONFIG,
self.CATALOG_CSV_HEADERS,
Expand Down
1 change: 1 addition & 0 deletions course_discovery/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,7 @@
DEFAULT_PRODUCT_SOURCE_NAME = 'edX'
DEFAULT_PRODUCT_SOURCE_SLUG = 'edx'
EXTERNAL_PRODUCT_SOURCE_SLUG = ''
DISCOVERY_CDN_URL = ''

CONTENTFUL_SPACE_ID = None
CONTENTFUL_CONTENT_DELIVERY_API_KEY = None
Expand Down
Loading