Skip to content

Commit

Permalink
Added the DE files created from the original markdown files
Browse files Browse the repository at this point in the history
These were updated by @afelix-95 (thanks!) during testing.
JH
  • Loading branch information
TheJamesHerring authored Jul 25, 2024
1 parent 1f9a6a8 commit 3a08f2d
Show file tree
Hide file tree
Showing 6 changed files with 1,113 additions and 0 deletions.
184 changes: 184 additions & 0 deletions Instructions/Exercises/DE-01-Real-time-ingestion.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
---
lab:
title: 'Real-time Ingestion and Processing with Spark Structured Streaming and Delta Lake with Azure Databricks'
---

# Real-time Ingestion and Processing with Spark Structured Streaming and Delta Lake with Azure Databricks

Spark Structured Streaming allows you to process data in real-time with end-to-end fault tolerance. Delta Lake enhances this by providing a storage layer with ACID transactions, ensuring data integrity and consistency. You can ingest data from cloud storage into Delta Lake, and use Delta Live Tables to manage and optimize your streaming data pipelines.

This lab will take approximately **30** minutes to complete.

## Provision an Azure Databricks workspace

> **Tip**: If you already have an Azure Databricks workspace, you can skip this procedure and use your existing workspace.
This exercise includes a script to provision a new Azure Databricks workspace. The script attempts to create a *Premium* tier Azure Databricks workspace resource in a region in which your Azure subscription has sufficient quota for the compute cores required in this exercise; and assumes your user account has sufficient permissions in the subscription to create an Azure Databricks workspace resource. If the script fails due to insufficient quota or permissions, you can try to [create an Azure Databricks workspace interactively in the Azure portal](https://learn.microsoft.com/azure/databricks/getting-started/#--create-an-azure-databricks-workspace).

1. In a web browser, sign into the [Azure portal](https://portal.azure.com) at `https://portal.azure.com`.

2. Use the **[\>_]** button to the right of the search bar at the top of the page to create a new Cloud Shell in the Azure portal, selecting a ***PowerShell*** environment and creating storage if prompted. The cloud shell provides a command line interface in a pane at the bottom of the Azure portal, as shown here:

![Azure portal with a cloud shell pane](./images/cloud-shell.png)

> **Note**: If you have previously created a cloud shell that uses a *Bash* environment, use the the drop-down menu at the top left of the cloud shell pane to change it to ***PowerShell***.
3. Note that you can resize the cloud shell by dragging the separator bar at the top of the pane, or by using the **—**, **◻**, and **X** icons at the top right of the pane to minimize, maximize, and close the pane. For more information about using the Azure Cloud Shell, see the [Azure Cloud Shell documentation](https://docs.microsoft.com/azure/cloud-shell/overview).

4. In the PowerShell pane, enter the following commands to clone this repo:

```powershell
rm -r mslearn-databricks -f
git clone https://github.com/MicrosoftLearning/mslearn-databricks
```

5. After the repo has been cloned, enter the following command to run the **setup.ps1** script, which provisions an Azure Databricks workspace in an available region:

```powershell
./mslearn-databricks/setup.ps1
```

6. If prompted, choose which subscription you want to use (this will only happen if you have access to multiple Azure subscriptions).

7. Wait for the script to complete - this typically takes around 5 minutes, but in some cases may take longer. While you are waiting, review the [Introduction to Delta Lake](https://docs.microsoft.com/azure/databricks/delta/delta-intro) article in the Azure Databricks documentation.

## Create a cluster

Azure Databricks is a distributed processing platform that uses Apache Spark *clusters* to process data in parallel on multiple nodes. Each cluster consists of a driver node to coordinate the work, and worker nodes to perform processing tasks. In this exercise, you'll create a *single-node* cluster to minimize the compute resources used in the lab environment (in which resources may be constrained). In a production environment, you'd typically create a cluster with multiple worker nodes.

> **Tip**: If you already have a cluster with a 13.3 LTS or higher runtime version in your Azure Databricks workspace, you can use it to complete this exercise and skip this procedure.
1. In the Azure portal, browse to the **msl-*xxxxxxx*** resource group that was created by the script (or the resource group containing your existing Azure Databricks workspace)

1. Select your Azure Databricks Service resource (named **databricks-*xxxxxxx*** if you used the setup script to create it).

1. In the **Overview** page for your workspace, use the **Launch Workspace** button to open your Azure Databricks workspace in a new browser tab; signing in if prompted.

> **Tip**: As you use the Databricks Workspace portal, various tips and notifications may be displayed. Dismiss these and follow the instructions provided to complete the tasks in this exercise.
1. In the sidebar on the left, select the **(+) New** task, and then select **Cluster**.

1. In the **New Cluster** page, create a new cluster with the following settings:
- **Cluster name**: *User Name's* cluster (the default cluster name)
- **Policy**: Unrestricted
- **Cluster mode**: Single Node
- **Access mode**: Single user (*with your user account selected*)
- **Databricks runtime version**: 13.3 LTS (Spark 3.4.1, Scala 2.12) or later
- **Use Photon Acceleration**: Selected
- **Node type**: Standard_DS3_v2
- **Terminate after** *20* **minutes of inactivity**

1. Wait for the cluster to be created. It may take a minute or two.

> **Note**: If your cluster fails to start, your subscription may have insufficient quota in the region where your Azure Databricks workspace is provisioned. See [CPU core limit prevents cluster creation](https://docs.microsoft.com/azure/databricks/kb/clusters/azure-core-limit) for details. If this happens, you can try deleting your workspace and creating a new one in a different region. You can specify a region as a parameter for the setup script like this: `./mslearn-databricks/setup.ps1 eastus`
## Create a notebook and ingest data

You can create notebooks in your Azure Databricks workspace to run code written in a range of programming languages. In this exercise, you'll create a simple notebook that ingests data from a file and saves it in a folder in Databricks File System (DBFS).

1. View the Azure Databricks workspace portal and note that the sidebar on the left side contains icons for the various tasks you can perform.

1. In the sidebar, use the **(+) New** link to create a **Notebook**.

1. Change the default notebook name (**Untitled Notebook *[date]***) to **RealTimeIngestion**.

1. In the first cell of the notebook, enter the following code, which uses *shell* commands to download data files from GitHub into the file system used by your cluster.

```python
%sh
rm -r /dbfs/device_stream
mkdir /dbfs/device_stream
wget -O /dbfs/device_stream/devices1.json https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/devices1.json
```

1. Use the **▸ Run Cell** menu option at the left of the cell to run it. Then wait for the Spark job run by the code to complete.

## Use delta tables for streaming data

Delta lake supports *streaming* data. Delta tables can be a *sink* or a *source* for data streams created using the Spark Structured Streaming API. In this example, you'll use a delta table as a sink for some streaming data in a simulated internet of things (IoT) scenario. The simulated device data is in JSON format, like this:

```json
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
```

1. In a new cell, run the following code to create a stream based on the folder containing the JSON device data:

```python
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads data from the folder, using a JSON schema
inputPath = '/device_stream/'
jsonSchema = StructType([
StructField("device", StringType(), False),
StructField("status", StringType(), False)
])
iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
print("Source stream created...")
```

1. Add a new code cell and use it to perpetually write the stream of data to a delta folder:

```python
# Write the stream to a delta table
delta_stream_table_path = '/delta/iotdevicedata'
checkpointpath = '/delta/checkpoint'
deltastream = iotstream.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(delta_stream_table_path)
print("Streaming to delta sink...")
```

1. Add code to read the data, just like any other delta folder:

```python
# Read the data in delta format into a dataframe
df = spark.read.format("delta").load(delta_stream_table_path)
display(df)
```

1. Add the following code to create a table based on the delta folder to which the streaming data is being written:

```python
# create a catalog table based on the streaming sink
spark.sql("CREATE TABLE IotDeviceData USING DELTA LOCATION '{0}'".format(delta_stream_table_path))
```

1. Use the following code to query the table:

```sql
%sql
SELECT * FROM IotDeviceData;
```

1. Run the following code to add some fresh device data to the stream:

```Bash
%sh
wget -O /dbfs/device_stream/devices2.json https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/devices2.json
```

1. Re-run the following SQL query code to verify that the new data has been added to the stream and written to the delta folder:

```sql
%sql
SELECT * FROM IotDeviceData;
```

1. Run the following code to stop the stream:

```python
deltastream.stop()
```

## Clean up

In Azure Databricks portal, on the **Compute** page, select your cluster and select **■ Terminate** to shut it down.

If you've finished exploring Azure Databricks, you can delete the resources you've created to avoid unnecessary Azure costs and free up capacity in your subscription.
Loading

0 comments on commit 3a08f2d

Please sign in to comment.