generated from intersystems-community/iris-fhir-template
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathSolutionModuleGenerator.cls
279 lines (241 loc) · 8.44 KB
/
SolutionModuleGenerator.cls
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
Class dc.jrpereira.fhirfy.core.SolutionModuleGenerator Extends dc.jrpereira.fhirfy.core.LLMAgent
{
Property llm As LLMService;
Method %OnNew() As %Status
{
Set ..llm = ##class(LLMService).GetLLMService()
Return $$$OK
}
Method GetResponse(dataInput As %DynamicObject) As %String [ Language = python ]
{
import xmltodict
import json
import iris
input = dataInput._Get("input")
xmlData = self.GetLLMResponse(input)
result = self.GenerateModule(xmlData)
return result
}
Method GetSystemPrompt(data) As %String [ Language = python ]
{
import iris
solutionSuggestion = data._Get("solutionSuggestion")._ToJSON()
rawDataOverview = data._Get("analysis")
clean = iris.cls("dc.jrpereira.fhirfy.core.Utils").CleanTripleQuotesString
return clean(f"""
# Problem statement
You have to implement a serie of Python modules following provided instructions in order to transform raw data into FHIR.
The instructions are formatted in JSON. The raw data overview is provided in markdown.
Implements all sub modules in the `subModules` key in the instructions JSON and the overview of the raw data.
Create a file called main.py which implements the pseudo code in the `pseudoCode` key in the instructions JSON.
Create a __init__.py file which imports all the required modules.
Use the Python library identiified in the PIP as "fhir.resources", to handle FHIR data.
Raw data overview:
```
{rawDataOverview}
```
Instructions JSON to your solution:
```json
{solutionSuggestion}
```
# Response expected format
Your answer must be a valid XML object which includes:
- the source code of the **whole** module into a XML CDATA block
- an object with information about each of the sub module functions, including:
- the function name
- the function description and purpose
- a list with the input parameters with information about them
- a description of the function return
- don't include any aditional formatting characters
Follow this template, replacing text between ## and ## with your answers:
<?xml version="1.0" encoding="UTF-8"?>
<module>
<name>## module name formmated in snake case ##</name>
<description>## modle high level description ##</description>
<dependencies>
<![CDATA[
## File requirements.txt for Python dependencies ##
]]>
</dependencies>
<files>
<!-- The Python package description -->
<file>
<name>__init__.py</name>
<description></description>
<source-code>
<![CDATA[
...
]]>
</source-code>
</file>
<!-- The main files description -->
<file>
<name>main.py</name>
<description></description>
<source-code>
<![CDATA[
...
]]>
</source-code>
</file>
<!-- Package sub modules -->
<file>
<name>sub-module-name.py</name>
<description></description>
<!-- sub module source code -->
<source-code>
<![CDATA[
...
]]>
</source-code>
<!-- sub module functions metadata -->
<functions>
<function>
<name></name>
<description></description>
<input>
<parameter>
<name></name>
<type></type>
<description></description>
</parameter>
<!-- Additional input parameters if needed -->
</input>
<output></output>
</function>
<!-- Additional functions if needed -->
</functions>
</file>
<!-- Additional sub-modules files if needed -->
</files>
</module>
""")
}
Method GenerateModule(xmlData As %String) As %String [ Language = python ]
{
import xmltodict
import json
import iris
xmlData = xmlData.replace("```xml", "")
xmlData = xmlData.replace("```", "")
#; xmlData = xmlData.replace(" ", "")
try:
xmlDict = xmltodict.parse(xmlData)
except Exception as e:
raise Exception("Failed to parse XML data: " + str(e) + "\n XML data: " + xmlData)
xmlDict = xmlDict["module"]
# avoid the structure ["files"]["file"]
xmlDict["files"] = xmlDict["files"]["file"]
result = json.dumps(xmlDict)
result = iris.cls("%Library.DynamicObject")._FromJSON(result)
self.CreateSolutionModuleWrapper(result)
return result
}
Method CreateSolutionModuleWrapper(moduleInfo As %String)
{
$$$ThrowOnError(..CreateSolutionModule(moduleInfo))
}
Method CreateSolutionModule(moduleInfo As %DynamicObject, baseDirectory As %String = "") As %Status
{
Set status = $$$OK
Try {
If baseDirectory = "" {
Set baseDirectory = ##class(%File).NormalizeDirectory("/home/irisowner/dev/python")
}
Set moduleDirectory = baseDirectory_"/"_moduleInfo.name
$$$ThrowOnError(..CreateModuleDirectory(moduleInfo, baseDirectory))
$$$ThrowOnError(..WriteRequirementsFile(moduleInfo, moduleDirectory))
$$$ThrowOnError(..WritePythonFiles(moduleInfo, moduleDirectory))
} Catch ex {
Set status = ex.AsStatus()
throw ex
}
Return status
}
Method CreateModuleDirectory(moduleInfo As %DynamicObject, baseDirectory As %String) As %Status
{
Set status = $$$OK
Try {
If ('$IsObject(moduleInfo)) || ($ZSTRIP(moduleInfo.name, "<>W") = "") {
Throw ##class(%Exception.General).%New("Module name is missing or invalid.")
}
Set moduleName = moduleInfo.name
Set moduleDirectory = ##class(%File).NormalizeDirectory(baseDirectory_"/"_moduleName)
Set isOK = ##class(%File).CreateDirectoryChain(moduleDirectory, .output)
If 'isOK {
Throw ##class(%Exception.General).%New("Unable to create module directory "_moduleDirectory_". Error: "_output)
}
} Catch ex {
Set status = ex.AsStatus()
}
Return status
}
Method WritePythonFiles(moduleMetadata As %DynamicObject, baseDirectory As %String) As %Status
{
Set status = $$$OK
Try {
If '$IsObject(moduleMetadata) {
Throw ##class(%Exception.General).%New("Module metadata is missing or invalid.")
}
Set it = moduleMetadata.files.%GetIterator()
While (it.%GetNext(.key, .file)){
If ('$IsObject(file)) || (file.name = "") || (file."source-code" = "") {
Throw ##class(%Exception.General).%New("File metadata is missing or invalid.")
}
Set pythonFilePath = baseDirectory_"/"_file.name
Set status = ..WriteFile(pythonFilePath, file."source-code")
If $$$ISERR(status) {
Throw ##class(%Exception.General).%New("Error writing Python file: "_$System.Status.GetErrorText(status))
}
}
} Catch ex {
Set status = ex.AsStatus()
}
Quit status
}
Method WriteRequirementsFile(moduleMetadata As %DynamicObject, baseDirectory As %String) As %Status
{
Set status = $$$OK
Try {
Set requirementsFilePath = baseDirectory_"/requirements.txt"
$$$ThrowOnError(..WriteFile(requirementsFilePath, moduleMetadata.dependencies))
} Catch ex {
Set status = ex.AsStatus()
}
Quit status
}
Method WriteFile(filePath As %String, fileContent As %String) As %Status
{
Set status = $$$OK
Try {
Set fileStream = ##class(%File).%New(filePath)
Set status = fileStream.Open("WN")
If $$$ISERR(status) {
Throw ##class(%Exception.General).%New("Error opening file: "_$System.Status.GetErrorText(status))
}
Set status = fileStream.WriteLine(fileContent)
If $$$ISERR(status) {
Throw ##class(%Exception.General).%New("Error writing to file: "_$System.Status.GetErrorText(status))
}
} Catch ex {
Set status = ex.AsStatus()
}
Quit status
}
Method CompressModule(moduleName) As %String [ Language = python ]
{
import os
import tempfile
import shutil
try:
directory_to_compress = f"/home/irisowner/dev/python/{moduleName}"
temp_dir = tempfile.mkdtemp()
compressed_file = os.path.join(temp_dir, f'{moduleName}.tar.gz')
shutil.make_archive(compressed_file[:-7], 'gztar', directory_to_compress)
except Exception as e:
raise Exception("Failed to compress module: " + str(e))
#; finally:
#; shutil.rmtree(temp_dir)
return compressed_file
}
}