forked from branchnetconsulting/wazuh-tools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
migrate_es_to_wi
189 lines (174 loc) · 8.35 KB
/
migrate_es_to_wi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
#!/bin/bash
#
# migrate_es_to_wi
# by Kevin Branch
# Branch Network Consulting, LLC
#
# This tool assists with migrating stored indexes in Wazuh SIEMs from Elastic Stack 7.x over to Wazuh Indexer.
#
# Prerequisites:
#
# If your Elasticsearch ingest node pipeline(s) replace the [timestamp] field (normally just ingest time) with more accurate
# time source(s) coming from the body of qualifying events, like EventChannel or cloud provider log pulls,
# then temporarily disable this correction of [timestamp] so it is purely assigned ingest time, while migration is in progress.
# Otherwise migration may get tripped up by writing of records to yesterday's index (or older).
#
# Add something like this to /etc/wazuh-indexer/opensearch.yml and restart wazuh-indexer service, to accomodate reindexing.
# reindex.remote.whitelist: "127.0.0.1:9200"
# reindex.ssl.verification_mode: none
#
# The jq pacakge must be installed.
#
# Populate /etc/migrate_es_to_wi.conf with appropriate settings
#
# # Elasticsearch admin access details
# SRCUSER=elastic
# SRCPASS=abcd
# SRCHOST=1.2.3.4
# SRCPORT=9200
#
# # Wazuh Indexer admin access details
# DSTUSER=admin
# DSTPASS=wxyz
# DSTHOST=5.6.7.8
# DSTPORT=9200
#
# # Establish the pattern of source indexes to be migrated, also definining pattern of such indexes to exclude.
# # The first example below picks up all wazuh-a* source indices and removes from the list any that end with common date suffixes corresponding to today.
# # The second example below picks up only indices that end with common date suffixes corresponding to today.
# # Log shipping should be halted during migration of indices that would otherwise be getting actively written to.
# INCLUDES="wazuh-a*,-*`date +%Y.%m.%d`,-*`date +%Y.%m`,-*`date +%Y.%W`,-*`date +%Y.%W`w"
# #INCLUDES="wazuh-a*`date +%Y.%m.%d`,wazuh-a*`date +%Y.%m`,wazuh-a*`date +%Y.%W`,wazuh-a*`date +%Y.%W`w"
#
# # Set to yes to cause deletion of pre-existing target indexes if source and dest record counts do not match.
# # Set to no to skip over them.
# OVERWRITE_IF_BROKEN=yes
#
# # Set to yes to close the reindexed index on the Wazuh Indexer side upon completion of a verified successful reindex
# # This is to conserve Wazuh Indexer memory resources which may be tight while running temporarily in parallel to Elasticsearch
# CLOSE_TARGET_ON_SUCCESS=no
#
# # Set to yes to delete the original index upon it being successfully reindexed to the target side.
# # This is to conserve disk space, especially if the source and target indices use the same disk storage domain.
# DELETE_SOURCE_ON_SUCCESS=no
#
if [ ! -f /etc/migrate_es_to_wi.conf ]; then
echo "Please configure /etc/migrate_es_to_wi.conf first..."
exit
fi
. /etc/migrate_es_to_wi.conf
SRC_LIST_FILE=$(mktemp)
curl -s --insecure -u $SRCUSER:$SRCPASS "https://$SRCHOST:$SRCPORT/_cat/indices/$INCLUDES" > $SRC_LIST_FILE
if [[ `grep '"error"' $SRC_LIST_FILE` ]]; then
SRC_LIST=""
else
SRC_LIST=`awk '{print $3}' $SRC_LIST_FILE | sort`
fi
rm $SRC_LIST_FILE
DST_LIST_FILE=$(mktemp)
curl -s --insecure -u $DSTUSER:$DSTPASS "https://$DSTHOST:$DSTPORT/_cat/indices/$INCLUDES" > $DST_LIST_FILE
if [[ `grep '"error"' $DST_LIST_FILE` ]]; then
DST_LIST=""
else
DST_LIST=`awk '{print $3}' $DST_LIST_FILE | sort`
fi
rm $DST_LIST_FILE
echo -e "Migration candidates in Elasticsearch: \n$SRC_LIST\n"
echo -e "Possibly already migrated to Wazuh Indexer: \n$DST_LIST"
for IDX in $SRC_LIST; do
echo -e "\nAssessing $IDX"
SRC_STATE=`curl -s --insecure -u $SRCUSER:$SRCPASS "https://$SRCHOST:$SRCPORT/_cat/indices/$IDX?h=status"`
echo "Source index is in state: $SRC_STATE"
if [[ "$SRC_STATE" == "close" ]]; then
echo "Opening source index..."
curl -X POST -s --insecure -u $SRCUSER:$SRCPASS "https://$SRCHOST:$SRCPORT/$IDX/_open" | jq .
fi
# Handle cases where the source index already exists on the target side.
if [[ `echo $DST_LIST | egrep "(^| )$IDX( |$)"` ]]; then
echo "Target index is already in Wazuh Indexer";
DST_STATE=`curl -s --insecure -u $DSTUSER:$DSTPASS "https://$DSTHOST:$DSTPORT/_cat/indices/$IDX?h=status"`
echo "Target index is in state: $DST_STATE"
if [[ "$DST_STATE" == "close" ]]; then
echo "Opening target index..."
curl -X POST -s --insecure -u $DSTUSER:$DSTPASS "https://$DSTHOST:$DSTPORT/$IDX/_open" | jq .
fi
SRC_COUNT=`curl -s --insecure -u $SRCUSER:$SRCPASS "https://$SRCHOST:$SRCPORT/$IDX/_count" | jq .count`
DST_COUNT=`curl -s --insecure -u $DSTUSER:$DSTPASS "https://$DSTHOST:$DSTPORT/$IDX/_count" | jq .count`
if [[ "$SRC_COUNT" == "$DST_COUNT" ]]; then
echo "Source/target record counts match ($SRC_COUNT). This index is apparently already migrated. Skipping..."
if [[ "$SRC_STATE" == "close" ]]; then
echo "Closing source index since it was originally in a closed state..."
curl -X POST -s --insecure -u $SRCUSER:$SRCPASS "https://$SRCHOST:$SRCPORT/$IDX/_close" | jq .
fi
if [[ "$DST_STATE" == "close" ]]; then
echo "Closing target index since it was originally in a closed state..."
curl -X POST -s --insecure -u $DSTUSER:$DSTPASS "https://$DSTHOST:$DSTPORT/$IDX/_close" | jq .
fi
continue
else
if [[ "$OVERWRITE_IF_BROKEN" == "no" ]]; then
echo "Source/target record counts do not match ($SRC_COUNT vs $DST_COUNT). Skipping migration because OVERWRITE_IF_BROKEN is not set."
if [[ "$DST_STATE" == "close" ]]; then
echo "Closing target index since it was originally in a closed state..."
curl -X POST -s --insecure -u $DSTUSER:$DSTPASS "https://$DSTHOST:$DSTPORT/$IDX/_close" | jq .
fi
continue
else
echo "Source/target record counts do not match ($SRC_COUNT vs $DST_COUNT). Deleting target index because OVERWRITE_IF_BROKEN is set."
curl -X DELETE -s --insecure -u $DSTUSER:$DSTPASS "https://$DSTHOST:$DSTPORT/$IDX"| jq .
fi
fi
fi
# Make source index readonly
echo "Switching source index to readonly state."
curl -X PUT -s --insecure -u $SRCUSER:$SRCPASS "https://$SRCHOST:$SRCPORT/$IDX/_block/write" | jq .
# Kick off the reindex
echo "Migrating $IDX from Elasticsearch to Wazuh Indexer"
curl -X POST -s --insecure -u $DSTUSER:$DSTPASS "https://$DSTHOST:$DSTPORT/_reindex?wait_for_completion=true" -HContent-Type:application/json -d'{
"source": {
"remote": {
"host": "https://'$SRCHOST':'$SRCPORT'",
"username": "'$SRCUSER'",
"password": "'$SRCPASS'"
},
"index": "'$IDX'"
},
"dest": {
"index": "'$IDX'"
}
}' | jq .
# Fetch source and dest records count post reindex
echo "Verifying success of migration..."
curl -X POST -s --insecure -u $DSTUSER:$DSTPASS "https://$DSTHOST:$DSTPORT/$IDX/_flush" | jq .
SRC_COUNT=`curl -s --insecure -u $SRCUSER:$SRCPASS "https://$SRCHOST:$SRCPORT/$IDX/_count" | jq .count`
DST_COUNT=`curl -s --insecure -u $DSTUSER:$DSTPASS "https://$DSTHOST:$DSTPORT/$IDX/_count" | jq .count`
FAILCOUNT=0
while [[ $FAILCOUNT -lt 6 ]]; do
if [[ "$SRC_COUNT" == "$DST_COUNT" ]] ; then
break
else
FAILCOUNT=$(($FAILCOUNT+1))
sleep 10
SRC_COUNT=`curl -s --insecure -u $SRCUSER:$SRCPASS "https://$SRCHOST:$SRCPORT/$IDX/_count" | jq .count`
DST_COUNT=`curl -s --insecure -u $DSTUSER:$DSTPASS "https://$DSTHOST:$DSTPORT/$IDX/_count" | jq .count`
fi
done
if [[ "$SRC_STATE" == "close" ]]; then
echo "Closing source index since it was originally in a closed state..."
curl -X POST -s --insecure -u $SRCUSER:$SRCPASS "https://$SRCHOST:$SRCPORT/$IDX/_close" | jq .
fi
# If counts match, declare success, conditionally delete from src, and conditionally close on target.
if [[ "$SRC_COUNT" == "$DST_COUNT" ]] && [[ "$SRC_COUNT" != "null" ]]; then
echo "Index migration successful! Record count ($SRC_COUNT) matches between source and target."
if [[ "$DELETE_SOURCE_ON_SUCCESS" == "yes" ]]; then
echo "Deleting index on Elasticsearch side because DELETE_SOURCE_ON_SUCCESS is set."
curl -X DELETE -s --insecure -u $SRCUSER:$SRCPASS "https://$SRCHOST:$SRCPORT/$IDX" | jq .
fi
if [[ "$CLOSE_TARGET_ON_SUCCESS" == "yes" ]]; then
echo "Closing index on Wazuh Indexer side because CLOSE_TARGET_ON_SUCCESS is set."
curl -X POST -s --insecure -u $DSTUSER:$DSTPASS "https://$DSTHOST:$DSTPORT/$IDX/_close" | jq .
fi
else
echo "Index migration failed! Source record count is $SRC_COUNT, but target record count is $DST_COUNT."
fi
done