Skip to content

Commit

Permalink
Elasticsearch: replace native API in prod w/ REST
Browse files Browse the repository at this point in the history
Remove jest API which depends on Elasticsearch native API (also
removed), so no longer depend on lucene for this scope. Replace such
removed dependencies with the Elasticsearch low-level API, which does
not depend on Lucene. That now used API is a REST client with minimal
dependencies, indeed. -As opposed to Elastic's current high-level API,
which still depends on lucene and other Gerrit-constraining libraries.

Do so in order to decouple the elasticsearch client from lucene in
Gerrit. That REST client does not logically require Lucene anyway. Most
importantly, such decoupling now enables upcoming support for more than
just one Elasticsearch server version. This includes the new possibility
of bumping the latter to multiple yet later versions, such as 5 and 6.x.
The currently used version (2.4) should also still be kept, for a while.

Bumping such versions will likely require some Elasticsearch client code
adaptations, so that Gerrit can (dynamically?) switch between either
Elasticsearch server version to eventually support (hopefully soon).

Doing the same for the elasticsearch tests in Gerrit is to be done using
another change or more changes.

Add a partial and customized fork of [1], based on [1]'s commit [2], to
preserve the ability of building proper json requests for Elasticsearch.
Put that forked json-generating code under a new 'builders' sub-package.
There should be a possibility in some near future to consider removing
that fork, based on potential progress such as the one proposed in [3].
Meanwhile, this fork shall be maintained to usual Gerrit quality levels.

[1] https://github.com/elastic/elasticsearch
[2] tag: v2.4.4
[3] elastic/elasticsearch#30791

Bug: Issue 6094
Change-Id: I720c9885c9eab2388acc328eecb9eaa6940ced0c
  • Loading branch information
marco-miller authored and dpursehouse committed May 28, 2018
1 parent fb2e34d commit 6547ded
Show file tree
Hide file tree
Showing 22 changed files with 1,199 additions and 271 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
package com.google.gerrit.elasticsearch;

import static com.google.gson.FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.toList;
import static org.apache.commons.codec.binary.Base64.decodeBase64;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import com.google.common.base.Strings;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import com.google.common.io.CharStreams;
import com.google.gerrit.elasticsearch.builders.SearchSourceBuilder;
import com.google.gerrit.elasticsearch.builders.XContentBuilder;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.index.FieldDef.FillArgs;
Expand All @@ -34,20 +37,39 @@
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gwtorm.protobuf.ProtobufCodec;
import io.searchbox.client.JestResult;
import io.searchbox.client.http.JestHttpClient;
import io.searchbox.core.Bulk;
import io.searchbox.core.Delete;
import io.searchbox.indices.CreateIndex;
import io.searchbox.indices.DeleteIndex;
import io.searchbox.indices.IndicesExists;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.eclipse.jgit.lib.Config;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

