Skip to content

Commit

Permalink
Merge pull request #880 from jertel/os2
Browse files Browse the repository at this point in the history
Support OpenSearch 2.x
  • Loading branch information
jertel authored Jun 7, 2022
2 parents cb4af7a + c3c2d6e commit 2dcc956
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 42 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
__pycache__/
*.pyc
virtualenv_run/
.venv
*.egg-info/
dist/
venv/
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- Add the possibility to use rule and match fields in the description of TheHive alerts - [#855](https://github.com/jertel/elastalert2/pull/855) - @luffynextgen
- Fix missing colon on schema.yml and add unit test on it - [#866](https://github.com/jertel/elastalert2/pull/866) - @Isekai-Seikatsu
- Add the possibility to use tags, message and tlp level in TheHive observables [#873](https://github.com/jertel/elastalert2/pull/873) - @luffynextgen
- Support OpenSearch 2.x - [#880](https://github.com/jertel/elastalert2/pull/880) - @jertel

# 2.5.0

Expand Down
2 changes: 1 addition & 1 deletion docs/source/running_elastalert.rst
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ As a Python package
Requirements
------------

- Elasticsearch 7.x or 8.x
- Elasticsearch 7.x or 8.x, or OpenSearch 1.x or 2.x
- ISO8601 or Unix timestamped data
- Python 3.10. Require OpenSSL 1.1.1 or newer.
- pip
Expand Down
16 changes: 2 additions & 14 deletions elastalert/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,8 @@ def es_version(self):
Returns the reported version from the Elasticsearch server.
"""
if self._es_version is None:
for retry in range(3):
try:
esinfo = self.info()['version']
if esinfo.get('distribution') == "opensearch":
# OpenSearch is based on Elasticsearch 7.10.2, currently only v1.0.0 exists
# https://opensearch.org/
self._es_version = "7.10.2"
else:
self._es_version = esinfo['number']
break
except TransportError:
if retry == 2:
raise
time.sleep(3)
self._es_version = util.get_version_from_cluster_info(self)

return self._es_version

def is_atleastseven(self):
Expand Down
9 changes: 2 additions & 7 deletions elastalert/create_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,13 @@
from envparse import Env

from elastalert.auth import Auth
from elastalert.util import get_version_from_cluster_info

env = Env(ES_USE_SSL=bool)


def create_index_mappings(es_client, ea_index, recreate=False, old_ea_index=None):
esinfo = es_client.info()['version']
if esinfo.get('distribution') == "opensearch":
# OpenSearch is based on Elasticsearch 7.10.2, currently only v1.0.0 exists
# https://opensearch.org/
esversion = "7.10.2"
else:
esversion = esinfo['number']
esversion = get_version_from_cluster_info(es_client)

es_index_mappings = {}
if is_atleasteight(esversion):
Expand Down
13 changes: 2 additions & 11 deletions elastalert/ruletypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ def get_all_terms(self, args):
# Iterate on each part of the composite key and add a sub aggs clause to the elastic search query
for i, sub_field in enumerate(field):
if self.rules.get('use_keyword_postfix', True):
level['values']['terms']['field'] = add_raw_postfix(sub_field, self.is_five_or_above())
level['values']['terms']['field'] = add_raw_postfix(sub_field, True)
else:
level['values']['terms']['field'] = sub_field
if i < len(field) - 1:
Expand All @@ -725,7 +725,7 @@ def get_all_terms(self, args):
self.seen_values.setdefault(field, [])
# For non-composite keys, only a single agg is needed
if self.rules.get('use_keyword_postfix', True):
field_name['field'] = add_raw_postfix(field, self.is_five_or_above())
field_name['field'] = add_raw_postfix(field, True)
else:
field_name['field'] = field

Expand Down Expand Up @@ -917,15 +917,6 @@ def add_terms_data(self, terms):
self.add_match(match)
self.seen_values[field].append(bucket['key'])

def is_five_or_above(self):
esinfo = self.es.info()['version']
if esinfo.get('distribution') == "opensearch":
# OpenSearch is based on Elasticsearch 7.10.2, currently only v1.0.0 exists
# https://opensearch.org/
return True
else:
return int(esinfo['number'][0]) >= 5


class CardinalityRule(RuleType):
""" A rule that matches if cardinality of a field is above or below a threshold within a timeframe """
Expand Down
25 changes: 25 additions & 0 deletions elastalert/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
import os
import re
import sys
import time

import dateutil.parser
import pytz
from six import string_types

from elastalert import ElasticSearchClient
from elastalert.auth import Auth
from elasticsearch.exceptions import TransportError

logging.basicConfig()
logging.captureWarnings(True)
Expand Down Expand Up @@ -560,3 +562,26 @@ def parse_hosts(host, port=9200):
host_list = [format_host_port(x, port) for x in host_list]
return host_list


def get_version_from_cluster_info(client):
esversion = None
for retry in range(3):
try:
esinfo = client.info()['version']
esversion = esinfo['number']
if esinfo.get('distribution') == "opensearch":
# https://opensearch.org/
if esversion[0] == "1":
# OpenSearch 1.x is based on Elasticsearch 7.10.2
esversion = "7.10.2"
else:
# OpenSearch 2.x has qualities similar to 8.2.0
esversion = "8.2.0"
break
except TransportError:
if retry == 2:
raise
elastalert_logger.warning('Failed to retrieve cluster version information, retrying in 3 seconds')
time.sleep(3)

return esversion
14 changes: 14 additions & 0 deletions tests/elasticsearch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import dateutil
import pytest

from unittest import mock
from unittest.mock import MagicMock

import elastalert.create_index
import elastalert.elastalert
from elastalert import ElasticSearchClient
Expand All @@ -25,6 +28,17 @@ def es_client():
return ElasticSearchClient(es_conn_config)


def test_es_version(es_client):
mockInfo = {}
versionData = {}
versionData['number'] = "1.2.3"
mockInfo['version'] = versionData

with mock.patch('elasticsearch.client.Elasticsearch.info', new=MagicMock(return_value=mockInfo)):
version = es_client.es_version
assert version == "1.2.3"


@pytest.mark.elasticsearch
class TestElasticsearch(object):
# TODO perform teardown removing data inserted into Elasticsearch
Expand Down
13 changes: 4 additions & 9 deletions tests/rules_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,14 +556,12 @@ def test_change():
assert rule.matches == []


@pytest.mark.parametrize('version, expected_is_five_or_above', [
({'version': {'number': '2.x.x'}}, False),
({'version': {'number': '5.x.x'}}, True),
({'version': {'number': '6.x.x'}}, True),
@pytest.mark.parametrize('version', [
({'version': {'number': '7.x.x'}}, True),
({'version': {'number': '7.10.2', 'distribution': 'opensearch'}}, True),
({'version': {'number': '1.2.0', 'distribution': 'opensearch'}}, True),
({'version': {'number': '2.0.0', 'distribution': 'opensearch'}}, True),
])
def test_new_term(version, expected_is_five_or_above):
def test_new_term(version):
rules = {'fields': ['a', 'b'],
'timestamp_field': '@timestamp',
'es_host': 'example.com', 'es_port': 10, 'index': 'logstash',
Expand All @@ -585,8 +583,6 @@ def record_args(*args, **kwargs):
mock_es.return_value.search.side_effect = record_args
rule = NewTermsRule(rules)

assert rule.is_five_or_above() == expected_is_five_or_above

# 30 day default range, 1 day default step, times 2 fields
assert rule.es.search.call_count == 60

Expand Down Expand Up @@ -633,7 +629,6 @@ def record_args(*args, **kwargs):
rule.add_data([{'@timestamp': ts_now(), 'a': 'key2'}])
assert len(rule.matches) == 1
assert rule.matches[0]['missing_field'] == 'b'
assert rule.is_five_or_above() == expected_is_five_or_above


def test_new_term_nested_field():
Expand Down
26 changes: 26 additions & 0 deletions tests/util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from dateutil.tz import tzutc

from unittest import mock
from unittest.mock import MagicMock

from elastalert.util import add_raw_postfix
from elastalert.util import build_es_conn_config
Expand Down Expand Up @@ -38,6 +39,9 @@
from elastalert.util import format_string
from elastalert.util import pretty_ts
from elastalert.util import parse_hosts
from elastalert.util import get_version_from_cluster_info

from elasticsearch.client import Elasticsearch


@pytest.mark.parametrize('spec, expected_delta', [
Expand Down Expand Up @@ -604,3 +608,25 @@ def test_parse_host():
assert parse_hosts("host1, host2:9200, host3:9300") == ["host1:9200",
"host2:9200",
"host3:9300"]


@pytest.mark.parametrize('version, distro, expectedversion', [
('7.10.0', None, '7.10.0'),
('8.2.0', None, '8.2.0'),
('1.2.0', 'opensearch', '7.10.2'),
('2.0.0', 'opensearch', '8.2.0')
])
@mock.patch.dict(os.environ, {'AWS_DEFAULT_REGION': ''})
def test_get_version(version, distro, expectedversion):
mockInfo = {}
versionData = {}
versionData['number'] = version
if distro is not None:
versionData['distribution'] = distro

mockInfo['version'] = versionData

with mock.patch('elasticsearch.client.Elasticsearch.info', new=MagicMock(return_value=mockInfo)):
client = Elasticsearch()
actualversion = get_version_from_cluster_info(client)
assert expectedversion == actualversion

0 comments on commit 2dcc956

Please sign in to comment.