Fix for download comparison #2

Merged · 1 commit · Jul 12, 2023
llmkernel/kernel.py (92 additions, 2 deletions)
@@ -1,12 +1,22 @@
import copy
import datetime
import io
import json
import logging
import os
import requests
import tempfile
import time
import traceback
import pandas as pd

from ipykernel.kernelbase import Kernel
from ipykernel.ipkernel import IPythonKernel
from toolsets.dataset_toolset import DatasetToolset
from archytas.react import ReActAgent

logger = logging.getLogger(__name__)

class PythonLLMKernel(IPythonKernel):
    implementation = "askem-chatty-py"
    implementation_version = "0.1"
@@ -29,6 +39,8 @@ def setup_instance(self, *args, **kwargs):
        self.context = None
        self.msg_types.append("context_setup_request")
        self.msg_types.append("llm_request")
        self.msg_types.append("download_dataset_request")
        self.msg_types.append("save_dataset_request")
        return super().setup_instance(*args, **kwargs)
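Note on dispatch: ipykernel routes an incoming shell message to the handler method named after its message type, so registering "download_dataset_request" and "save_dataset_request" in msg_types is what wires the two new coroutines below into the kernel. A minimal client-side sketch of triggering one of them (the connection file path is a hypothetical assumption, not part of this PR):

    # Sketch: send a custom message type to the kernel from a client.
    # Assumes a running kernel and a connection file at ./kernel.json.
    from jupyter_client import BlockingKernelClient

    client = BlockingKernelClient(connection_file="kernel.json")
    client.load_connection_file()
    client.start_channels()

    # Build a well-formed Jupyter message of the custom type and send it
    # on the shell channel; the kernel dispatches it by method name.
    msg = client.session.msg("download_dataset_request", content={})
    client.shell_channel.send(msg)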


@@ -48,7 +60,6 @@ def set_context(self, context, context_info):


    def send_df_preview_message(self):
        df = self.shell.ev("df")
        if isinstance(df, pd.DataFrame):
            split_df = json.loads(df.head(30).to_json(orient="split"))
@@ -122,9 +133,88 @@ async def context_setup_request(self, queue, message_id, message, **kwargs):
"execution_state": "idle",
},
channel="iopub"
)


    async def download_dataset_request(self, queue, message_id, message, **kwargs):
        content = message.get('content', {})
        # TODO: Collect any options that might be needed, if they ever are

        df = self.shell.ev("df")
        if isinstance(df, pd.DataFrame):
            output_buff = io.BytesIO()
            df.to_csv(output_buff, index=False, header=True)
            output_buff.seek(0)
            self.send_response(self.iopub_socket, "download_response", {"data": output_buff})
        else:
            self.send_response(self.iopub_socket, "stream", {"name": "stderr", "text": "The dataframe could not be downloaded."})


    async def save_dataset_request(self, queue, message_id, message, **kwargs):
        self.send_response(
            stream=self.iopub_socket,
            msg_or_type="status",
            content={
                "execution_state": "busy",
            },
            channel="iopub"
        )
        df = self.shell.ev('df')
        content = message.get('content', {})

        parent_dataset_id = content.get("parent_dataset_id")
        new_name = content.get("name")
        filename = content.get("filename", None)

        if filename is None:
            filename = "dataset.csv"
        parent_url = f"{os.environ['DATA_SERVICE_URL']}/datasets/{parent_dataset_id}"
        parent_dataset = requests.get(parent_url).json()
        if not parent_dataset:
            raise Exception(f"Unable to locate parent dataset '{parent_dataset_id}'")

        new_dataset = copy.deepcopy(parent_dataset)
        del new_dataset["id"]
        new_dataset["name"] = new_name
        new_dataset["description"] += f"\nTransformed from dataset '{parent_dataset['name']}' ({parent_dataset['id']}) at {datetime.datetime.utcnow().strftime('%c %Z')}"
        new_dataset["file_names"] = [filename]

        create_req = requests.post(f"{os.environ['DATA_SERVICE_URL']}/datasets", json=new_dataset)
        new_dataset_id = create_req.json()["id"]

        new_dataset["id"] = new_dataset_id
        new_dataset_url = f"{os.environ['DATA_SERVICE_URL']}/datasets/{new_dataset_id}"
        data_url_req = requests.get(f'{new_dataset_url}/upload-url?filename={filename}')
        data_url = data_url_req.json().get('url', None)

        # Saving as a temporary file instead of a buffer to save memory
        with tempfile.TemporaryFile() as temp_csv_file:
            df.to_csv(temp_csv_file, index=False, header=True)
            temp_csv_file.seek(0)
            upload_response = requests.put(data_url, data=temp_csv_file)
            if upload_response.status_code != 200:
                raise Exception(f"Error uploading dataframe: {upload_response.content}")

        self.send_response(
            stream=self.iopub_socket,
            msg_or_type="save_dataset_response",
            content={
                "dataset_id": new_dataset_id,
                "filename": filename,
                "parent_dataset_id": parent_dataset_id
            },
            channel="iopub",
        )
        self.send_response(
            stream=self.iopub_socket,
            msg_or_type="status",
            content={
                "execution_state": "idle",
            },
            channel="iopub"
        )
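Condensed, the handler above is three calls against the data service: create a dataset record derived from the parent, request an upload URL for the file, then PUT the CSV bytes. A standalone sketch of that flow (endpoint shapes are copied from the handler; the helper name and its error handling are illustrative assumptions):

    import os
    import requests

    def save_dataframe(df, parent_dataset: dict, new_name: str, filename: str = "dataset.csv") -> str:
        """Sketch of the create -> upload-url -> PUT flow used above."""
        base = os.environ["DATA_SERVICE_URL"]

        # 1. Register a new dataset record derived from the parent.
        record = {**parent_dataset, "name": new_name, "file_names": [filename]}
        record.pop("id", None)
        dataset_id = requests.post(f"{base}/datasets", json=record).json()["id"]

        # 2. Ask the data service for a presigned upload URL for the file.
        upload_url = requests.get(
            f"{base}/datasets/{dataset_id}/upload-url", params={"filename": filename}
        ).json()["url"]

        # 3. PUT the CSV bytes to the presigned URL.
        resp = requests.put(upload_url, data=df.to_csv(index=False, header=True).encode())
        resp.raise_for_status()
        return dataset_id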

    async def do_execute(self, code, silent, store_history=True, user_expressions=None, allow_stdin=False, *, cell_id=None):
        result = await super().do_execute(code, silent, store_history, user_expressions, allow_stdin, cell_id=cell_id)
        self.send_df_preview_message()
toolsets/dataset_toolset.py (0 additions, 3 deletions)
@@ -48,9 +48,6 @@ def reset(self):
        self.dataset_id = None
        self.df = None

    def send_dataset(self):
        pass

    def context(self):
        return f"""You are an analyst whose goal is to help with scientific data analysis and manipulation in Python.
