[MORAE] Multi-file support for linespan and code2amr slicing #652

Closed · wants to merge 6 commits
165 changes: 86 additions & 79 deletions skema/rest/llm_proxy.py
@@ -34,10 +34,10 @@ class Dynamics(BaseModel):
     "/linespan-given-filepaths-zip",
     summary=(
         "Send a zip file containing a code file,"
-        " get a line span of the dynamics back."
+        " get a line span of the dynamics back. One for each code file."
     ),
 )
-async def get_lines_of_model(zip_file: UploadFile = File()) -> Dynamics:
+async def get_lines_of_model(zip_file: UploadFile = File()) -> List[Dynamics]:
     """
     Endpoint for generating a line span containing the dynamics from a zip archive. Currently
     it only expects there to be one python file in the zip. There can be other files, such as a
@@ -57,90 +57,97 @@ async def get_lines_of_model(zip_file: UploadFile = File()) -> Dynamics:
     files=[]
     blobs=[]
     block=[]
+    outputs=[]
+    description=None
     with ZipFile(BytesIO(zip_file.file.read()), "r") as zip:
         for file in zip.namelist():
             file_obj = Path(file)
             if file_obj.suffix in [".py"]:
                 files.append(file)
-                blobs.append(zip.open(file).read())
+                blobs.append(zip.open(file).read().decode("utf-8"))

+    # iterate through each file
+    for f in range(len(files)):
-    # read in the code, for the prompt
-    code = blobs[0].decode("utf-8")  # needs to be regular string, not byte string
-    file = files[0]
-    # json for the fn construction
-    single_snippet_payload = {
-        "files": [file],
-        "blobs": [code],
-    }
-
-    # this is the formatting instructions
-    response_schemas = [
-        ResponseSchema(name="model_function", description="The name of the function that contains the model dynamics")
-    ]
-
-    # for structured output parsing, converts schema to langchain object
-    output_parser = StructuredOutputParser.from_response_schemas(response_schemas)
-
-    # for structured output parsing, makes the instructions to be passed as a variable to prompt template
-    format_instructions = output_parser.get_format_instructions()
-
-    # low temp as is not generative
-    temperature = 0.1
-
-    # initialize the models
-    openai = ChatOpenAI(
-        temperature=temperature,
-        model_name='gpt-3.5-turbo',
-        openai_api_key=SKEMA_OPENAI_KEY
-    )
-
-    # construct the prompts
-    template="You are an assistant that answers questions about code."
-    system_message_prompt = SystemMessagePromptTemplate.from_template(template)
-    human_template="Find the function that contains the model dynamics in {code} \n{format_instructions}"
-    human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)
-
-    # combining the templates for a chat template
-    chat_prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])
-
-    # formatting the prompt with input variables
-    formatted_prompt = chat_prompt.format_prompt(code=code, format_instructions=format_instructions).to_messages()
-
-    # running the model
-    output = openai(formatted_prompt)
-
-    # parsing the output
-    try:
-        parsed_output = output_parser.parse(output.content)
-
-        function_name = parsed_output['model_function']
-
-        # Get the FN from it
-        url = "https://api.askem.lum.ai/code2fn/fn-given-filepaths"
-        response_zip = requests.post(url, json=single_snippet_payload)
-
-        # get metadata entry for function
-        for entry in response_zip.json()['modules'][0]['fn_array']:
-            try:
-                if entry['b'][0]['name'][0:len(function_name)] == function_name:
-                    metadata_idx = entry['b'][0]['metadata']
-            except:
-                None
-
-        # get line span using metadata
-        for (i, metadata) in enumerate(response_zip.json()['modules'][0]['metadata_collection']):
-            if i == (metadata_idx - 1):
-                line_begin = metadata[0]['line_begin']
-                line_end = metadata[0]['line_end']
-    except:
-        print("Failed to parse dynamics")
-        line_begin = 0
-        line_end = 0
-
-    block.append(f"L{line_begin}-L{line_end}")
-
-    output = Dynamics(name=None, description=None, block=block)
-    return output
+        # reset the per-file results so each Dynamics carries only its own linespan
+        block=[]
+        description=None
+        code = blobs[f]
+        file = files[f]
+        # json for the fn construction
+        single_snippet_payload = {
+            "files": [file],
+            "blobs": [code],
+        }
+
+        # this is the formatting instructions
+        response_schemas = [
+            ResponseSchema(name="model_function", description="The name of the function that contains the model dynamics")
+        ]
+
+        # for structured output parsing, converts schema to langchain object
+        output_parser = StructuredOutputParser.from_response_schemas(response_schemas)
+
+        # for structured output parsing, makes the instructions to be passed as a variable to prompt template
+        format_instructions = output_parser.get_format_instructions()
+
+        # low temp as is not generative
+        temperature = 0.0
+
+        # initialize the models
+        openai = ChatOpenAI(
+            temperature=temperature,
+            model_name='gpt-3.5-turbo',
+            openai_api_key=SKEMA_OPENAI_KEY
+        )
+
+        # construct the prompts
+        template="You are an assistant that answers questions about code."
+        system_message_prompt = SystemMessagePromptTemplate.from_template(template)
+        human_template="Find the function that contains the model dynamics in {code} \n{format_instructions}"
+        human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)
+
+        # combining the templates for a chat template
+        chat_prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])
+
+        # formatting the prompt with input variables
+        formatted_prompt = chat_prompt.format_prompt(code=code, format_instructions=format_instructions).to_messages()
+
+        # running the model
+        output = openai(formatted_prompt)
+
+        # parsing the output
+        try:
+            parsed_output = output_parser.parse(output.content)
+
+            function_name = parsed_output['model_function']
+
+            # Get the FN from it
+            url = "https://api.askem.lum.ai/code2fn/fn-given-filepaths"
+            response_zip = requests.post(url, json=single_snippet_payload)
+
+            # get metadata entry for function
+            for entry in response_zip.json()['modules'][0]['fn_array']:
+                try:
+                    if entry['b'][0]['name'][0:len(function_name)] == function_name:
+                        metadata_idx = entry['b'][0]['metadata']
+                except:
+                    continue
+
+            # get line span using metadata
+            for (i, metadata) in enumerate(response_zip.json()['modules'][0]['metadata_collection']):
+                if i == (metadata_idx - 1):
+                    line_begin = metadata[0]['line_begin']
+                    line_end = metadata[0]['line_end']
+        except:
+            print("Failed to parse dynamics")
+            description = "Failed to parse dynamics"
+            line_begin = 0
+            line_end = 0
+
+        block.append(f"L{line_begin}-L{line_end}")
+
+        output = Dynamics(name=file, description=description, block=block)
+        outputs.append(output)
+
+    return outputs


app = FastAPI()
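For reviewers who want to try the new multi-file behavior end to end, here is a minimal client sketch. It assumes the SKEMA REST service is running at http://localhost:8000 (hypothetical host and port; the endpoint path and the zip_file field come from the route above), and the zip contents are toy stand-ins.

import io
import zipfile

import requests

# Build an in-memory zip with two toy python files.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("sir.py", "def sir(s, i, r, beta, gamma):\n    return s, i, r\n")
    zf.writestr("seir.py", "def seir(s, e, i, r):\n    return s, e, i, r\n")
buf.seek(0)

resp = requests.post(
    "http://localhost:8000/linespan-given-filepaths-zip",
    files={"zip_file": ("model.zip", buf, "application/zip")},
)

# With this PR the endpoint returns a list of Dynamics, one per python file,
# each carrying its own single-element block such as ["L1-L2"].
for dynamics in resp.json():
    print(dynamics["name"], dynamics["block"])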
67 changes: 46 additions & 21 deletions skema/rest/workflows.py
@@ -190,31 +190,56 @@ async def llm_assisted_codebase_to_pn_amr(zip_file: UploadFile = File()):
"""
# NOTE: Opening the zip file mutates the object and prevents it from being reopened.
# Since llm_proxy also needs to open the zip file, we should send a copy instead.
linespan = await llm_proxy.get_lines_of_model(copy.deepcopy(zip_file))
lines = linespan.block[0].split("-")
line_begin = max(
int(lines[0][1:]) - 1, 0
) # Normalizing the 1-index response from llm_proxy
line_end = int(lines[1][1:])

# Currently the llm_proxy only works on the first file in a zip_archive.
# So we are required to do the same when slicing the source code using its output.
+    linespans = await llm_proxy.get_lines_of_model(copy.deepcopy(zip_file))
+
+    line_begin=[]
+    line_end=[]
+    files=[]
+    blobs=[]
+    amrs=[]
+    for linespan in linespans:
+        lines = linespan.block[0].split("-")
+        line_begin.append(max(
+            int(lines[0][1:]) - 1, 0
+        ))  # Normalizing the 1-index response from llm_proxy
+        line_end.append(int(lines[1][1:]))
+
+    # llm_proxy now returns a linespan for every python file in the zip archive,
+    # so each source file is sliced using its own linespan.
     with ZipFile(BytesIO(zip_file.file.read()), "r") as zip:
-        files = [zip.namelist()[0]]
-        blobs = [zip.open(files[0]).read().decode("utf-8")]
+        for file in zip.namelist():
+            file_obj = Path(file)
+            if file_obj.suffix in [".py"]:
+                files.append(file)
+                blobs.append(zip.open(file).read().decode("utf-8"))

     # The source code is a string, so to slice using the line spans, we must first convert it to a list.
     # Then we can convert it back to a string using .join
-    blobs[0] = "".join(blobs[0].splitlines(keepends=True)[line_begin:line_end])
-
-    amr = await code_snippets_to_pn_amr(
-        code2fn.System(
-            files=files,
-            blobs=blobs,
-            root_name=Path(zip_file.filename).stem,
-            system_name=Path(zip_file.filename).stem,
-        )
-    )
+    for i in range(len(blobs)):
+        if line_begin[i] == line_end[i]:
+            print("failed linespan")
+        else:
+            blobs[i] = "".join(blobs[i].splitlines(keepends=True)[line_begin[i]:line_end[i]])
+            amrs.append(await code_snippets_to_pn_amr(
+                code2fn.System(
+                    files=files,
+                    blobs=blobs,
+                    root_name=Path(files[i]).stem,
+                    system_name=Path(files[i]).stem,
+                )
+            ))
+
+    # we will return the amr with the most states, on the assumption that it is the most "correct";
+    # by default it returns the first entry
+    amr = amrs[0]
+    for temp_amr in amrs:
+        try:
+            temp_len = len(temp_amr['model']['states'])
+            amr_len = len(amr['model']['states'])
+            if temp_len > amr_len:
+                amr = temp_amr
+        except:
+            continue

return amr
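The slicing step above is easy to sanity-check in isolation. Here is a standalone sketch of the span parsing and blob slicing, using a toy source string and span (the helper name is ours, not part of the PR):

def slice_blob(blob: str, span: str) -> str:
    # span looks like "L3-L4": 1-indexed and inclusive
    begin_str, end_str = span.split("-")
    line_begin = max(int(begin_str[1:]) - 1, 0)  # normalize to 0-indexed
    line_end = int(end_str[1:])
    return "".join(blob.splitlines(keepends=True)[line_begin:line_end])

source = "import numpy\n\ndef dynamics():\n    pass\n"
print(slice_blob(source, "L3-L4"))  # keeps only the dynamics function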


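The final choice among the per-file AMRs reduces to a small heuristic: keep the candidate with the most states. A self-contained sketch with toy payloads (the dict shapes mirror the model/states access above; the values are made up):

def pick_largest_amr(amrs: list) -> dict:
    best = amrs[0]  # by default the first entry wins
    for candidate in amrs:
        try:
            if len(candidate['model']['states']) > len(best['model']['states']):
                best = candidate
        except (KeyError, TypeError):
            continue  # skip malformed candidates
    return best

amrs = [
    {'model': {'states': ['S', 'I']}},
    {'model': {'states': ['S', 'E', 'I', 'R']}},  # most states, so selected
    {'bad': 'payload'},
]
assert pick_largest_amr(amrs)['model']['states'] == ['S', 'E', 'I', 'R']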