Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep memory persistence entries distinct by time. #66

Merged
merged 1 commit into from
Feb 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,26 @@
*/
package org.connectorio.addons.persistence.memory.internal;

import java.util.Comparator;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.NavigableSet;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeSet;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.openhab.core.types.State;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MemoryBucket {

private final Logger logger = LoggerFactory.getLogger(MemoryBucket.class);

private final NavigableSet<MemoryEntry> entries = new TreeSet<>(
Comparator.comparing(MemoryEntry::getTimestamp)
);
private final NavigableMap<ZonedDateTime, State> entries = new TreeMap<>();

private final Lock lock = new ReentrantLock();
private int limit;
Expand All @@ -45,42 +45,45 @@ public MemoryBucket(int limit) {
this.limit = limit;
}

public void append(MemoryEntry entry) {
public void append(ZonedDateTime dateTime, State state) {
apply(myself -> {
logger.trace("Inserted entry {}. Stored entries {}, limit {}", entry, entries.size(), limit);
myself.entries.add(entry);
logger.trace("Inserted entry {}={}. Stored entries {}, limit {}", dateTime, state, entries.size(), limit);
entries.put(dateTime, state);
if (entries.size() > limit) {
entries.pollFirstEntry();
}
});
}

public void truncate(int limit) {
this.limit = limit;

apply(myself -> {
while (myself.entries.size() > limit) {
MemoryEntry entry = myself.entries.pollFirst();
while (entries.size() > limit) {
Entry<ZonedDateTime, State> entry = entries.pollFirstEntry();
logger.trace("Removed bucket entry {} as it exceeds limit {}", entry, limit);
}
});
}

public Stream<MemoryEntry> entries() {
return entries.stream();
public Stream<Entry<ZonedDateTime, State>> entries() {
return entries.entrySet().stream();
}

public void remove(MemoryEntry entry) {
apply(myself -> myself.entries.remove(entry));
public void remove(Entry<ZonedDateTime, State> entry) {
apply(myself -> myself.entries.remove(entry.getKey()));
}

public Integer getSize() {
return entries.size();
}

public Date getEarliest() {
return entries.isEmpty() ? null : Date.from(entries.first().getTimestamp().toInstant());
return entries.isEmpty() ? null : Date.from(entries.firstKey().toInstant());
}

public Date getOldest() {
return entries.isEmpty() ? null : Date.from(entries.last().getTimestamp().toInstant());
return entries.isEmpty() ? null : Date.from(entries.lastKey().toInstant());
}

public <X> X process(Function<MemoryBucket, X> function) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,8 @@ public State getState() {
return state;
}

public String toString() {
return "MemoryHistoricItem [" + name + " " + timestamp + ", " + state + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public Iterable<HistoricItem> query(FilterCriteria criteria) {

logger.debug("Querying item {} with date range {}-{}", criteria.getItemName(), criteria.getBeginDate(), criteria.getEndDate());
return buckets.get(criteria.getItemName()).process(bucket -> {
Stream<MemoryEntry> entries = filtering(bucket.entries(), criteria);
Stream<Entry<ZonedDateTime, State>> entries = filtering(bucket.entries(), criteria);
entries = sorting(entries, criteria);
entries = paging(entries, criteria);

Expand Down Expand Up @@ -158,7 +158,7 @@ public boolean remove(FilterCriteria criteria) throws IllegalArgumentException {

logger.trace("Removing entries from bucket {}", criteria.getItemName());
return buckets.get(criteria.getItemName()).process(bucket -> {
Stream<MemoryEntry> entries = filtering(bucket.entries(), criteria);
Stream<Entry<ZonedDateTime, State>> entries = filtering(bucket.entries(), criteria);

entries = sorting(entries, criteria);
entries = paging(entries, criteria);
Expand All @@ -178,39 +178,39 @@ private void memorize(String name, ZonedDateTime time, State state) {
}

MemoryBucket bucket = buckets.computeIfAbsent(name, key -> new MemoryBucket(limit));
MemoryEntry entry = new MemoryEntry(time, state);
logger.trace("Storing entry {} in bucket {} for item {}", entry, bucket, name);
bucket.append(entry);
logger.trace("Storing entry {}={} in bucket {} for item {}", time, state, bucket, name);
bucket.append(time, state);
}
private HistoricItem toHistoricItem(String name, MemoryEntry entry) {
return new MemoryHistoricItem(name, entry.getTimestamp(), entry.getState());
private HistoricItem toHistoricItem(String name, Entry<ZonedDateTime, State> entry) {
return new MemoryHistoricItem(name, entry.getKey(), entry.getValue());
}

private static Stream<MemoryEntry> filtering(Stream<MemoryEntry> entries, FilterCriteria criteria) {
private static Stream<Entry<ZonedDateTime, State>> filtering(Stream<Entry<ZonedDateTime, State>> entries, FilterCriteria criteria) {
return entries.filter(entry -> evaluate(entry, criteria));
}

private static Stream<MemoryEntry> sorting(Stream<MemoryEntry> entries, FilterCriteria criteria) {
private static Stream<Entry<ZonedDateTime, State>> sorting(Stream<Entry<ZonedDateTime, State>> entries, FilterCriteria criteria) {
if (criteria.getOrdering() == Ordering.DESCENDING) {
entries = entries.sorted(Comparator.comparing(MemoryEntry::getTimestamp).reversed());
Comparator<Entry<ZonedDateTime, State>> comparator = Entry.comparingByKey();
entries = entries.sorted(comparator.reversed());
}
return entries;
}

private static Stream<MemoryEntry> paging(Stream<MemoryEntry> stream, FilterCriteria criteria) {
private static Stream<Entry<ZonedDateTime, State>> paging(Stream<Entry<ZonedDateTime, State>> stream, FilterCriteria criteria) {
int offset = criteria.getPageNumber() * criteria.getPageSize();
return stream.skip(offset)
.limit(criteria.getPageSize());
}

private static boolean evaluate(MemoryEntry entry, FilterCriteria criteria) {
private static boolean evaluate(Entry<ZonedDateTime, State> entry, FilterCriteria criteria) {
ZonedDateTime beginDate = criteria.getBeginDate();
if (beginDate != null && entry.getTimestamp().isBefore(beginDate)) {
if (beginDate != null && entry.getKey().isBefore(beginDate)) {
return false;
}

ZonedDateTime endDate = criteria.getEndDate();
if (endDate != null && entry.getTimestamp().isAfter(endDate)) {
if (endDate != null && entry.getKey().isAfter(endDate)) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;

import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Date;
import org.assertj.core.api.IterableAssert;
import org.connectorio.addons.test.ItemMutation;
import org.connectorio.addons.test.StubItemBuilder;
Expand All @@ -41,17 +46,12 @@ class MemoryPersistenceServiceTest {

public static final String TEST_1 = "test1";
public static final String TEST_2 = "test2";
@Mock
TimeZoneProvider tz;

TimeZoneProvider tz = () -> ZoneOffset.UTC;

Item item1 = StubItemBuilder.createNumber(TEST_1).build();
Item item2 = StubItemBuilder.createNumber(TEST_2).build();

@BeforeEach
void setup() {
when(tz.getTimeZone()).thenReturn(ZoneId.of("GMT"));
}

@Test
void testWriteAndQuery() {
MemoryPersistenceService service = new MemoryPersistenceService(tz);
Expand Down Expand Up @@ -134,4 +134,31 @@ void testPaging() {
.containsExactly(new DecimalType(20), new DecimalType(10));
}

@Test
void testWriteSameDate() {
ZonedDateTime dateTime = createInstant(2024, 2, 22, 19, 7, 30);
MemoryPersistenceService service = new MemoryPersistenceService(tz);
service.store(item2, Date.from(dateTime.toInstant()), new DecimalType(10));
service.store(item2, Date.from(dateTime.toInstant()), new DecimalType(20));

assertThat(service.getItemInfo())
.hasSize(1)
.element(0).matches(info -> info.getCount() == 1);

IterableAssert<HistoricItem> itemAssert = assertThat(service.query(new FilterCriteria().setItemName(TEST_2).setOrdering(Ordering.DESCENDING)))
.hasSize(1);
itemAssert.element(0).matches(state -> state.getState().equals(new DecimalType(20)));

assertThat(service.query(new FilterCriteria().setItemName(TEST_1)))
.isEmpty();
}

public static ZonedDateTime createInstant(int year, int month, int day, int hour, int minute, int second) {
return createInstant(year, month, day, hour, minute, second, 0);
}

public static ZonedDateTime createInstant(int year, int month, int day, int hour, int minute, int second, int nanos) {
return ZonedDateTime.of(LocalDate.of(year, month, day), LocalTime.of(hour, minute, second, nanos), ZoneOffset.UTC);
}

}
Loading