diff --git a/llmkernel/kernel.py b/llmkernel/kernel.py index 951e3a1..d856d86 100644 --- a/llmkernel/kernel.py +++ b/llmkernel/kernel.py @@ -1,12 +1,22 @@ -import time +import copy +import datetime +import io import json +import logging +import os +import requests +import tempfile +import time import traceback +import pandas as pd from ipykernel.kernelbase import Kernel from ipykernel.ipkernel import IPythonKernel from toolsets.dataset_toolset import DatasetToolset from archytas.react import ReActAgent +logger = logging.getLogger(__name__) + class PythonLLMKernel(IPythonKernel): implementation = "askem-chatty-py" implementation_version = "0.1" @@ -29,6 +39,8 @@ def setup_instance(self, *args, **kwargs): self.context = None self.msg_types.append("context_setup_request") self.msg_types.append("llm_request") + self.msg_types.append("download_dataset_request") + self.msg_types.append("save_dataset_request") return super().setup_instance(*args, **kwargs) @@ -48,7 +60,6 @@ def set_context(self, context, context_info): def send_df_preview_message(self): - import pandas as pd df = self.shell.ev("df") if isinstance(df, pd.DataFrame): split_df = json.loads(df.head(30).to_json(orient="split")) @@ -122,9 +133,88 @@ async def context_setup_request(self, queue, message_id, message, **kwargs): "execution_state": "idle", }, channel="iopub" + ) + + + async def download_dataset_request(self, queue, message_id, message, **kwargs): + content = message.get('content', {}) + # TODO: Collect any options that might be needed, if they ever are + + df = self.shell.ev("df") + if isinstance(df, pd.DataFrame): + output_buff = io.BytesIO() + df.to_csv(output_buff, index=False, header=True) + output_buff.seek(0) + self.send_response(self.iopub_socket, "download_response", {"data": output_buff}) + else: + self.send_response(self.iopub_socket, "stream", {"name": "stderr", "text": "The dataframe is not able to be downloaded."}) + + async def save_dataset_request(self, queue, message_id, message, **kwargs): + self.send_response( + stream=self.iopub_socket, + msg_or_type="status", + content={ + "execution_state": "busy", + }, + channel="iopub" ) + df = self.shell.ev('df') + content = message.get('content', {}) + + parent_dataset_id = content.get("parent_dataset_id") + new_name = content.get("name") + filename = content.get("filename", None) + + + if filename is None: + filename = "dataset.csv" + parent_url = f"{os.environ['DATA_SERVICE_URL']}/datasets/{parent_dataset_id}" + parent_dataset = requests.get(parent_url).json() + if not parent_dataset: + raise Exception(f"Unable to locate parent dataset '{parent_dataset_id}'") + + new_dataset = copy.deepcopy(parent_dataset) + del new_dataset["id"] + new_dataset["name"] = new_name + new_dataset["description"] += f"\nTransformed from dataset '{parent_dataset['name']}' ({parent_dataset['id']}) at {datetime.datetime.utcnow().strftime('%c %Z')}" + new_dataset["file_names"] = [filename] + + create_req = requests.post(f"{os.environ['DATA_SERVICE_URL']}/datasets", json=new_dataset) + new_dataset_id = create_req.json()["id"] + + new_dataset["id"] = new_dataset_id + new_dataset_url = f"{os.environ['DATA_SERVICE_URL']}/datasets/{new_dataset_id}" + data_url_req = requests.get(f'{new_dataset_url}/upload-url?filename={filename}') + data_url = data_url_req.json().get('url', None) + + # Saving as a temporary file instead of a buffer to save memory + with tempfile.TemporaryFile() as temp_csv_file: + df.to_csv(temp_csv_file, index=False, header=True) + temp_csv_file.seek(0) + upload_response = requests.put(data_url, data=temp_csv_file) + if upload_response.status_code != 200: + raise Exception(f"Error uploading dataframe: {upload_response.content}") + self.send_response( + stream=self.iopub_socket, + msg_or_type="save_dataset_response", + content={ + "dataset_id": new_dataset_id, + "filename": filename, + "parent_dataset_id": parent_dataset_id + }, + channel="iopub", + ) + self.send_response( + stream=self.iopub_socket, + msg_or_type="status", + content={ + "execution_state": "idle", + }, + channel="iopub" + ) + async def do_execute(self, code, silent, store_history=True, user_expressions=None, allow_stdin=False, *, cell_id=None): result = await super().do_execute(code, silent, store_history, user_expressions, allow_stdin, cell_id=cell_id) self.send_df_preview_message() diff --git a/toolsets/dataset_toolset.py b/toolsets/dataset_toolset.py index 8388029..075ddda 100644 --- a/toolsets/dataset_toolset.py +++ b/toolsets/dataset_toolset.py @@ -48,9 +48,6 @@ def reset(self): self.dataset_id = None self.df = None - def send_dataset(self): - pass - def context(self): return f"""You are an analyst whose goal is to help with scientific data analysis and manipulation in Python.