Skip to content

Commit

Permalink
Adding data generation pod to jupyter notebooks deployment (apache#14742
Browse files Browse the repository at this point in the history
)

Co-authored-by: Charles Smith <[email protected]>
Co-authored-by: Victoria Lim <[email protected]>
  • Loading branch information
3 people authored Aug 10, 2023
1 parent 82d82df commit 353f7be
Show file tree
Hide file tree
Showing 13 changed files with 886 additions and 1,341 deletions.
14 changes: 2 additions & 12 deletions examples/quickstart/jupyter-notebooks/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ RUN pip install requests \
pip install seaborn \
pip install bokeh \
pip install kafka-python \
pip install sortedcontainers
pip install sortedcontainers \
pip install tqdm

# Install druidapi client from apache/druid
# Local install requires sudo privileges
Expand All @@ -46,21 +47,10 @@ ADD druidapi /home/jovyan/druidapi
WORKDIR /home/jovyan/druidapi
RUN pip install .



# WIP -- install DruidDataDriver as a package
# Import data generator and configuration file
# Change permissions to allow import (requires sudo privileges)

# The Jupyter notebooks themselves are mounted into the image's /home/jovyan/notebooks
# path when running this image.
RUN mkdir -p /home/jovyan/notebooks

WORKDIR /home/jovyan/notebooks
USER jovyan



# Add location of the data generator to PYTHONPATH
ENV PYTHONPATH "${PYTHONPATH}:/home/jovyan/notebooks/02-ingestion"

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ volumes:
coordinator_var: {}
router_var: {}
druid_shared: {}
datagen_data: {}


services:
Expand Down Expand Up @@ -175,3 +176,12 @@ services:
- "${JUPYTER_PORT:-8889}:8888"
volumes:
- ../notebooks:/home/jovyan/notebooks

datagen:
image: imply/datagen:latest
container_name: datagen
profiles: ["jupyter", "kafka-jupyter", "druid-jupyter", "all-services"]
ports:
- "${DATAGEN_PORT:-9999}:9999"
volumes:
- datagen_data:/files
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ volumes:
coordinator_var: {}
router_var: {}
druid_shared: {}
datagen_data: {}


services:
Expand Down Expand Up @@ -173,3 +174,12 @@ services:
- "${JUPYTER_PORT:-8889}:8888"
volumes:
- ../notebooks:/home/jovyan/notebooks

datagen:
image: imply/datagen:latest
container_name: datagen
profiles: ["jupyter", "kafka-jupyter", "druid-jupyter", "all-services"]
ports:
- "${DATAGEN_PORT:-9999}:9999"
volumes:
- datagen_data:/files
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ druid_metadata_storage_connector_password=FoolishPassword

druid_coordinator_balancer_strategy=cachingCost