abstract class AbstractElasticIndex<K, V> implements Index<K, V> {

protected static final String BULK = "_bulk";
protected static final String DELETE = "delete";
protected static final String IGNORE_UNMAPPED = "ignore_unmapped";
protected static final String INDEX = "index";
protected static final String ORDER = "order";
protected static final String SEARCH = "_search";

protected static <T> List<T> decodeProtos(
JsonObject doc, String fieldName, ProtobufCodec<T> codec) {
JsonArray field = doc.getAsJsonArray(fieldName);
Expand All @@ -59,13 +81,25 @@ protected static <T> List<T> decodeProtos(
.toList();
}

static String getContent(Response response) throws IOException {
HttpEntity responseEntity = response.getEntity();
String content = "";
if (responseEntity != null) {
InputStream contentStream = responseEntity.getContent();
try (Reader reader = new InputStreamReader(contentStream)) {
content = CharStreams.toString(reader);
}
}
return content;
}

private final Schema<V> schema;
private final FillArgs fillArgs;
private final SitePaths sitePaths;
private final String indexNameRaw;
private final RestClient client;

protected final String indexName;
protected final JestHttpClient client;
protected final Gson gson;
protected final ElasticQueryBuilder queryBuilder;

Expand All @@ -74,7 +108,7 @@ protected static <T> List<T> decodeProtos(
FillArgs fillArgs,
SitePaths sitePaths,
Schema<V> schema,
JestClientBuilder clientBuilder,
ElasticRestClientBuilder clientBuilder,
String indexName) {
this.fillArgs = fillArgs;
this.sitePaths = sitePaths;
Expand All @@ -98,7 +132,11 @@ public Schema<V> getSchema() {

@Override
public void close() {
client.shutdownClient();
try {
client.close();
} catch (IOException e) {
// Ignored.
}
}

@Override
Expand All @@ -108,60 +146,57 @@ public void markReady(boolean ready) throws IOException {

@Override
public void delete(K c) throws IOException {
Bulk bulk = addActions(new Bulk.Builder(), c).refresh(true).build();
JestResult result = client.execute(bulk);
if (!result.isSucceeded()) {
String uri = getURI(indexNameRaw, BULK);
Response response = performRequest(HttpPost.METHOD_NAME, addActions(c), uri, getRefreshParam());
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
throw new IOException(
String.format(
"Failed to delete change %s in index %s: %s",
c, indexName, result.getErrorMessage()));
String.format("Failed to delete change %s in index %s: %s", c, indexName, statusCode));
}
}

@Override
public void deleteAll() throws IOException {
// Delete the index, if it exists.
JestResult result = client.execute(new IndicesExists.Builder(indexName).build());
if (result.isSucceeded()) {
result = client.execute(new DeleteIndex.Builder(indexName).build());
if (!result.isSucceeded()) {
Response response = client.performRequest(HttpHead.METHOD_NAME, indexName);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == HttpStatus.SC_OK) {
response = client.performRequest(HttpDelete.METHOD_NAME, indexName);
statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
throw new IOException(
String.format("Failed to delete index %s: %s", indexName, result.getErrorMessage()));
String.format("Failed to delete index %s: %s", indexName, statusCode));
}
}

// Recreate the index.
result = client.execute(new CreateIndex.Builder(indexName).settings(getMappings()).build());
if (!result.isSucceeded()) {
String error =
String.format("Failed to create index %s: %s", indexName, result.getErrorMessage());
response =
performRequest(HttpPut.METHOD_NAME, getMappings(), indexName, Collections.emptyMap());
statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
String error = String.format("Failed to create index %s: %s", indexName, statusCode);
throw new IOException(error);
}
}

protected abstract Bulk.Builder addActions(Bulk.Builder builder, K c);
protected abstract String addActions(K c);

protected abstract String getMappings();

protected abstract String getId(V v);

protected Delete delete(String type, K c) {
protected String delete(String type, K c) {
String id = c.toString();
return new Delete.Builder(id).index(indexName).type(type).build();
}

protected io.searchbox.core.Index insert(String type, V v) throws IOException {
String id = getId(v);
String doc = toDoc(v);
return new io.searchbox.core.Index.Builder(doc).index(indexName).type(type).id(id).build();
return toAction(type, id, DELETE);
}

private static boolean shouldAddElement(Object element) {
return !(element instanceof String) || !((String) element).isEmpty();
}

private String toDoc(V v) throws IOException {
try (XContentBuilder builder = jsonBuilder().startObject()) {
protected String toDoc(V v) throws IOException {
try (XContentBuilder closeable = new XContentBuilder()) {
XContentBuilder builder = closeable.startObject();
for (Values<V> values : schema.buildFields(v, fillArgs)) {
String name = values.getField().getName();
if (values.getField().isRepeatable()) {
Expand All @@ -177,7 +212,58 @@ private String toDoc(V v) throws IOException {
}
}
}
return builder.endObject().string();
return builder.endObject().string() + System.lineSeparator();
}
}

protected String toAction(String type, String id, String action) {
JsonObject properties = new JsonObject();
properties.addProperty("_id", id);
properties.addProperty("_index", indexName);
properties.addProperty("_type", type);

JsonObject jsonAction = new JsonObject();
jsonAction.add(action, properties);
return jsonAction.toString() + System.lineSeparator();
}

protected void addNamedElement(String name, JsonObject element, JsonArray array) {
JsonObject arrayElement = new JsonObject();
arrayElement.add(name, element);
array.add(arrayElement);
}

protected Map<String, String> getRefreshParam() {
Map<String, String> params = new HashMap<>();
params.put("refresh", "true");
return params;
}

protected String getSearch(SearchSourceBuilder searchSource, JsonArray sortArray) {
JsonObject search = new JsonParser().parse(searchSource.toString()).getAsJsonObject();
search.add("sort", sortArray);
return gson.toJson(search);
}

protected JsonArray getSortArray(String idFieldName) {
JsonObject properties = new JsonObject();
properties.addProperty(ORDER, "asc");
properties.addProperty(IGNORE_UNMAPPED, true);

JsonArray sortArray = new JsonArray();
addNamedElement(idFieldName, properties, sortArray);
return sortArray;
}

protected String getURI(String type, String request) throws UnsupportedEncodingException {
String encodedType = URLEncoder.encode(type, UTF_8.toString());
String encodedIndexName = URLEncoder.encode(indexName, UTF_8.toString());
return encodedIndexName + "/" + encodedType + "/" + request;
}

protected Response performRequest(
String method, String payload, String uri, Map<String, String> params) throws IOException {
HttpEntity entity = new NStringEntity(payload, ContentType.APPLICATION_JSON);
return client.performRequest(method, uri, params, entity);
}
}
Loading

0 comments on commit 6547ded

Please sign in to comment.