forked from google/fhir-data-pipes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpipeline_validation.sh
executable file
·274 lines (241 loc) · 10.3 KB
/
pipeline_validation.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
#!/usr/bin/env bash
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Example usage:
# ./pipeline_validation.sh ./ JDBC_OPENMRS
# ./pipeline_validation.sh ./ NON_JDBC --use_docker_network
# ./pipeline_validation.sh ./ STREAMING --use_docker_network
set -e
#################################################
# Prints the usage
#################################################
function usage() {
echo "This script validates if number of resources sunk in parquet files and"
echo "FHIR Server match what is stored in the OpenMRS server"
echo
echo " usage: ./pipeline_validation.sh HOME_DIR PARQUET_SUBDIR [OPTIONS] "
echo " HOME_DIR Path where e2e-tests directory is. Directory MUST"
echo " contain the parquet tools jar as well as subdirectory"
echo " of parquet file output"
echo " PARQUET_SUBDIR Subdirectory name under HOME_DIR containing"
echo " parquet files "
echo
echo " Options: "
echo " --use_docker_network Flag to specify whether to use docker"
echo " or host network URLs"
echo " --streaming Flag to specify whether we are testing a"
echo " streaming pipeline"
echo " --hapi Flag to specify whether we are testing a"
echo " batch pipeline executed with jdbc direct"
echo " fetch mode with HAPI as the source server"
}
#################################################
# Makes sure args passed are correct
#################################################
function validate_args() {
if [[ $# -lt 2 || $# -gt 5 ]]; then
echo "Invalid number of args passed."
usage
exit 1
fi
echo "Checking if the Parquet-tools JAR exists..."
if compgen -G "${1}/parquet-tools*.jar" > /dev/null; then
echo "Parquet-tools JAR exists in ${1}"
else
echo "Parquet-tools JAR not found in ${1}"
usage
exit 1
fi
if [[ ! -d ${1}/${2} ]]; then
echo "The directory ${1}/${2} does not exist."
usage
exit 1
fi
}
#################################################
# Function that prints messages
# Arguments:
# anything that needs printing
#################################################
function print_message() {
local print_prefix="E2E TEST:"
echo "${print_prefix} $*"
}
#################################################
# Function that defines the global vars
# Globals:
# HOME_PATH
# PARQUET_SUBDIR
# FHIR_SERVER_URL
# SINK_SERVER
# STREAMING
# HAPI
# Arguments:
# Path where e2e-tests directory is. Directory contains parquet tools jar as
# well as subdirectory of parquet file output
# Subdirectory name under HOME_DIR containing parquet files.
# Example: NON_JDBC or JDBC_OPENMRS
# Optional: Flag to specify whether to use docker or host network URLs.
# Optional: Flag to specify streaming pipeline test.
# Optional: Flag to specify whether Jdbc mode is executed with a HAPI source server.
#################################################
function setup() {
HOME_PATH=$1
PARQUET_SUBDIR=$2
rm -rf "${HOME_PATH}/fhir"
rm -rf "${HOME_PATH}/${PARQUET_SUBDIR}/*.json"
find "${HOME_PATH}/${PARQUET_SUBDIR}" -size 0 -delete
FHIR_SERVER_URL='http://localhost:8099/openmrs/ws/fhir2/R4'
SINK_SERVER='http://localhost:8098'
STREAMING=""
HAPI=""
# TODO: We should refactor this code to parse the arguments by going through
# each one and checking which ones are turned on.
if [[ $3 = "--hapi" ]] || [[ $4 = "--hapi" ]] || [[ $5 = "--hapi" ]]; then
HAPI="on"
FHIR_SERVER_URL='http://localhost:8091'
fi
if [[ $3 = "--use_docker_network" ]] || [[ $4 = "--use_docker_network" ]] || [[ $5 = "--use_docker_network" ]]; then
if [[ -n ${HAPI} ]]; then
FHIR_SERVER_URL='http://hapi-server:8080'
else
FHIR_SERVER_URL='http://openmrs:8080/openmrs/ws/fhir2/R4'
fi
SINK_SERVER='http://sink-server:8080'
fi
if [[ $3 = "--streaming" ]] || [[ $4 = "--streaming" ]] || [[ $5 = "--streaming" ]]; then
STREAMING="on"
fi
}
#################################################
# Function to count resources in fhir server
# Globals:
# HOME_PATH
# PARQUET_SUBDIR
# FHIR_SERVER_URL
# TOTAL_TEST_PATIENTS
# TOTAL_TEST_ENCOUNTERS
# TOTAL_TEST_OBS
# STREAMING
# HAPI
#################################################
function fhir_source_query() {
local patient_query_param="?_summary=count"
local enc_obs_query_param="?_summary=count"
local fhir_username="admin"
local fhir_password="Admin123"
local fhir_url_extension=""
if [[ -n ${STREAMING} ]]; then
patient_query_param="?given=Alberta625"
enc_obs_query_param="?subject.given=Alberta625"
fi
if [[ -n ${HAPI} ]]; then
fhir_username="hapi"
fhir_password="hapi"
fhir_url_extension="/fhir"
fi
curl -L -X GET -u $fhir_username:$fhir_password --connect-timeout 5 --max-time 20 \
"${FHIR_SERVER_URL}${fhir_url_extension}/Patient${patient_query_param}" 2>/dev/null >>"${HOME_PATH}/${PARQUET_SUBDIR}/patients.json"
TOTAL_TEST_PATIENTS=$(jq '.total' "${HOME_PATH}/${PARQUET_SUBDIR}/patients.json")
print_message "Total FHIR source test patients ---> ${TOTAL_TEST_PATIENTS}"
curl -L -X GET -u $fhir_username:$fhir_password --connect-timeout 5 --max-time 20 \
"${FHIR_SERVER_URL}${fhir_url_extension}/Encounter${enc_obs_query_param}" \
2>/dev/null >>"${HOME_PATH}/${PARQUET_SUBDIR}/encounters.json"
TOTAL_TEST_ENCOUNTERS=$(jq '.total' "${HOME_PATH}/${PARQUET_SUBDIR}/encounters.json")
print_message "Total FHIR source test encounters ---> ${TOTAL_TEST_ENCOUNTERS}"
curl -L -X GET -u $fhir_username:$fhir_password --connect-timeout 5 --max-time 20 \
"${FHIR_SERVER_URL}${fhir_url_extension}/Observation${enc_obs_query_param}" \
2>/dev/null >>"${HOME_PATH}/${PARQUET_SUBDIR}/obs.json"
TOTAL_TEST_OBS=$(jq '.total' "${HOME_PATH}/${PARQUET_SUBDIR}/obs.json")
print_message "Total FHIR source test obs ---> ${TOTAL_TEST_OBS}"
}
#################################################
# Function that counts resources in parquet files and compares output to what
# is in openmrs server
# Globals:
# HOME_PATH
# PARQUET_SUBDIR
# TOTAL_TEST_PATIENTS
# TOTAL_TEST_ENCOUNTERS
# TOTAL_TEST_OBS
# HAPI
#################################################
function test_parquet_sink() {
print_message "Counting number of patients, encounters and obs sinked to parquet files"
total_patients_streamed=$(java -jar ./parquet-tools-1.11.1.jar rowcount "${HOME_PATH}/${PARQUET_SUBDIR}/Patient/" | awk '{print $3}')
print_message "Total patients synced to parquet ---> ${total_patients_streamed}"
total_encounters_streamed=$(java -jar ./parquet-tools-1.11.1.jar rowcount "${HOME_PATH}/${PARQUET_SUBDIR}/Encounter/" | awk '{print $3}')
print_message "Total encounters synced to parquet ---> ${total_encounters_streamed}"
total_obs_streamed=$(java -jar ./parquet-tools-1.11.1.jar rowcount "${HOME_PATH}/${PARQUET_SUBDIR}/Observation/" | awk '{print $3}')
print_message "Total obs synced to parquet ---> ${total_obs_streamed}"
if [[ "${total_patients_streamed}" == "${TOTAL_TEST_PATIENTS}" && "${total_encounters_streamed}" \
== "${TOTAL_TEST_ENCOUNTERS}" && "${total_obs_streamed}" == "${TOTAL_TEST_OBS}" ]] \
; then
print_message "PARQUET SINK EXECUTED SUCCESSFULLY USING ${PARQUET_SUBDIR} MODE"
else
print_message "PARQUET SINK TEST FAILED USING ${PARQUET_SUBDIR} MODE"
exit 1
fi
}
#################################################
# Function that counts resources in FHIR server and compares output to what is
# in openmrs server
# Globals:
# HOME_PATH
# PARQUET_SUBDIR
# SINK_SERVER
# TOTAL_TEST_PATIENTS
# TOTAL_TEST_ENCOUNTERS
# TOTAL_TEST_OBS
# STREAMING
# HAPI
#################################################
function test_fhir_sink() {
local patient_query_param="?_summary=count"
local enc_obs_query_param="?_summary=count"
if [[ -n ${STREAMING} ]]; then
patient_query_param="?given=Alberta625"
enc_obs_query_param="?subject.given=Alberta625"
fi
print_message "Finding number of patients, encounters and obs in FHIR server"
mkdir "${HOME_PATH}/fhir"
curl -L -X GET -u hapi:hapi --connect-timeout 5 --max-time 20 \
"${SINK_SERVER}/fhir/Patient${patient_query_param}" 2>/dev/null >>"${HOME_PATH}/fhir/patients.json"
curl -L -X GET -u hapi:hapi --connect-timeout 5 --max-time 20 \
"${SINK_SERVER}/fhir/Encounter${enc_obs_query_param}" 2>/dev/null >>"${HOME_PATH}/fhir/encounters.json"
curl -L -X GET -u hapi:hapi --connect-timeout 5 --max-time 20 \
"${SINK_SERVER}/fhir/Observation${enc_obs_query_param}" 2>/dev/null >>"${HOME_PATH}/fhir/obs.json"
print_message "Counting number of patients, encounters and obs sinked to fhir files"
total_patients_sinked_fhir=$(jq '.total' "${HOME_PATH}/fhir/patients.json")
print_message "Total patients sinked to fhir ---> ${total_patients_sinked_fhir}"
total_encounters_sinked_fhir=$(jq '.total' "${HOME_PATH}/fhir/encounters.json")
print_message "Total encounters sinked to fhir ---> ${total_encounters_sinked_fhir}"
total_obs_sinked_fhir=$(jq '.total' "${HOME_PATH}/fhir/obs.json")
print_message "Total observations sinked to fhir ---> ${total_obs_sinked_fhir}"
if [[ "${total_patients_sinked_fhir}" == "${TOTAL_TEST_PATIENTS}" && "${total_encounters_sinked_fhir}" \
== "${TOTAL_TEST_ENCOUNTERS}" && "${total_obs_sinked_fhir}" == "${TOTAL_TEST_OBS}" ]] \
; then
print_message "FHIR SERVER SINK EXECUTED SUCCESSFULLY USING ${PARQUET_SUBDIR} MODE"
else
print_message "FHIR SERVER SINK TEST FAILED USING ${PARQUET_SUBDIR} MODE"
exit 1
fi
}
validate_args "$@"
setup "$@"
print_message "---- STARTING ${PARQUET_SUBDIR} TEST ----"
fhir_source_query
test_parquet_sink
test_fhir_sink
print_message "END!!"