This module teaches ways to structure the data lake, and to optimize the files for exploration, streaming, and batch workloads. The student will learn how to organize the data lake into levels of data refinement as they transform files through batch and stream processing. Then they will learn how to create indexes on their datasets, such as CSV, JSON, and Parquet files, and use them for potential query and workload acceleration.
In this module, the student will be able to:
- Combine streaming and batch processing with a single pipeline
- Organize the data lake into levels of file transformation
- Index data lake storage for query and workload acceleration
**Module 1 - Explore compute and storage options for data engineering workloads**
In this lab, you will use an Azure Databricks workspace and perform Structured Streaming with batch jobs by using Delta Lake. You need to complete the exercises within a Databricks Notebook. To begin, you need to have access to an Azure Databricks workspace.
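Before you set up the workspace, the following is a minimal, illustrative sketch (not part of the lab steps; the notebook you import later covers this properly) of the pattern the lab explores: a Delta Lake table written by a batch job and then consumed by a Structured Streaming query that refines it into the next level of the data lake. It assumes a Databricks notebook where `spark` is already available; the paths and toy data are assumptions for illustration only.

```python
# Illustrative sketch only (not part of the lab steps): one Delta Lake location serving both a
# batch writer and a Structured Streaming reader. The paths and toy schema are assumptions.
from pyspark.sql.functions import col

bronze_path = "/tmp/delta/bronze"                 # hypothetical raw ("bronze") level
silver_path = "/tmp/delta/silver"                 # hypothetical refined ("silver") level
checkpoint_path = "/tmp/delta/silver_checkpoint"  # required by the streaming writer

# Batch: land raw data as a Delta table
raw_df = spark.range(1000).withColumnRenamed("id", "order_id")
raw_df.write.format("delta").mode("overwrite").save(bronze_path)

# Streaming: continuously read the bronze table and refine it into the silver level
stream = (spark.readStream.format("delta").load(bronze_path)
          .filter(col("order_id") % 2 == 0)
          .writeStream.format("delta")
          .option("checkpointLocation", checkpoint_path)
          .start(silver_path))

# Wait until the data currently available has been processed, then query the silver table
# with a regular batch read while the stream keeps it up to date.
stream.processAllAvailable()
print(spark.read.format("delta").load(silver_path).count())

# stream.stop()  # stop the continuous query when you are done experimenting
```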
- In the Azure portal, navigate to the data-engineering-synapse-xxxxxxx resource group created by the setup script for this course, then select the Azure Databricks workspace.
- Select Launch Workspace to open your Databricks workspace in a new tab.
- In the left-hand menu of your Databricks workspace, select Compute.
- Select Create Cluster to add a new cluster.
- Enter a name for your cluster, such as `Test Cluster`.
- Select the Databricks Runtime Version. We recommend the latest runtime and Scala 2.12.
- Accept the default values for the rest of the cluster configuration.
- Check Spot instances to optimize costs.
- Select Create Cluster.
- Wait for the cluster to start. Note that it takes 5-7 minutes for the cluster to start up, so wait until it is running before moving on to the next task.
- In the Azure Databricks workspace, in the left pane, select Workspace > Users, and select your username (the entry with the house icon).
- In the pane that appears, select the arrow next to your name, and select Import.
- In the Import Notebooks dialog box, select URL and paste in the following URL:

  https://github.com/MicrosoftLearning/DP-203-Data-Engineer/raw/master/Allfiles/microsoft-learning-paths-databricks-notebooks/data-engineering/DBC/11-Delta-Lake-Architecture.dbc
- Select Import.
- Select the 11-Delta-Lake-Architecture folder that appears.
- To enable you to see the files created by the notebook, click your user name in the upper-right corner of the Databricks workspace, and then click Admin Console.
- In the Admin Console screen, click Workspace Settings.
- In the Advanced section, enable DBFS File Viewer.
- In the left pane, select Workspace > Users, select your username (the entry with the house icon), and then click the 11-Delta-Lake-Architecture folder.
- Open the 1-Delta-Architecture notebook. Make sure you attach your cluster to the notebook before following the instructions and running the cells within it. Within the notebook, you will explore combining streaming and batch processing with a single pipeline.

  After you've completed the notebook, return to this screen, and continue to the next lab.
- In the left pane, select Compute and select your cluster. Then select Terminate to stop the cluster.
This lab demonstrates how to work with Apache Spark in Azure Synapse Analytics. You will also learn how to use libraries such as Hyperspace and MSSparkUtils to optimize the experience of working with Data Lake storage accounts from Spark notebooks.
After completing the lab, you will understand how to load and make use of Spark libraries in an Azure Synapse Analytics workspace.
When loading data from Azure Data Lake Storage Gen2, searching the data is one of the most resource-consuming operations. Hyperspace introduces the ability for Apache Spark users to create indexes on their datasets, such as CSV, JSON, and Parquet files, and use them for potential query and workload acceleration.
Hyperspace lets you create indexes on records scanned from persisted data files. After an index is successfully created, an entry that corresponds to it is added to Hyperspace's metadata. This metadata is later used by Apache Spark's optimizer during query processing to find and use suitable indexes. If the underlying data changes, you can refresh an existing index to capture the changes.
Hyperspace also allows users to compare their original plan with the updated index-dependent plan before running their query.
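Index refresh is not part of the lab steps below, but for reference, here is a minimal sketch of what it could look like. It assumes a Synapse notebook where `spark` is available and that an index named `indexSALES` (the one you create later in this lab) already exists.

```python
# Minimal sketch (not part of the lab steps): refresh an existing Hyperspace index after
# the underlying data files have changed. Assumes the index "indexSALES" already exists.
from hyperspace import Hyperspace

hyperspace = Hyperspace(spark)
hyperspace.refreshIndex("indexSALES")   # rebuild the index so it reflects the changed data
hyperspace.indexes().show()             # inspect the registered indexes and their state
```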
- Open Synapse Studio (https://web.azuresynapse.net/), and if prompted, select your Azure Active Directory tenant, subscription, and Azure Synapse Analytics workspace.
- Select the Develop hub.
- Select +, then Notebook to create a new Synapse notebook.
- Enter Hyperspace for the notebook name (1), then select the Properties button above (2) to hide the properties pane.
- Attach the notebook to the Spark cluster and make sure that the language is set to PySpark (Python).
- Add the following code to a new cell in your notebook:
```python
from hyperspace import *
from com.microsoft.hyperspace import *
from com.microsoft.hyperspace.index import *

# Disable BroadcastHashJoin, so Spark will use standard SortMergeJoin. Currently, Hyperspace indexes utilize SortMergeJoin to speed up query.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# Replace the value below with the name of your primary ADLS Gen2 account for your Synapse workspace
datalake = 'REPLACE_WITH_YOUR_DATALAKE_NAME'

dfSales = spark.read.parquet("abfss://wwi-02@" + datalake + ".dfs.core.windows.net/sale-small/Year=2019/Quarter=Q4/Month=12/*/*.parquet")
dfSales.show(10)

dfCustomers = spark.read.load("abfss://wwi-02@" + datalake + ".dfs.core.windows.net/data-generators/generator-customer-clean.csv", format="csv", header=True)
dfCustomers.show(10)

# Create an instance of Hyperspace
hyperspace = Hyperspace(spark)
```
Replace the `REPLACE_WITH_YOUR_DATALAKE_NAME` value with the name of your primary ADLS Gen2 account for your Synapse workspace. To find this, do the following:
1. Navigate to the **Data** hub.
![The data hub is highlighted.](images/data-hub.png "Data hub")
2. Select the **Linked** tab **(1)**, expand the Azure Data Lake Storage Gen2 group, then make note of the primary ADLS Gen2 name **(2)** next to the name of the workspace.
![The primary ADLS Gen2 name is displayed.](images/adlsgen2-name.png "ADLS Gen2 name")
- Run the new cell. It will load the two DataFrames with data from the data lake and initialize Hyperspace.

  Note: You may select the Run button to the left of the cell, or press `Shift+Enter`, to execute the cell and create a new cell below. The first cell you execute in the notebook will take a few minutes, because a new Spark cluster must be started. Each subsequent cell execution should be much faster.
- Select the + button beneath the cell output, then select </> Code cell to create a new code cell beneath it.
- Paste the following code into the new cell:
```python
# Create indexes: each one contains a name, a set of indexed columns and a set of included columns
indexConfigSales = IndexConfig("indexSALES", ["CustomerId"], ["TotalAmount"])
indexConfigCustomers = IndexConfig("indexCUSTOMERS", ["CustomerId"], ["FullName"])

hyperspace.createIndex(dfSales, indexConfigSales)          # only create index once
hyperspace.createIndex(dfCustomers, indexConfigCustomers)  # only create index once

hyperspace.indexes().show()
```
- Run the new cell. It will create two indexes and display their structure.
- Add another new code cell to your notebook with the following code:

```python
df1 = dfSales.filter("""CustomerId = 6""").select("""TotalAmount""")
df1.show()
df1.explain(True)
```
- Run the new cell. The output will show that the physical execution plan does not take any of the indexes into account (it performs a file scan on the original data file).
- Now add another new cell to your notebook with the following code (notice the extra line at the beginning used to enable Hyperspace optimization in the Spark engine):
```python
# Enable Hyperspace - Hyperspace optimization rules become visible to the Spark optimizer and exploit existing Hyperspace indexes to optimize user queries
Hyperspace.enable(spark)

df1 = dfSales.filter("""CustomerId = 6""").select("""TotalAmount""")
df1.show()
df1.explain(True)
```
- Run the new cell. The output will show that the physical execution plan is now using the index instead of the original data file.
- Hyperspace provides an Explain API that allows you to compare the execution plans without indexes vs. with indexes. Add a new cell with the following code:
```python
df1 = dfSales.filter("""CustomerId = 6""").select("""TotalAmount""")

spark.conf.set("spark.hyperspace.explain.displayMode", "html")
hyperspace.explain(df1, True, displayHTML)
```
- Run the new cell. The output shows a comparison of the Plan with indexes vs. the Plan without indexes. Observe how, in the first case, the index file is used, while in the second case the original data file is used.
- Now let's investigate a more complex case, involving a join operation. Add a new cell with the following code:
```python
eqJoin = dfSales.join(dfCustomers, dfSales.CustomerId == dfCustomers.CustomerId).select(dfSales.TotalAmount, dfCustomers.FullName)

hyperspace.explain(eqJoin, True, displayHTML)
```
- Run the new cell. The output again shows a comparison of the Plan with indexes vs. the Plan without indexes, where indexes are used in the first case and the original data files in the second.

  If you want to deactivate Hyperspace and clean up the indexes, you can run the following code:
```python
# Disable Hyperspace - Hyperspace rules no longer apply during query optimization. Disabling Hyperspace has no impact on created indexes because they remain intact
Hyperspace.disable(spark)

hyperspace.deleteIndex("indexSALES")
hyperspace.vacuumIndex("indexSALES")

hyperspace.deleteIndex("indexCUSTOMERS")
hyperspace.vacuumIndex("indexCUSTOMERS")
```
Microsoft Spark Utilities (MSSparkUtils) is a built-in package that helps you easily perform common tasks. You can use MSSparkUtils to work with file systems, get environment variables, and work with secrets.
- Continue with the same notebook from the previous task and add a new cell with the following code:
```python
from notebookutils import mssparkutils

#
# Microsoft Spark Utilities
#
# https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/microsoft-spark-utilities?pivots=programming-language-python
#

# Azure storage access info
blob_account_name = datalake
blob_container_name = 'wwi-02'
blob_relative_path = '/'
linkedServiceName = datalake
blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linkedServiceName)

# Allow Spark to access the Blob endpoint remotely
spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name), blob_sas_token)

files = mssparkutils.fs.ls('/')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

mssparkutils.fs.mkdirs('/SomeNewFolder')

files = mssparkutils.fs.ls('/')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)
```
- Run the new cell and observe how `mssparkutils` is used to work with the file system.
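Optionally, if you want to try a couple of other common file-system helpers and clean up the folder created above, the following sketch (not part of the lab steps) shows what that could look like; the file name and contents are illustrative assumptions.

```python
# Optional sketch, not part of the lab steps: a few other common mssparkutils.fs helpers.
mssparkutils.fs.put('/SomeNewFolder/sample.txt', 'Hello from MSSparkUtils', True)  # write a small text file (overwrite=True)
print(mssparkutils.fs.head('/SomeNewFolder/sample.txt', 1024))                     # preview up to 1 KB of the file contents
mssparkutils.fs.rm('/SomeNewFolder', True)                                         # remove the folder and its contents recursively
```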
To learn more about the topics covered in this lab, use these resources:
- Apache Spark in Azure Synapse Analytics
- Announcing Azure Data Explorer data connector for Azure Synapse
- Connect to Azure Data Explorer using Apache Spark for Azure Synapse Analytics
- Azure Synapse Analytics shared metadata
- Introduction of Microsoft Spark Utilities
- Hyperspace - An open source indexing subsystem that brings index-based query acceleration to Apache Spark™ and big data workloads