Support opensearch #39

Merged · 17 commits · Jan 19, 2024
45 changes: 34 additions & 11 deletions .github/workflows/test.yml
@@ -7,31 +7,54 @@ jobs:
fail-fast: false
matrix:
elasticsearch:
- plainpicture/elasticsearch:2.4.1_delete-by-query
- elasticsearch:5.4
- docker.elastic.co/elasticsearch/elasticsearch:6.7.0
- docker.elastic.co/elasticsearch/elasticsearch:7.0.0
- docker.elastic.co/elasticsearch/elasticsearch:7.11.2
- docker.elastic.co/elasticsearch/elasticsearch:8.1.1
- image: plainpicture/elasticsearch:2.4.1_delete-by-query
env:
discovery.type: single-node
xpack.security.enabled: false
- image: elasticsearch:5.4
env:
discovery.type: single-node
xpack.security.enabled: false
- image: docker.elastic.co/elasticsearch/elasticsearch:6.7.0
env:
discovery.type: single-node
xpack.security.enabled: false
- image: docker.elastic.co/elasticsearch/elasticsearch:7.0.0
env:
discovery.type: single-node
xpack.security.enabled: false
- image: docker.elastic.co/elasticsearch/elasticsearch:7.11.2
env:
discovery.type: single-node
xpack.security.enabled: false
- image: docker.elastic.co/elasticsearch/elasticsearch:8.1.1
env:
discovery.type: single-node
xpack.security.enabled: false
- image: opensearchproject/opensearch:1.3.14
env:
discovery.type: single-node
plugins.security.disabled: true
- image: opensearchproject/opensearch:2.11.1
env:
discovery.type: single-node
plugins.security.disabled: true
ruby:
- 2.7
- 3.0
- 3.1
- 3.2
services:
elasticsearch:
image: ${{ matrix.elasticsearch }}
env:
discovery.type: single-node
xpack.security.enabled: false
image: ${{ matrix.elasticsearch.image }}
env: ${{ matrix.elasticsearch.env }}
ports:
- 9200:9200
steps:
- uses: actions/checkout@v1
- uses: ruby/setup-ruby@v1
with:
ruby-version: ${{ matrix.ruby }}
- run: gem install bundler
- run: bundle
- run: bundle exec rspec
- run: bundle exec rubocop
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,10 @@

# CHANGELOG

## v3.8.0

* Support Opensearch 1.x and 2.x

## v3.7.2

* Fix wrong AWS signatures by generating the json before passing it to http-rb
5 changes: 3 additions & 2 deletions README.md
@@ -9,8 +9,9 @@
Using SearchFlip it is dead-simple to create index classes that correspond to
[Elasticsearch](https://www.elastic.co/) indices and to manipulate, query and
aggregate these indices using a chainable, concise, yet powerful DSL. Finally,
SearchFlip supports Elasticsearch 2.x, 5.x, 6.x, 7.x and 8.x. Check section
[Feature Support](#feature-support) for version dependent features.
SearchFlip supports Elasticsearch 2.x, 5.x, 6.x, 7.x and 8.x as well as
Opensearch 1.x and 2.x. Check section [Feature Support](#feature-support) for
version dependent features.

```ruby
CommentIndex.search("hello world", default_field: "title").where(visible: true).aggregate(:user_id).sort(id: "desc")
```
2 changes: 1 addition & 1 deletion lib/search_flip/aggregation.rb
@@ -30,7 +30,7 @@ def to_hash
res[:aggregations] = aggregation_values if aggregation_values

if must_values || must_not_values || filter_values
if target.connection.version.to_i >= 2
if target.connection.distribution || target.connection.version.to_i >= 2
res[:filter] = {
bool: {}
.merge(must_values ? { must: must_values } : {})
2 changes: 1 addition & 1 deletion lib/search_flip/bulk.rb
@@ -148,7 +148,7 @@ def upload
return unless parsed_response["errors"]

parsed_response["items"].each do |item|
item.each do |_, element|
item.each_value do |element|
status = element["status"]

next if status.between?(200, 299)
38 changes: 24 additions & 14 deletions lib/search_flip/connection.rb
@@ -16,24 +16,28 @@ def initialize(options = {})
@base_url = options[:base_url] || SearchFlip::Config[:base_url]
@http_client = options[:http_client] || SearchFlip::HTTPClient.new
@bulk_limit = options[:bulk_limit] || SearchFlip::Config[:bulk_limit]
@version_mutex = Mutex.new
end

# Queries and returns the Elasticsearch distribution used.
#
# @example
# connection.distribution # => e.g. "opensearch"
#
# @return [String, nil] The distribution, e.g. "opensearch", or nil for Elasticsearch

def distribution
@distribution ||= SearchFlip::JSON.parse(version_response.to_s)["version"]["distribution"]
end

# Queries and returns the Elasticsearch version used.
#
# @example
# connection.version # => e.g. 2.4.1
# connection.version # => e.g. "2.4.1"
#
# @return [String] The Elasticsearch version

def version
@version_mutex.synchronize do
@version ||= begin
response = http_client.headers(accept: "application/json").get("#{base_url}/")

SearchFlip::JSON.parse(response.to_s)["version"]["number"]
end
end
@version ||= SearchFlip::JSON.parse(version_response.to_s)["version"]["number"]
end

# Queries and returns the Elasticsearch cluster health.
@@ -64,7 +68,7 @@ def cluster_health
def msearch(criterias)
payload = criterias.flat_map do |criteria|
[
SearchFlip::JSON.generate(index: criteria.target.index_name_with_prefix, **(version.to_i < 8 ? { type: criteria.target.type_name } : {})),
SearchFlip::JSON.generate(index: criteria.target.index_name_with_prefix, **(distribution.nil? && version.to_i < 8 ? { type: criteria.target.type_name } : {})),
SearchFlip::JSON.generate(criteria.request)
]
end
@@ -300,8 +304,8 @@ def refresh(index_names = nil)
# @return [Boolean] Returns true or raises SearchFlip::ResponseError

def update_mapping(index_name, mapping, type_name: nil)
url = type_name && version.to_i < 8 ? type_url(index_name, type_name) : index_url(index_name)
params = type_name && version.to_f >= 6.7 && version.to_i < 8 ? { include_type_name: true } : {}
url = type_name && distribution.nil? && version.to_i < 8 ? type_url(index_name, type_name) : index_url(index_name)
params = type_name && distribution.nil? && version.to_f >= 6.7 && version.to_i < 8 ? { include_type_name: true } : {}

http_client.put("#{url}/_mapping", params: params, json: mapping)

@@ -318,8 +322,8 @@ def update_mapping(index_name, mapping, type_name: nil)
# @return [Hash] The current type mapping

def get_mapping(index_name, type_name: nil)
url = type_name && version.to_i < 8 ? type_url(index_name, type_name) : index_url(index_name)
params = type_name && version.to_f >= 6.7 && version.to_i < 8 ? { include_type_name: true } : {}
url = type_name && distribution.nil? && version.to_i < 8 ? type_url(index_name, type_name) : index_url(index_name)
params = type_name && distribution.nil? && version.to_f >= 6.7 && version.to_i < 8 ? { include_type_name: true } : {}

response = http_client.headers(accept: "application/json").get("#{url}/_mapping", params: params)

@@ -422,5 +426,11 @@ def type_url(index_name, type_name)
def index_url(index_name)
"#{base_url}/#{index_name}"
end

private

def version_response
@version_response ||= http_client.headers(accept: "application/json").get("#{base_url}/")
end
end
end
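
For reference, a minimal usage sketch of the new `distribution` helper next to the reworked `version` (the base URL is an assumed local default; the return values follow the diff and the spec further down):

```ruby
require "search_flip"

# Assumed local cluster; adjust base_url for your setup.
connection = SearchFlip::Connection.new(base_url: "http://localhost:9200")

# Both helpers parse the memoized root endpoint response (version_response).
connection.distribution # => "opensearch" on OpenSearch, nil on Elasticsearch
connection.version      # => e.g. "2.11.1" or "8.1.1"

# Callers can now treat any OpenSearch cluster like a typeless Elasticsearch:
typeless = connection.distribution || connection.version.to_i >= 8
```
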
8 changes: 4 additions & 4 deletions lib/search_flip/criteria.rb
@@ -350,8 +350,8 @@ def delete(params = {})
http_request = connection.http_client
http_request = http_request.timeout(http_timeout_value) if http_timeout_value

if connection.version.to_i >= 5
url = connection.version.to_i < 8 ? target.type_url : target.index_url
if connection.distribution || connection.version.to_i >= 5
url = connection.distribution.nil? && connection.version.to_i < 8 ? target.type_url : target.index_url

http_request.post("#{url}/_delete_by_query", params: request_params.merge(params), json: dupped_request)
else
@@ -622,15 +622,15 @@ def execute!
json: { scroll: scroll_args[:timeout], scroll_id: scroll_args[:id] }
)
elsif scroll_args
url = connection.version.to_i < 8 ? target.type_url : target.index_url
url = connection.distribution.nil? && connection.version.to_i < 8 ? target.type_url : target.index_url

http_request.post(
"#{url}/_search",
params: request_params.merge(scroll: scroll_args[:timeout]),
json: request
)
else
url = connection.version.to_i < 8 ? target.type_url : target.index_url
url = connection.distribution.nil? && connection.version.to_i < 8 ? target.type_url : target.index_url

http_request.post("#{url}/_search", params: request_params, json: request)
end
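
A short, hypothetical usage sketch for the delete path above (the index and filter are made up; the endpoint selection mirrors the branches in the diff):

```ruby
# OpenSearch (any version) and Elasticsearch >= 5 use _delete_by_query;
# the legacy per-type URL is only kept for Elasticsearch 5.x through 7.x.
CommentIndex.where(visible: false).delete
```
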
8 changes: 4 additions & 4 deletions lib/search_flip/index.rb
@@ -438,7 +438,7 @@ def get_mapping
# equal to _doc.

def include_type_name?
type_name != "_doc" || connection.version.to_i < 7
type_name != "_doc" || (connection.distribution.nil? && connection.version.to_i < 7)
end

# Retrieves the document specified by id from Elasticsearch. Raises
@@ -455,7 +455,7 @@ def include_type_name?
# @return [Hash] The specified document

def get(id, params = {})
url = connection.version.to_i < 8 ? type_url : "#{index_url}/_doc"
url = connection.distribution.nil? && connection.version.to_i < 8 ? type_url : "#{index_url}/_doc"
response = connection.http_client.headers(accept: "application/json").get("#{url}/#{id}", params: params)

SearchFlip::JSON.parse(response.to_s)
@@ -474,7 +474,7 @@ def get(id, params = {})
# @return [Hash] The raw response

def mget(request, params = {})
url = connection.version.to_i < 8 ? type_url : index_url
url = connection.distribution.nil? && connection.version.to_i < 8 ? type_url : index_url
response = connection.http_client.headers(accept: "application/json").post("#{url}/_mget", json: request, params: params)

SearchFlip::JSON.parse(response.to_s)
@@ -633,7 +633,7 @@ def bulk(options = {})
bulk_max_mb: connection.bulk_max_mb
}

url = connection.version.to_i < 8 ? type_url : index_url
url = connection.distribution.nil? && connection.version.to_i < 8 ? type_url : index_url

SearchFlip::Bulk.new("#{url}/_bulk", default_options.merge(options)) do |indexer|
yield indexer
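
The same distribution-aware URL selection now applies to the document-level helpers; a brief sketch with made-up ids:

```ruby
# Elasticsearch < 8 (and no OpenSearch): requests hit the legacy type URL.
# Elasticsearch 8+ and OpenSearch: requests hit the index URL / _doc endpoint.
ProductIndex.get(1)
ProductIndex.mget(ids: [1, 2])

# Bulk indexing picks its endpoint the same way:
ProductIndex.bulk do |indexer|
  indexer.index 1, { id: 1 }
end
```
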
2 changes: 1 addition & 1 deletion lib/search_flip/version.rb
@@ -1,3 +1,3 @@
module SearchFlip
VERSION = "3.7.2"
VERSION = "3.8.0"
end
2 changes: 1 addition & 1 deletion spec/search_flip/bulk_spec.rb
@@ -60,7 +60,7 @@

it "uses the specified http_client" do
product = create(:product)
url = ProductIndex.connection.version.to_i < 8 ? ProductIndex.type_url : ProductIndex.index_url
url = ProductIndex.connection.distribution.nil? && ProductIndex.connection.version.to_i < 8 ? ProductIndex.type_url : ProductIndex.index_url

stub_request(:put, "#{url}/_bulk").with(headers: { "X-Header" => "Value" }).to_return(status: 200, body: "{}")

38 changes: 22 additions & 16 deletions spec/search_flip/connection_spec.rb
@@ -1,6 +1,12 @@
require File.expand_path("../spec_helper", __dir__)

RSpec.describe SearchFlip::Connection do
describe "#distribution" do
it "reutrns the distribution" do
expect([nil, "opensearch"]).to include(SearchFlip::Connection.new.distribution)
end
end

describe "#version" do
it "returns the version" do
expect(SearchFlip::Connection.new.version).to match(/\A[0-9.]+\z/)
@@ -92,7 +98,7 @@
it "returns the specified indices" do
connection = SearchFlip::Connection.new

expect(connection.get_indices.to_set { |index| index["index"] }).to eq(["comments", "products"].to_set)
expect(connection.get_indices.map { |index| index["index"] }.grep_v(/^\./).to_set).to eq(["comments", "products"].to_set)
expect(connection.get_indices("com*").map { |index| index["index"] }).to eq(["comments"])
end

@@ -170,7 +176,7 @@
it "freezes the specified index" do
connection = SearchFlip::Connection.new

if connection.version.to_f >= 6.6 && connection.version.to_i < 8
if connection.distribution.nil? && connection.version.to_f >= 6.6 && connection.version.to_i < 8
begin
connection.create_index("index_name")
connection.freeze_index("index_name")
@@ -187,7 +193,7 @@
it "unfreezes the specified index" do
connection = SearchFlip::Connection.new

if connection.version.to_f >= 6.6 && connection.version.to_i < 8
if connection.distribution.nil? && connection.version.to_f >= 6.6 && connection.version.to_i < 8
begin
connection.create_index("index_name")
connection.freeze_index("index_name")
@@ -241,7 +247,7 @@
end

describe "#update_mapping" do
if SearchFlip::Connection.new.version.to_i >= 7
if SearchFlip::Connection.new.then { |connection| connection.distribution || connection.version.to_i >= 7 }
it "updates the mapping of an index without type name" do
begin
connection = SearchFlip::Connection.new
@@ -266,7 +272,7 @@

connection.create_index("index_name")

if connection.version.to_i < 8
if connection.distribution.nil? && connection.version.to_i < 8
connection.update_mapping("index_name", { "type_name" => mapping }, type_name: "type_name")

expect(connection.get_mapping("index_name", type_name: "type_name")).to eq("index_name" => { "mappings" => { "type_name" => mapping } })
@@ -321,9 +327,9 @@

bulk = proc do
connection.bulk do |indexer|
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 1, { id: 1 }, _index: CommentIndex.index_name, ** connection.version.to_i < 8 ? { _type: CommentIndex.type_name } : {}
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 1, { id: 1 }, _index: CommentIndex.index_name, ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: CommentIndex.type_name } : {}
end
end

@@ -346,23 +352,23 @@
connection = SearchFlip::Connection.new

connection.bulk do |indexer|
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
end

bulk = proc do
connection.bulk do |indexer|
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
end
end

expect(&bulk).to raise_error(SearchFlip::Bulk::Error)

bulk = proc do
connection.bulk ignore_errors: [409] do |indexer|
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
end
end

@@ -375,7 +381,7 @@
connection = SearchFlip::Connection.new

connection.bulk do |indexer|
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
end

expect(SearchFlip::Bulk).to have_received(:new).with(
@@ -398,7 +404,7 @@
}

connection.bulk(options) do |indexer|
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
end

expect(SearchFlip::Bulk).to have_received(:new).with(anything, options)