Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STNG-8 Use concurrent versions of HashMap, TreeMap, HashSet #193

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,34 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.StreamSupport;
import org.dcsa.conformance.core.state.StatefulEntity;

public class ActionPromptsQueue implements StatefulEntity {
private final Set<String> allActionIds = new HashSet<>();
private final Set<String> allActionIds = ConcurrentHashMap.newKeySet();
private final LinkedList<JsonNode> pendingActions = new LinkedList<>();

synchronized void clear() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please let's discuss this before proceeding with the change, because "synchronized" was in place because most operations affect both collections in parallel.

void clear() {
allActionIds.clear();
pendingActions.clear();
}

synchronized void addLast(JsonNode actionPrompt) {
void addLast(JsonNode actionPrompt) {
String actionId = actionPrompt.get("actionId").asText();
if (!allActionIds.contains(actionId)) {
allActionIds.add(actionId);
pendingActions.add(actionPrompt);
}
}

synchronized boolean isEmpty() {
boolean isEmpty() {
return pendingActions.isEmpty();
}

synchronized JsonNode removeFirst() {
JsonNode removeFirst() {
if (pendingActions.isEmpty()) {
return null;
}
Expand All @@ -42,7 +42,7 @@ synchronized JsonNode removeFirst() {
}

@Override
public synchronized JsonNode exportJsonState() {
public JsonNode exportJsonState() {
ObjectNode stateNode = OBJECT_MAPPER.createObjectNode();

ArrayNode allActionIdsNode = stateNode.putArray("allActionIds");
Expand All @@ -56,7 +56,7 @@ public synchronized JsonNode exportJsonState() {
}

@Override
public synchronized void importJsonState(JsonNode jsonState) {
public void importJsonState(JsonNode jsonState) {
StreamSupport.stream(jsonState.get("allActionIds").spliterator(), false)
.map(JsonNode::asText)
.forEach(allActionIds::add);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package org.dcsa.conformance.core.state;

import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MemorySortedPartitionsLockingMap extends SortedPartitionsLockingMap {
Expand All @@ -16,22 +16,21 @@ private static class MemoryMapItem {
JsonNode value;
}

private final HashMap<String, TreeMap<String, MemoryMapItem>> memoryMap = new HashMap<>();
private final Map<String, Map<String, MemoryMapItem>> memoryMap = new ConcurrentHashMap<>();

public MemorySortedPartitionsLockingMap() {
super(60L * 1000L, 100L, 60L * 1000L);
}

private MemoryMapItem _getOrCreateItem(String partitionKey, String sortKey) {
private synchronized MemoryMapItem getOrCreateItem(String partitionKey, String sortKey) {
return memoryMap
.computeIfAbsent(partitionKey, ignoredKey -> new TreeMap<>())
.computeIfAbsent(partitionKey, ignoredKey -> new ConcurrentSkipListMap<>())
.computeIfAbsent(sortKey, ignoredKey -> new MemoryMapItem());
}

@Override
protected void _saveItem(String lockedBy, String partitionKey, String sortKey, JsonNode value) {
synchronized (memoryMap) {
MemoryMapItem item = _getOrCreateItem(partitionKey, sortKey);
MemoryMapItem item = getOrCreateItem(partitionKey, sortKey);
if (Objects.equals(lockedBy, item.lockedBy)) {
if (item.lockedUntil > System.currentTimeMillis()) {
item.value = value;
Expand All @@ -42,39 +41,34 @@ protected void _saveItem(String lockedBy, String partitionKey, String sortKey, J
} else {
throw new RuntimeException("%s cannot save: item is locked by %s".formatted(lockedBy, item.lockedBy));
}
}
}

@Override
protected JsonNode _loadItem(String lockedBy, String partitionKey, String sortKey)
throws TemporaryLockingMapException {
synchronized (memoryMap) {
MemoryMapItem item = _getOrCreateItem(partitionKey, sortKey);
MemoryMapItem item = getOrCreateItem(partitionKey, sortKey);
long currentTime = System.currentTimeMillis();
if (item.lockedBy != null && item.lockedUntil > currentTime) {
log.debug("%s cannot load: must wait for %s to save".formatted(lockedBy, item.lockedBy));
log.debug("{} cannot load: must wait for {} to save", lockedBy, item.lockedBy);
throw new TemporaryLockingMapException(null);
} else {
item.lockedBy = lockedBy;
item.lockedUntil = currentTime + lockDurationMillis;
}
return item.value;
}
}

@Override
protected void _unlockItem(String lockedBy, String partitionKey, String sortKey) {
synchronized (memoryMap) {
MemoryMapItem item = _getOrCreateItem(partitionKey, sortKey);
MemoryMapItem item = getOrCreateItem(partitionKey, sortKey);
if (Objects.equals(lockedBy, item.lockedBy)) {
if (item.lockedUntil > System.currentTimeMillis()) {
item.lockedBy = null;
} else {
log.debug("%s does not need to unlock: lock has expired".formatted(lockedBy));
log.debug("{} does not need to unlock: lock has expired", lockedBy);
}
} else {
throw new RuntimeException("%s cannot unlock: item is locked by %s".formatted(lockedBy, item.lockedBy));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,51 +1,53 @@
package org.dcsa.conformance.core.state;

import com.fasterxml.jackson.databind.JsonNode;
import lombok.SneakyThrows;
import static org.dcsa.conformance.core.toolkit.JsonToolkit.OBJECT_MAPPER;

import java.util.HashMap;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.List;
import java.util.TreeMap;

import static org.dcsa.conformance.core.toolkit.JsonToolkit.OBJECT_MAPPER;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import lombok.SneakyThrows;

public class MemorySortedPartitionsNonLockingMap implements SortedPartitionsNonLockingMap {
private final HashMap<String, TreeMap<String, JsonNode>> memoryMap = new HashMap<>();

private static final String VALUE = "value";
private final ConcurrentHashMap<String, ConcurrentSkipListMap<String, JsonNode>> memoryMap = new ConcurrentHashMap<>();

@SneakyThrows
@Override
public synchronized void setItemValue(String partitionKey, String sortKey, JsonNode value) {
JsonNode valueCopy = OBJECT_MAPPER.readTree(value.toString());
memoryMap
.computeIfAbsent(partitionKey, ignoredKey -> new TreeMap<>())
.put(sortKey, OBJECT_MAPPER.createObjectNode().set("value", valueCopy));
.computeIfAbsent(partitionKey, ignoredKey -> new ConcurrentSkipListMap<>())
.put(sortKey, OBJECT_MAPPER.createObjectNode().set(VALUE, valueCopy));
}

@Override
public synchronized JsonNode getItemValue(String partitionKey, String sortKey) {
return memoryMap
.getOrDefault(partitionKey, new TreeMap<>())
public JsonNode getItemValue(String partitionKey, String sortKey) {
return getPartitionOrDefault(partitionKey)
.getOrDefault(sortKey, OBJECT_MAPPER.createObjectNode())
.get("value");
.get(VALUE);
}

private synchronized ConcurrentSkipListMap<String, JsonNode> getPartitionOrDefault(String partitionKey) {
return memoryMap.getOrDefault(partitionKey, new ConcurrentSkipListMap<>());
}

@Override
public synchronized JsonNode getFirstItemValue(String partitionKey) {
TreeMap<String, JsonNode> partition = memoryMap.getOrDefault(partitionKey, new TreeMap<>());
return partition.isEmpty() ? null : partition.get(partition.firstKey()).get("value");
public JsonNode getFirstItemValue(String partitionKey) {
ConcurrentSkipListMap<String, JsonNode> partition = getPartitionOrDefault(partitionKey);
return partition.isEmpty() ? null : partition.get(partition.firstKey()).get(VALUE);
}

@Override
public synchronized JsonNode getLastItemValue(String partitionKey) {
TreeMap<String, JsonNode> partition = memoryMap.getOrDefault(partitionKey, new TreeMap<>());
return partition.isEmpty() ? null : partition.get(partition.lastKey()).get("value");
public JsonNode getLastItemValue(String partitionKey) {
ConcurrentSkipListMap<String, JsonNode> partition = getPartitionOrDefault(partitionKey);
return partition.isEmpty() ? null : partition.get(partition.lastKey()).get(VALUE);
}

@Override
public synchronized List<JsonNode> getPartitionValues(String partitionKey) {
return memoryMap.getOrDefault(partitionKey, new TreeMap<>()).values().stream()
.map(item -> item.get("value"))
public List<JsonNode> getPartitionValues(String partitionKey) {
return getPartitionOrDefault(partitionKey).values().stream()
.map(item -> item.get(VALUE))
.toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public String getHumanReadablePrompt() {
}

@Override
public synchronized Supplier<String> getSrrSupplier() {
public Supplier<String> getSrrSupplier() {
return srrSupplier;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package org.dcsa.conformance.standards.eblsurrender.action;

import com.fasterxml.jackson.core.JsonPointer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Stream;
Expand Down Expand Up @@ -36,24 +34,14 @@ public SurrenderResponseAction(
this.requestSchemaValidator = requestSchemaValidator;
}

@Override
public ObjectNode exportJsonState() {
return super.exportJsonState();
}

@Override
public void importJsonState(JsonNode jsonState) {
super.importJsonState(jsonState);
}

@Override
public String getHumanReadablePrompt() {
return ("%s the surrender request with surrender request reference '%s'")
.formatted(accept ? "Accept" : "Reject", getSrrSupplier().get());
}

@Override
public synchronized Supplier<String> getSrrSupplier() {
public Supplier<String> getSrrSupplier() {
if (srrSupplier != null) return srrSupplier;
for (ConformanceAction action = this.previousAction;
action != null;
Expand Down
Loading