Skip to content

Commit

Permalink
- fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
psmagin committed Aug 2, 2024
1 parent 99d760f commit e1b97d5
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 43 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ and [Cross-cluster replication](https://docs.aws.amazon.com/opensearch-service/l
| KAFKA_SUBJECTS_CONSUMER_PATTERN | (${folio.environment}\.)(.*\.)search\.instance-subject | Custom subscription pattern for Kafka subject message consumers. |
| KAFKA_SUBJECTS_TOPIC_PARTITIONS | 50 | Amount of partitions for `search.instance-subject` topic. |
| KAFKA_SUBJECTS_TOPIC_REPLICATION_FACTOR | - | Replication factor for `search.instance-subject` topic. |
| KAFKA_REINDEX_RANGE_INDEX_CONCURRENCY | 1 | Custom number of kafka concurrent threads for `search.reindex.range-index` message consuming. |
| KAFKA_REINDEX_RANGE_INDEX_TOPIC_PARTITIONS | 16 | Amount of partitions for `search.reindex.range-index` topic. |
| KAFKA_REINDEX_RANGE_INDEX_TOPIC_REPLICATION_FACTOR | - | Replication factor for `search.reindex.range-index` topic. |
| KAFKA_CONSUMER_MAX_POLL_RECORDS | 200 | Maximum number of records returned in a single call to poll(). |
| KAFKA_RETRY_INTERVAL_MS | 2000 | Specifies time to wait before reattempting query. |
| KAFKA_RETRY_DELIVERY_ATTEMPTS | 6 | Specifies how many queries attempt to perform after the first one failed. |
Expand Down Expand Up @@ -278,7 +281,7 @@ and [Cross-cluster replication](https://docs.aws.amazon.com/opensearch-service/l
| MAX_BROWSE_REQUEST_OFFSET | 500 | The maximum elasticsearch query offset for additional requests on browse around |
| SYSTEM_USER_ENABLED | true | Defines if system user must be created at service tenant initialization or used for egress service requests |
| REINDEX_LOCATION_BATCH_SIZE | 1_000 | Defines number of locations to retrieve per inventory http request on locations reindex process |
| MAX_SEARCH_BATCH_REQUEST_IDS_COUNT | 20_000 | Defines maximum batch request IDs count for searching consolidated items/holdings in consortium |

The module uses system user to communicate with other modules from Kafka consumers.
For production deployments you MUST specify the password for this system user via env variable:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,3 @@ public class ReindexRangeIndexEvent implements BaseKafkaMessage {
private String tenant;
private String ts;
}

// Example message payload:
// { "id": "8b00efbd-07e0-48c1-a691-d259d9c67e37", "entityType": "SUBJECT", "offset": 0, "limit": 10, "tenant": "test_tenant" }

Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ public boolean process(ReindexRangeIndexEvent event) {
var resourceEvents = rangeIndexService.fetchRecordRange(event);
var documents = documentConverter.convert(resourceEvents).values().stream().flatMap(Collection::stream).toList();
var folioIndexOperationResponse = elasticRepository.indexResources(documents);
rangeIndexService.updateFinishDate(event);
if (folioIndexOperationResponse.getStatus() == FolioIndexOperationResponse.StatusEnum.ERROR) {
// TODO MSEARCH-797 - update status as failed indicating upload has failed
throw new ReindexException(folioIndexOperationResponse.getErrorMessage());
}
rangeIndexService.updateFinishDate(event);
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ public ReindexEntityType entityType() {
/**
 * Maps one classification aggregation row to a plain map that the document
 * converter turns into an Elasticsearch document.
 */
@Override
protected RowMapper<Map<String, Object>> rowToMapMapper() {
  return (rs, rowNum) -> {
    Map<String, Object> classification = new HashMap<>();
    classification.put("id", rs.getString("id"));
    classification.put("number", rs.getString("number"));
    classification.put("typeId", rs.getString("type_id"));

    // The "instances" column holds a JSON array of instance sub-documents.
    var maps = jsonConverter.fromJsonToListOfMaps(rs.getString("instances"));
    classification.put("instances", maps);

    return classification;
  };
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ public ReindexEntityType entityType() {
/**
 * Maps one contributor aggregation row to a plain map that the document
 * converter turns into an Elasticsearch document.
 */
@Override
protected RowMapper<Map<String, Object>> rowToMapMapper() {
  return (rs, rowNum) -> {
    Map<String, Object> contributor = new HashMap<>();
    contributor.put("id", rs.getString("id"));
    contributor.put("name", rs.getString("name"));
    contributor.put("contributorNameTypeId", rs.getString("contributor_name_type_id"));
    contributor.put("authorityId", rs.getString("authority_id"));

    // The "instances" column holds a JSON array of instance sub-documents.
    var maps = jsonConverter.fromJsonToListOfMaps(rs.getString("instances"));
    contributor.put("instances", maps);

    return contributor;
  };
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@
@Repository
public class InstanceJdbcRepository extends ReindexJdbcRepository {

// SQL template that joins an instance with its holdings and items and merges
// them into a single JSON document; the three %s placeholders are substituted
// with the fully-qualified instance, holding and item table names (see getFetchBySql()).
private static final String SELECT_SQL = """
SELECT i.instance_json
|| jsonb_build_object('tenantId', i.tenant_id)
|| jsonb_build_object('shared', i.shared)
|| jsonb_build_object('isBoundWith', i.is_bound_with)
|| jsonb_build_object('holdings', jsonb_agg(h.holding_json || jsonb_build_object('tenantId', h.tenant_id)))
|| jsonb_build_object('items', jsonb_agg(it.item_json || jsonb_build_object('tenantId', it.tenant_id))) as json
FROM %s i
JOIN %s h on h.instance_id = i.id
JOIN %s it on it.holding_id = h.id
GROUP BY i.id LIMIT ? OFFSET ?;
""";

protected InstanceJdbcRepository(JdbcTemplate jdbcTemplate, JsonConverter jsonConverter,
FolioExecutionContext context,
ReindexConfigurationProperties reindexConfigurationProperties) {
Expand All @@ -27,21 +40,9 @@ public ReindexEntityType entityType() {

/**
 * Builds the instance-fetch SQL by substituting the tenant-qualified
 * instance, holding and item table names into {@code SELECT_SQL}.
 *
 * @return parameterized SQL expecting LIMIT and OFFSET bind values
 */
@Override
protected String getFetchBySql() {
  return SELECT_SQL.formatted(getFullTableName(context, entityTable()),
      getFullTableName(context, "holding"),
      getFullTableName(context, "item"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ ON CONFLICT (id)

// Fetches all upload ranges registered for a given entity type.
private static final String SELECT_UPLOAD_RANGE_BY_ENTITY_TYPE_SQL = "SELECT * FROM %s WHERE entity_type = ?;";
// Counts all rows of the given table.
private static final String COUNT_SQL = "SELECT COUNT(*) FROM %s;";
// Pages through all records of the given table via LIMIT/OFFSET bind values.
private static final String SELECT_RECORD_SQL = "SELECT * from %s LIMIT ? OFFSET ?;";

protected final JsonConverter jsonConverter;
protected final FolioExecutionContext context;
Expand Down Expand Up @@ -94,7 +95,7 @@ public void setIndexRangeFinishDate(UUID id, Timestamp timestamp) {
}

/**
 * Default record-fetch SQL: selects one LIMIT/OFFSET page from the entity
 * table. Subclasses (e.g. the instance repository) override this to join
 * additional tables.
 */
protected String getFetchBySql() {
  return SELECT_RECORD_SQL.formatted(getFullTableName(context, entityTable()));
}

protected abstract RowMapper<Map<String, Object>> rowToMapMapper();
Expand Down
4 changes: 2 additions & 2 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ folio:
replicationFactor: ${KAFKA_CONSORTIUM_INSTANCE_TOPIC_REPLICATION_FACTOR:}
- name: search.reindex.range-index
numPartitions: ${KAFKA_REINDEX_RANGE_INDEX_TOPIC_PARTITIONS:16}
replicationFactor: ${KAFKA_REINDEX_RANGE_INDEX_TOPIC_REPLICATION_FACTOR:}
listener:
events:
concurrency: ${KAFKA_EVENTS_CONCURRENCY:2}
Expand Down Expand Up @@ -146,7 +146,7 @@ folio:
topic-pattern: (${folio.environment}\.)(.*\.)linked-data\.(work|authority)
group-id: ${folio.environment}-mod-search-linked-data-group
reindex-range-index:
concurrency: ${KAFKA_REINDEX_RANGE_INDEX_CONCURRENCY:1}
topic-pattern: (${folio.environment}\.)(.*\.)search\.reindex\.range-index
group-id: ${folio.environment}-mod-search-reindex-range-index-group
okapiUrl: ${okapi.url}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@

<createTable tableName="upload_range">
<column name="id" type="UUID">
<constraints nullable="false" primaryKey="true" primaryKeyName="pk_upload_range"/>
</column>
<column name="entity_type" type="VARCHAR(30)">
<constraints nullable="false"/>
Expand Down

0 comments on commit e1b97d5

Please sign in to comment.