Skip to content

Commit

Permalink
Replace synchronized blocks with ReentrantLocks for virtual thread su…
Browse files Browse the repository at this point in the history
…pport

* The lock call was moved outside the try block
* Unnecessary locking has been removed.
  • Loading branch information
omercelikceng authored Dec 6, 2024
1 parent 58ffb22 commit 29541a5
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;

Expand Down Expand Up @@ -62,6 +64,7 @@
* @author Thomas Risberg
* @author Ashu Gairola
* @author Akos Ratku
* @author Omer Celik
*/
@AutoConfiguration(after = CassandraReactiveDataAutoConfiguration.class)
@EnableConfigurationProperties(CassandraConsumerProperties.class)
Expand Down Expand Up @@ -147,6 +150,8 @@ private static class PayloadToMatrixTransformer extends AbstractPayloadTransform

private final ISO8601StdDateFormat dateFormat = new ISO8601StdDateFormat();

private final Lock dateLock = new ReentrantLock();

PayloadToMatrixTransformer(ObjectMapper objectMapper, String query, ColumnNameExtractor columnNameExtractor) {
this.jsonObjectMapper = new Jackson2JsonObjectMapper(objectMapper);
this.columns.addAll(columnNameExtractor.extract(query));
Expand All @@ -170,9 +175,13 @@ protected List<List<Object>> transformPayload(Object payload) {
Object value = entity.get(column);
if (value instanceof String string) {
if (this.dateFormat.looksLikeISO8601(string)) {
synchronized (this.dateFormat) {
this.dateLock.lock();
try {
value = new Date(this.dateFormat.parse(string).getTime()).toLocalDate();
}
finally {
this.dateLock.unlock();
}
}
if (isUuid(string)) {
value = UUID.fromString(string);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* A repository for {@link Trace}s.
Expand All @@ -31,6 +33,7 @@
* @author Dave Syer
* @author Olivier Bourgain
* @author Artem Bilan
* @author Omer Celik
* @since 2.0
*/
public class InMemoryTraceRepository {
Expand All @@ -41,35 +44,32 @@ public class InMemoryTraceRepository {

private final List<Trace> traces = new LinkedList<>();

private final Lock tracesLock = new ReentrantLock();

/**
* Flag to say that the repository lists traces in reverse order.
* @param reverse flag value (default true)
*/
public void setReverse(boolean reverse) {
synchronized (this.traces) {
this.reverse = reverse;
}
this.reverse = reverse;
}

/**
* Set the capacity of the in-memory repository.
* @param capacity the capacity
*/
public void setCapacity(int capacity) {
synchronized (this.traces) {
this.capacity = capacity;
}
this.capacity = capacity;
}

public List<Trace> findAll() {
synchronized (this.traces) {
return Collections.unmodifiableList(this.traces);
}
return Collections.unmodifiableList(this.traces);
}

public void add(Map<String, Object> map) {
Trace trace = new Trace(new Date(), map);
synchronized (this.traces) {
this.tracesLock.lock();
try {
while (this.traces.size() >= this.capacity) {
this.traces.remove(this.reverse ? this.capacity - 1 : 0);
}
Expand All @@ -80,6 +80,9 @@ public void add(Map<String, Object> map) {
this.traces.add(trace);
}
}
finally {
this.tracesLock.unlock();
}
}

}

0 comments on commit 29541a5

Please sign in to comment.