druid_indexer_runner_javaOptsArray=["-server", "-Xmx1g", "-Xms1g", "-XX:MaxDirectMemorySize=3g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid_indexer_fork_property_druid_processing_buffer_sizeBytes=256MiB
druid_indexer_runner_javaOptsArray=["-server", "-Xmx256m", "-Xms256m", "-XX:MaxDirectMemorySize=324m", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid_indexer_fork_property_druid_processing_buffer_sizeBytes=64MiB



Expand Down
34 changes: 34 additions & 0 deletions examples/quickstart/jupyter-notebooks/druidapi/druidapi/display.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

from druidapi import consts
import time

class DisplayClient:
'''
Expand Down Expand Up @@ -144,3 +145,36 @@ def schemas(self):

def tables(self, schema=consts.DRUID_SCHEMA):
self._druid.sql._tables_query(schema).show(display=self)

def run_task(self, query):
    '''
    Run an MSQ task while displaying progress in the cell output.

    Submits the statement as a task, then polls the task reports once per
    second. Each poll advances a tqdm progress bar (0-100) from the
    `progressDigest` value found under counters['0']['0']['sortProgress']
    (presumably stage 0 / worker 0 -- confirm against the MSQ report format)
    and shows the reported task status in the bar description. Polling stops
    when the reported status is SUCCESS or FAILED.

    Any level of the report that is missing (counters, sort progress, status)
    is skipped for that poll rather than raising KeyError.

    :param query: INSERT/REPLACE statement to run
    :return: None
    '''
    from tqdm import tqdm

    task = self._druid.sql.task(query)
    with tqdm(total=100.0) as pbar:
        previous_progress = 0.0
        while True:
            reports = task.reports_no_wait()
            if 'multiStageQuery' not in reports:
                # No MSQ report available on this poll.
                pbar.set_description('Initializing...')
            else:
                payload = reports['multiStageQuery'].get('payload') or {}

                # Walk counters -> '0' -> '0' -> sortProgress defensively: any
                # of these levels may be absent while the task is starting up.
                sort_progress = (
                    payload.get('counters', {})
                    .get('0', {})
                    .get('0', {})
                    .get('sortProgress', {})
                )
                if 'progressDigest' in sort_progress:
                    current_progress = sort_progress['progressDigest'] * 100.0
                    # tqdm.update() takes a relative increment, not an absolute value.
                    pbar.update(current_progress - previous_progress)
                    previous_progress = current_progress

                # Present status if available, and stop when the job is done.
                status = payload.get('status', {}).get('status')
                if status is not None:
                    pbar.set_description(f"Loading data, status:[{status}]")
                    if status in ('SUCCESS', 'FAILED'):
                        break
            time.sleep(1)
11 changes: 9 additions & 2 deletions examples/quickstart/jupyter-notebooks/druidapi/druidapi/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,9 @@ def reports(self) -> dict:
self._reports = self._tasks().task_reports(self._id)
return self._reports

def reports_no_wait(self) -> dict:
    '''
    Fetch the current reports for this task without requiring an OK response.

    Calls task_reports() on the tasks client with require_ok=False and returns
    the result directly; nothing is stored on the instance.
    '''
    tasks_client = self._tasks()
    return tasks_client.task_reports(self._id, require_ok=False)

@property
def results(self):
if not self._results:
Expand Down Expand Up @@ -844,16 +847,20 @@ def function_parameters(self, table_name):
'''
return self._function_args_query(table_name).rows

def wait_until_ready(self, table_name):
def wait_until_ready(self, table_name, verify_load_status=True):
'''
Waits for a datasource to be loaded in the cluster, and to become available to SQL.
Parameters
----------
table_name str
The name of a datasource in the 'druid' schema.
verify_load_status
If true, checks whether all published segments are loaded before testing query.
If false, tries the test query before checking whether all published segments are loaded.
'''
self.druid_client.datasources.wait_until_ready(table_name)
if verify_load_status:
self.druid_client.datasources.wait_until_ready(table_name)
while True:
try:
self.sql('SELECT 1 FROM "{}" LIMIT 1'.format(table_name));
Expand Down
17 changes: 15 additions & 2 deletions examples/quickstart/jupyter-notebooks/druidapi/druidapi/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

from druidapi.consts import OVERLORD_BASE
import requests

REQ_TASKS = OVERLORD_BASE + '/tasks'
REQ_POST_TASK = OVERLORD_BASE + '/task'
Expand Down Expand Up @@ -112,7 +113,7 @@ def task_status(self, task_id) -> dict:
'''
return self.client.get_json(REQ_TASK_STATUS, args=[task_id])

def task_reports(self, task_id) -> dict:
def task_reports(self, task_id, require_ok = True) -> dict:
'''
Retrieves the completion report for a completed task.
Expand All @@ -129,7 +130,19 @@ def task_reports(self, task_id) -> dict:
---------
`GET /druid/indexer/v1/task/{taskId}/reports`
'''
return self.client.get_json(REQ_TASK_REPORTS, args=[task_id])
if require_ok:
return self.client.get_json(REQ_TASK_REPORTS, args=[task_id])
else:
resp = self.client.get(REQ_TASK_REPORTS, args=[task_id], require_ok=require_ok)
if resp.status_code == requests.codes.ok:
try:
result = resp.json()
except Exception as ex:
result = {"message":"Payload could not be converted to json.", "payload":f"{resp.content}", "exception":f"{ex}"}
return result
else:
return {"message":f"Request return code:{resp.status_code}"}


def submit_task(self, payload):
'''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@
" basics related to the Druid REST API and several endpoints.\n",
"- [Introduction to the Druid Python API](01-druidapi-package-intro.ipynb) walks you through some of the\n",
" basics related to the Druid API using the Python wrapper API.\n",
"- [Learn the basics of Druid SQL](../03-query/00-using-sql-with-druidapi.ipynb) introduces you to the unique aspects of Druid SQL with the primary focus on the SELECT statement. \n",
"- [Learn the basics of Druid SQL](../03-query/00-using-sql-with-druidapi.ipynb) introduces you to the unique aspects of Druid SQL with the primary focus on the SELECT statement.\n",
"- [Learn to use the Data Generator](./02-datagen-intro.ipynb) gets you started with streaming and batch file data generation for testing of any data schema.\n",
"- [Ingest and query data from Apache Kafka](../02-ingestion/01-streaming-from-kafka.ipynb) walks you through ingesting an event stream from Kafka."
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@
"metadata": {},
"outputs": [],
"source": [
"sql_client.run_task(sql)"
"display.run_task(sql)"
]
},
{
Expand Down Expand Up @@ -473,7 +473,7 @@
"id": "11d9c95a",
"metadata": {},
"source": [
"`describe_table()` lists the columns in a table."
"`display.table(<table_name>)` lists the columns in a table."
]
},
{
Expand Down
Loading

0 comments on commit 353f7be

Please sign in to comment.