diff --git a/README.md b/README.md index 5257829a9..7c3dd8304 100644 --- a/README.md +++ b/README.md @@ -251,6 +251,9 @@ and [Cross-cluster replication](https://docs.aws.amazon.com/opensearch-service/l | KAFKA_SUBJECTS_CONSUMER_PATTERN | (${folio.environment}\.)(.*\.)search\.instance-subject | Custom subscription pattern for Kafka subject message consumers. | | KAFKA_SUBJECTS_TOPIC_PARTITIONS | 50 | Amount of partitions for `search.instance-subject` topic. | | KAFKA_SUBJECTS_TOPIC_REPLICATION_FACTOR | - | Replication factor for `search.instance-subject` topic. | +| KAFKA_REINDEX_RANGE_INDEX_CONCURRENCY | 1 | Custom number of kafka concurrent threads for `search.reindex.range-index` message consuming. | +| KAFKA_REINDEX_RANGE_INDEX_TOPIC_PARTITIONS | 16 | Amount of partitions for `search.reindex.range-index` topic. | +| KAFKA_REINDEX_RANGE_INDEX_TOPIC_REPLICATION_FACTOR | - | Replication factor for `search.reindex.range-index` topic. | | KAFKA_CONSUMER_MAX_POLL_RECORDS | 200 | Maximum number of records returned in a single call to poll(). | | KAFKA_RETRY_INTERVAL_MS | 2000 | Specifies time to wait before reattempting query. | | KAFKA_RETRY_DELIVERY_ATTEMPTS | 6 | Specifies how many queries attempt to perform after the first one failed. | @@ -278,7 +281,7 @@ and [Cross-cluster replication](https://docs.aws.amazon.com/opensearch-service/l | MAX_BROWSE_REQUEST_OFFSET | 500 | The maximum elasticsearch query offset for additional requests on browse around | | SYSTEM_USER_ENABLED | true | Defines if system user must be created at service tenant initialization or used for egress service requests | | REINDEX_LOCATION_BATCH_SIZE | 1_000 | Defines number of locations to retrieve per inventory http request on locations reindex process | -| MAX_SEARCH_BATCH_REQUEST_IDS_COUNT | 20_000 | Defines maximum batch request IDs count for searching consolidated items/holdings in consortium | +| MAX_SEARCH_BATCH_REQUEST_IDS_COUNT | 20_000 | Defines maximum batch request IDs count for searching consolidated items/holdings in consortium | The module uses system user to communicate with other modules from Kafka consumers. For production deployments you MUST specify the password for this system user via env variable: diff --git a/src/main/java/org/folio/search/model/event/ReindexRangeIndexEvent.java b/src/main/java/org/folio/search/model/event/ReindexRangeIndexEvent.java index 587474399..631d4945d 100644 --- a/src/main/java/org/folio/search/model/event/ReindexRangeIndexEvent.java +++ b/src/main/java/org/folio/search/model/event/ReindexRangeIndexEvent.java @@ -16,12 +16,3 @@ public class ReindexRangeIndexEvent implements BaseKafkaMessage { private String tenant; private String ts; } - -//{ -// "id": "8b00efbd-07e0-48c1-a691-d259d9c67e37", -// "entityType": "SUBJECT", -// "offset": 0, -// "limit": 10, -// "tenant": "test_tenant" -//} - diff --git a/src/main/java/org/folio/search/service/reindex/ReindexOrchestrationService.java b/src/main/java/org/folio/search/service/reindex/ReindexOrchestrationService.java index a7099cc5a..959c295c1 100644 --- a/src/main/java/org/folio/search/service/reindex/ReindexOrchestrationService.java +++ b/src/main/java/org/folio/search/service/reindex/ReindexOrchestrationService.java @@ -21,10 +21,11 @@ public boolean process(ReindexRangeIndexEvent event) { var resourceEvents = rangeIndexService.fetchRecordRange(event); var documents = documentConverter.convert(resourceEvents).values().stream().flatMap(Collection::stream).toList(); var folioIndexOperationResponse = elasticRepository.indexResources(documents); + rangeIndexService.updateFinishDate(event); if (folioIndexOperationResponse.getStatus() == FolioIndexOperationResponse.StatusEnum.ERROR) { + // TODO MSEARCH-797 - update status as failed indicating upload has failed throw new ReindexException(folioIndexOperationResponse.getErrorMessage()); } - rangeIndexService.updateFinishDate(event); return true; } } diff --git a/src/main/java/org/folio/search/service/reindex/jdbc/ClassificationJdbcRepository.java b/src/main/java/org/folio/search/service/reindex/jdbc/ClassificationJdbcRepository.java index d1f6e66c8..fdd8e0199 100644 --- a/src/main/java/org/folio/search/service/reindex/jdbc/ClassificationJdbcRepository.java +++ b/src/main/java/org/folio/search/service/reindex/jdbc/ClassificationJdbcRepository.java @@ -27,15 +27,15 @@ public ReindexEntityType entityType() { @Override protected RowMapper> rowToMapMapper() { return (rs, rowNum) -> { - Map subject = new HashMap<>(); - subject.put("id", rs.getString("id")); - subject.put("number", rs.getString("number")); - subject.put("typeId", rs.getString("type_id")); + Map classification = new HashMap<>(); + classification.put("id", rs.getString("id")); + classification.put("number", rs.getString("number")); + classification.put("typeId", rs.getString("type_id")); var maps = jsonConverter.fromJsonToListOfMaps(rs.getString("instances")); - subject.put("instances", maps); + classification.put("instances", maps); - return subject; + return classification; }; } diff --git a/src/main/java/org/folio/search/service/reindex/jdbc/ContributorJdbcRepository.java b/src/main/java/org/folio/search/service/reindex/jdbc/ContributorJdbcRepository.java index 3ce82486e..8f8b661ff 100644 --- a/src/main/java/org/folio/search/service/reindex/jdbc/ContributorJdbcRepository.java +++ b/src/main/java/org/folio/search/service/reindex/jdbc/ContributorJdbcRepository.java @@ -27,16 +27,16 @@ public ReindexEntityType entityType() { @Override protected RowMapper> rowToMapMapper() { return (rs, rowNum) -> { - Map subject = new HashMap<>(); - subject.put("id", rs.getString("id")); - subject.put("name", rs.getString("name")); - subject.put("contributorNameTypeId", rs.getString("contributor_name_type_id")); - subject.put("authorityId", rs.getString("authority_id")); + Map contributor = new HashMap<>(); + contributor.put("id", rs.getString("id")); + contributor.put("name", rs.getString("name")); + contributor.put("contributorNameTypeId", rs.getString("contributor_name_type_id")); + contributor.put("authorityId", rs.getString("authority_id")); var maps = jsonConverter.fromJsonToListOfMaps(rs.getString("instances")); - subject.put("instances", maps); + contributor.put("instances", maps); - return subject; + return contributor; }; } diff --git a/src/main/java/org/folio/search/service/reindex/jdbc/InstanceJdbcRepository.java b/src/main/java/org/folio/search/service/reindex/jdbc/InstanceJdbcRepository.java index 672e11077..0aca633c7 100644 --- a/src/main/java/org/folio/search/service/reindex/jdbc/InstanceJdbcRepository.java +++ b/src/main/java/org/folio/search/service/reindex/jdbc/InstanceJdbcRepository.java @@ -14,6 +14,19 @@ @Repository public class InstanceJdbcRepository extends ReindexJdbcRepository { + private static final String SELECT_SQL = """ + SELECT i.instance_json + || jsonb_build_object('tenantId', i.tenant_id) + || jsonb_build_object('shared', i.shared) + || jsonb_build_object('isBoundWith', i.is_bound_with) + || jsonb_build_object('holdings', jsonb_agg(h.holding_json || jsonb_build_object('tenantId', h.tenant_id))) + || jsonb_build_object('items', jsonb_agg(it.item_json || jsonb_build_object('tenantId', it.tenant_id))) as json + FROM %s i + JOIN %s h on h.instance_id = i.id + JOIN %s it on it.holding_id = h.id + GROUP BY i.id LIMIT ? OFFSET ?; + """; + protected InstanceJdbcRepository(JdbcTemplate jdbcTemplate, JsonConverter jsonConverter, FolioExecutionContext context, ReindexConfigurationProperties reindexConfigurationProperties) { @@ -27,21 +40,9 @@ public ReindexEntityType entityType() { @Override protected String getFetchBySql() { - return """ - SELECT i.instance_json - || jsonb_build_object('tenantId', i.tenant_id) - || jsonb_build_object('shared', i.shared) - || jsonb_build_object('isBoundWith', i.is_bound_with) - || jsonb_build_object('holdings', jsonb_agg(h.holding_json || jsonb_build_object('tenantId', h.tenant_id))) - || jsonb_build_object('items', jsonb_agg(it.item_json || jsonb_build_object('tenantId', it.tenant_id))) as json - FROM %s i - JOIN %s h on h.instance_id = i.id - JOIN %s it on it.holding_id = h.id - GROUP BY i.id LIMIT ? OFFSET ?; - """ - .formatted(getFullTableName(context, entityTable()), - getFullTableName(context, "holding"), - getFullTableName(context, "item")); + return SELECT_SQL.formatted(getFullTableName(context, entityTable()), + getFullTableName(context, "holding"), + getFullTableName(context, "item")); } @Override diff --git a/src/main/java/org/folio/search/service/reindex/jdbc/ReindexJdbcRepository.java b/src/main/java/org/folio/search/service/reindex/jdbc/ReindexJdbcRepository.java index b24a9e694..8ffd48745 100644 --- a/src/main/java/org/folio/search/service/reindex/jdbc/ReindexJdbcRepository.java +++ b/src/main/java/org/folio/search/service/reindex/jdbc/ReindexJdbcRepository.java @@ -36,6 +36,7 @@ ON CONFLICT (id) private static final String SELECT_UPLOAD_RANGE_BY_ENTITY_TYPE_SQL = "SELECT * FROM %s WHERE entity_type = ?;"; private static final String COUNT_SQL = "SELECT COUNT(*) FROM %s;"; + private static final String SELECT_RECORD_SQL = "SELECT * from %s LIMIT ? OFFSET ?;"; protected final JsonConverter jsonConverter; protected final FolioExecutionContext context; @@ -94,7 +95,7 @@ public void setIndexRangeFinishDate(UUID id, Timestamp timestamp) { } protected String getFetchBySql() { - return "SELECT * from %s LIMIT ? OFFSET ?".formatted(getFullTableName(context, entityTable())); + return SELECT_RECORD_SQL.formatted(getFullTableName(context, entityTable())); } protected abstract RowMapper> rowToMapMapper(); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 084fcd2c5..bbb0d7254 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -111,7 +111,7 @@ folio: replicationFactor: ${KAFKA_CONSORTIUM_INSTANCE_TOPIC_REPLICATION_FACTOR:} - name: search.reindex.range-index numPartitions: ${KAFKA_REINDEX_RANGE_INDEX_TOPIC_PARTITIONS:16} - replicationFactor: ${KAFKA_CONSORTIUM_INSTANCE_TOPIC_REPLICATION_FACTOR:} + replicationFactor: ${KAFKA_REINDEX_RANGE_INDEX_TOPIC_REPLICATION_FACTOR:} listener: events: concurrency: ${KAFKA_EVENTS_CONCURRENCY:2} @@ -146,7 +146,7 @@ folio: topic-pattern: (${folio.environment}\.)(.*\.)linked-data\.(work|authority) group-id: ${folio.environment}-mod-search-linked-data-group reindex-range-index: - concurrency: ${KAFKA_LINKED_DATA_CONCURRENCY:1} + concurrency: ${KAFKA_REINDEX_RANGE_INDEX_CONCURRENCY:1} topic-pattern: (${folio.environment}\.)(.*\.)search\.reindex\.range-index group-id: ${folio.environment}-mod-search-reindex-range-index-group okapiUrl: ${okapi.url} diff --git a/src/main/resources/changelog/changes/v4.0/create-reindex-entity-tables.xml b/src/main/resources/changelog/changes/v4.0/create-reindex-entity-tables.xml index cf6a80fa2..3c424cce6 100644 --- a/src/main/resources/changelog/changes/v4.0/create-reindex-entity-tables.xml +++ b/src/main/resources/changelog/changes/v4.0/create-reindex-entity-tables.xml @@ -211,7 +211,7 @@ - +