Flow builder #29

Open · wants to merge 2 commits into master
11 changes: 9 additions & 2 deletions doc/source/standard_scripts/index.rst
@@ -21,12 +21,19 @@ to:
from port.platforms.instagram import process

Available platforms
-------------------
===================

.. automodule:: port.platforms.chatgpt
ChatGPT
-------

.. automodule:: port.platforms.chatgpt

Instagram
---------

.. automodule:: port.platforms.instagram

TikTok
---------

.. automodule:: port.platforms.tiktok
@@ -249,7 +249,7 @@ def epoch_to_iso(epoch_timestamp: str | int | float) -> str:
epoch_timestamp = int(float(epoch_timestamp))
out = datetime.fromtimestamp(epoch_timestamp, tz=timezone.utc).isoformat()
except (OverflowError, OSError, ValueError, TypeError) as e:
logger.error("Could not convert epoch time timestamp, %s", e)
logger.debug("Could not convert epoch time timestamp, %s", e)

return out

98 changes: 35 additions & 63 deletions src/framework/processing/py/port/platforms/chatgpt.py
@@ -2,16 +2,16 @@
ChatGPT

This module contains an example flow of a ChatGPT data donation study

To see what type of DDPs from ChatGPT it is designed for, check DDP_CATEGORIES
"""
import logging

import pandas as pd

import port.api.props as props
import port.helpers.extraction_helpers as eh
import port.helpers.port_helpers as ph
import port.helpers.validate as validate

from port.platforms.flow_builder import DataDonationFlow
from port.helpers.validate import (
DDPCategory,
DDPFiletype,
@@ -75,7 +75,7 @@ def conversations_to_df(chatgpt_zip: str) -> pd.DataFrame:



def extraction(chatgpt_zip: str) -> list[props.PropsUIPromptConsentFormTable]:
def extraction_fun(chatgpt_zip: str) -> list[props.PropsUIPromptConsentFormTable]:
tables_to_render = []

df = conversations_to_df(chatgpt_zip)
@@ -104,68 +104,40 @@ def extraction(chatgpt_zip: str) -> list[props.PropsUIPromptConsentFormTable]:



# TEXTS
SUBMIT_FILE_HEADER = props.Translatable({
"en": "Select your ChatGPT file",
"nl": "Selecteer uw ChatGPT bestand"
})

REVIEW_DATA_HEADER = props.Translatable({
"en": "Your ChatGPT data",
"nl": "Uw ChatGPT gegevens"
})
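# Texts the default flow steps look up by key: the file prompt header, the
# review page header and description, and the retry page header.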
TEXTS = {
"submit_file_header": props.Translatable({
"en": "Select your ChatGPT file",
"nl": "Selecteer uw ChatGPT bestand"
}),
"review_data_header": props.Translatable({
"en": "Your ChatGPT data",
"nl": "Uw ChatGPT gegevens"
}),
"retry_header": props.Translatable({
"en": "Try again",
"nl": "Probeer opnieuw"
}),
"review_data_description": props.Translatable({
"en": "Below you will find a currated selection of ChatGPT data. In this case only the conversations you had with ChatGPT are show on screen. The data represented in this way are much more insightfull because you can actually read back the conversations you had with ChatGPT",
"nl": "Below you will find a currated selection of ChatGPT data. In this case only the conversations you had with ChatGPT are show on screen. The data represented in this way are much more insightfull because you can actually read back the conversations you had with ChatGPT",
}),
}

RETRY_HEADER = props.Translatable({
"en": "Try again",
"nl": "Probeer opnieuw"
})
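# Functions the flow builder looks up by name; the default extract_data step
# calls functions["extraction"] on the validated zip.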
FUNCTIONS = {
"extraction": extraction_fun
}

REVIEW_DATA_DESCRIPTION = props.Translatable({
"en": "Below you will find a currated selection of ChatGPT data. In this case only the conversations you had with ChatGPT are show on screen. The data represented in this way are much more insightfull because you can actually read back the conversations you had with ChatGPT",
"nl": "Below you will find a currated selection of ChatGPT data. In this case only the conversations you had with ChatGPT are show on screen. The data represented in this way are much more insightfull because you can actually read back the conversations you had with ChatGPT",
})


def process(session_id: int):
platform_name = "ChatGPT"

table_list = None
while True:
logger.info("Prompt for file for %s", platform_name)

file_prompt = ph.generate_file_prompt("application/zip")
file_result = yield ph.render_page(SUBMIT_FILE_HEADER, file_prompt)

if file_result.__type__ == "PayloadString":
validation = validate.validate_zip(DDP_CATEGORIES, file_result.value)

# Happy flow: Valid DDP
if validation.get_status_code_id() == 0:
logger.info("Payload for %s", platform_name)
extraction_result = extraction(file_result.value)
table_list = extraction_result
break

# Enter retry flow, reason: if DDP was not a ChatGPT DDP
if validation.get_status_code_id() != 0:
logger.info("Not a valid %s zip; No payload; prompt retry_confirmation", platform_name)
retry_prompt = ph.generate_retry_prompt(platform_name)
retry_result = yield ph.render_page(RETRY_HEADER, retry_prompt)

if retry_result.__type__ == "PayloadTrue":
continue
else:
logger.info("Skipped during retry flow")
break

else:
logger.info("Skipped at file selection ending flow")
break

if table_list is not None:
logger.info("Prompt consent; %s", platform_name)
review_data_prompt = ph.generate_review_data_prompt(f"{session_id}-chatgpt", REVIEW_DATA_DESCRIPTION, table_list)
yield ph.render_page(REVIEW_DATA_HEADER, review_data_prompt)

yield ph.exit(0, "Success")
yield ph.render_end_page()
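    # Build the flow from the texts and extraction function defined above, then
    # run the default prompt -> extract -> review -> exit sequence.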
flow = DataDonationFlow(
platform_name="ChatGPT",
ddp_categories=DDP_CATEGORIES,
texts=TEXTS,
functions=FUNCTIONS,
session_id=session_id,
is_donate_logs=False,
)

yield from flow.initialize_default_flow().run()
145 changes: 145 additions & 0 deletions src/framework/processing/py/port/platforms/flow_builder.py
@@ -0,0 +1,145 @@
"""
Flow Builder

This module contains tools to create data donation flows
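
Typical usage (mirroring chatgpt.py in this PR): a platform module defines its
DDP categories, texts, and an extraction function, then builds and runs the
default flow:

    flow = DataDonationFlow(
        platform_name="ChatGPT",
        ddp_categories=DDP_CATEGORIES,
        texts=TEXTS,
        functions={"extraction": extraction_fun},
        session_id=session_id,
        is_donate_logs=False,
    )
    yield from flow.initialize_default_flow().run()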
"""

import logging
import json
import io

import port.helpers.port_helpers as ph
import port.helpers.validate as validate

logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s --- %(name)s --- %(levelname)s --- %(message)s",
datefmt="%Y-%m-%dT%H:%M:%S%z",
)


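# Steps decorated with @should_yield are generator steps: DataDonationFlow.run()
# delegates to them with `yield from`; undecorated steps are called directly.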
def should_yield(func):
func.is_yieldable = True
return func


def is_yieldable(func):
return getattr(func, 'is_yieldable', False)


class DataDonationFlow:
def __init__(self, platform_name, ddp_categories, texts, functions, session_id, is_donate_logs):
self.name = platform_name
self.ddp_categories = ddp_categories
self.texts = texts
self.functions = functions
self.session_id = session_id
self.is_donate_logs = is_donate_logs
self.log_stream = io.StringIO()
self.steps = []
self._configure_logger()

def _configure_logger(self):
if self.is_donate_logs:
handler_stream = self.log_stream
logger.handlers = [] # clear handler
handler = logging.StreamHandler(handler_stream)
handler.setLevel(logging.INFO)
handler.setFormatter(
logging.Formatter(
fmt="%(asctime)s --- %(name)s --- %(levelname)s --- %(message)s",
datefmt="%Y-%m-%dT%H:%M:%S%z"
)
)
logger.addHandler(handler)

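    # Donate the log lines captured in self.log_stream as a JSON array, under a
    # session-specific "{session_id}-tracking.json" filename.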
def donate_logs(self):
log_string = self.log_stream.getvalue()
if log_string:
log_data = log_string.split("\n")
else:
log_data = ["no logs"]

return ph.donate(f"{self.session_id}-tracking.json", json.dumps(log_data))

def add_step(self, step_function):
self.steps.append(step_function)
return self

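    # The default flow: prompt for a DDP file and validate it, extract the data,
    # let the participant review it, then exit.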
def initialize_default_flow(self):
self.add_step(prompt_file_and_validate_input)
self.add_step(extract_data)
self.add_step(review_data)
self.add_step(exit_flow)
return self

def run(self):
logger.info("Starting data donation flow for %s", self.name)
if self.is_donate_logs:
yield self.donate_logs()

data = None
for step in self.steps:
if is_yieldable(step):
data = yield from step(self, data)
else:
data = step(self, data)

if self.is_donate_logs:
yield self.donate_logs()


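# Step functions receive the flow and the previous step's return value, and
# return data that is passed on to the next step.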
@should_yield
def prompt_file_and_validate_input(flow, _):
logger.info("Prompt for file step for %s", flow.name)
ddp_zip = None

while True:
file_prompt = ph.generate_file_prompt("application/zip")
file_result = yield ph.render_page(flow.texts["submit_file_header"], file_prompt)

validation = validate.validate_zip(flow.ddp_categories, file_result.value)

# Happy flow: Valid DDP
if validation.get_status_code_id() == 0:
logger.info("Validation of DDP was succesfull for %s", flow.name)
ddp_zip = file_result.value

break

# Enter retry flow
if validation.get_status_code_id() != 0:
logger.info("DDP did not pass validation; prompt retry_confirmation", flow.name)
retry_prompt = ph.generate_retry_prompt(flow.name)
retry_result = yield ph.render_page(flow.texts["retry_header"], retry_prompt)

if retry_result.__type__ == "PayloadTrue":
continue
else:
logger.info("Skipped during retry flow")
break

return ddp_zip


def extract_data(flow, ddp_zip):
    table_list = flow.functions["extraction"](ddp_zip)
    return table_list


@should_yield
def review_data(flow, table_list):
    if table_list is not None:
        logger.info("Ask participant to review data; %s", flow.name)
        review_data_prompt = ph.generate_review_data_prompt(f"{flow.session_id}-{flow.name.lower()}", flow.texts["review_data_description"], table_list)
        yield ph.render_page(flow.texts["review_data_header"], review_data_prompt)
    else:
        logger.info("No data was extracted for %s", flow.name)


@should_yield
def exit_flow(_, __):
yield ph.exit(0, "Success")
yield ph.render_end_page()