Skip to content

Commit

Permalink
Fix group by nature month when timezone is not default
Browse files Browse the repository at this point in the history
  • Loading branch information
Wei-hao-Li authored Dec 11, 2024
1 parent 21652cc commit 5898668
Show file tree
Hide file tree
Showing 28 changed files with 220 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
Expand Down Expand Up @@ -489,6 +490,20 @@ public ISession getSessionConnection() throws IoTDBConnectionException {
return session;
}

@Override
public ISession getSessionConnection(ZoneId zoneId) throws IoTDBConnectionException {
final DataNodeWrapper dataNode =
this.dataNodeWrapperList.get(rand.nextInt(this.dataNodeWrapperList.size()));
final Session session =
new Session.Builder()
.host(dataNode.getIp())
.port(dataNode.getPort())
.zoneId(zoneId)
.build();
session.open();
return session;
}

@Override
public ISession getSessionConnection(final String userName, final String password)
throws IoTDBConnectionException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -273,6 +274,14 @@ public ISession getSessionConnection() throws IoTDBConnectionException {
return session;
}

@Override
public ISession getSessionConnection(ZoneId zoneId) throws IoTDBConnectionException {
Session session =
new Session.Builder().host(ip_addr).port(Integer.parseInt(port)).zoneId(zoneId).build();
session.open();
return session;
}

@Override
public ITableSession getTableSessionConnection() throws IoTDBConnectionException {
return new TableSessionBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.net.URL;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
Expand Down Expand Up @@ -193,6 +194,8 @@ IConfigNodeRPCService.Iface getLeaderConfigNodeConnection()

ISession getSessionConnection() throws IoTDBConnectionException;

ISession getSessionConnection(ZoneId zoneId) throws IoTDBConnectionException;

ISession getSessionConnection(String userName, String password) throws IoTDBConnectionException;

ISession getSessionConnection(List<String> nodeUrls) throws IoTDBConnectionException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.it.groupby;

import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
Expand Down Expand Up @@ -49,6 +51,7 @@
import static org.apache.iotdb.db.utils.constant.TestConstant.sum;
import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
import static org.apache.iotdb.itbase.constant.TestConstant.count;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

@RunWith(IoTDBTestRunner.class)
Expand All @@ -75,6 +78,8 @@ public class IoTDBGroupByNaturalMonthIT {
calendar.add(Calendar.MONTH, 1), i = calendar.getTimeInMillis()) {
dataSet.add("insert into root.test.d1(timestamp, s1) values (" + i + ", 1)");
}

dataSet.add("insert into root.testTimeZone.d1(timestamp, s1) values (1, 1)");
}

protected static final DateFormat df = new SimpleDateFormat("MM/dd/yyyy:HH:mm:ss");
Expand Down Expand Up @@ -413,4 +418,27 @@ public void groupByNaturalMonthWithMixedUnit2() {
null,
currPrecision);
}

@Test
public void groupByNaturalMonthWithNonSystemDefaultTimeZone() {
try (ISession session =
EnvFactory.getEnv().getSessionConnection(TimeZone.getTimeZone("UTC+09:00").toZoneId())) {

SessionDataSet sessionDataSet =
session.executeQueryStatement(
"select count(s1) from root.testTimeZone.d1 group by([2024-07-01, 2024-08-01), 1mo)");

int count = 0;
while (sessionDataSet.hasNext()) {
sessionDataSet.next();
count++;
}
assertEquals(1, count);

sessionDataSet.closeOperationHandle();
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ private List<TsBlock> executeGroupByQueryInternal(
scanOptionsBuilder.build(),
driverContext.getOperatorContexts().get(0),
Collections.singletonList(aggregator),
initTimeRangeIterator(groupByTimeParameter, true, true),
initTimeRangeIterator(groupByTimeParameter, true, true, sessionInfo.getZoneId()),
groupByTimeParameter,
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
!TSDataType.BLOB.equals(dataType)
Expand All @@ -805,7 +805,7 @@ private List<TsBlock> executeGroupByQueryInternal(
scanOptionsBuilder.build(),
driverContext.getOperatorContexts().get(0),
Collections.singletonList(aggregator),
initTimeRangeIterator(groupByTimeParameter, true, true),
initTimeRangeIterator(groupByTimeParameter, true, true, sessionInfo.getZoneId()),
groupByTimeParameter,
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
!TSDataType.BLOB.equals(dataType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.utils.TimeDuration;

import java.time.ZoneId;

/**
* This class iteratively generates aggregated time windows.
*
Expand All @@ -46,21 +48,25 @@ public class AggrWindowIterator implements ITimeRangeIterator {
// The number of current timeRange, it's used to calculate the cpu when there contains month
private int timeRangeCount;

private final ZoneId zoneId;

@SuppressWarnings("squid:S107")
public AggrWindowIterator(
long startTime,
long endTime,
TimeDuration interval,
TimeDuration slidingStep,
boolean isAscending,
boolean leftCRightO) {
boolean leftCRightO,
ZoneId zoneId) {
this.startTime = startTime;
this.endTime = endTime;
this.interval = interval;
this.slidingStep = slidingStep;
this.isAscending = isAscending;
this.leftCRightO = leftCRightO;
this.timeRangeCount = 0;
this.zoneId = zoneId;
}

@Override
Expand All @@ -78,7 +84,7 @@ private TimeRange getLeftmostTimeRange() {
// calculate interval length by natural month based on startTime
// ie. startTIme = 1/31, interval = 1mo, curEndTime will be set to 2/29
retEndTime =
Math.min(DateTimeUtils.calcPositiveIntervalByMonth(startTime, interval), endTime);
Math.min(DateTimeUtils.calcPositiveIntervalByMonth(startTime, interval, zoneId), endTime);
} else {
retEndTime = Math.min(startTime + interval.nonMonthDuration, endTime);
}
Expand All @@ -99,14 +105,14 @@ private TimeRange getRightmostTimeRange() {
/ (slidingStep.getMaxTotalDuration(TimestampPrecisionUtils.currPrecision)));
long tempRetStartTime =
DateTimeUtils.calcPositiveIntervalByMonth(
startTime, slidingStep.multiple(intervalNum - 1));
startTime, slidingStep.multiple(intervalNum - 1), zoneId);
retStartTime = tempRetStartTime;
while (tempRetStartTime < endTime) {
intervalNum++;
retStartTime = tempRetStartTime;
tempRetStartTime =
DateTimeUtils.calcPositiveIntervalByMonth(
retStartTime, slidingStep.multiple(intervalNum - 1));
retStartTime, slidingStep.multiple(intervalNum - 1), zoneId);
}
intervalNum -= 1;
} else {
Expand All @@ -120,7 +126,7 @@ private TimeRange getRightmostTimeRange() {
retEndTime =
Math.min(
DateTimeUtils.calcPositiveIntervalByMonth(
startTime, interval.merge(slidingStep.multiple(intervalNum - 1))),
startTime, interval.merge(slidingStep.multiple(intervalNum - 1)), zoneId),
endTime);
} else {
retEndTime = Math.min(retStartTime + interval.nonMonthDuration, endTime);
Expand All @@ -147,7 +153,7 @@ public boolean hasNextTimeRange() {
if (slidingStep.containsMonth()) {
retStartTime =
DateTimeUtils.calcPositiveIntervalByMonth(
startTime, slidingStep.multiple(timeRangeCount));
startTime, slidingStep.multiple(timeRangeCount), zoneId);
} else {
retStartTime = curStartTime + slidingStep.nonMonthDuration;
}
Expand All @@ -171,7 +177,7 @@ public boolean hasNextTimeRange() {
if (interval.containsMonth()) {
retEndTime =
DateTimeUtils.calcPositiveIntervalByMonth(
startTime, slidingStep.multiple(timeRangeCount).merge(interval));
startTime, slidingStep.multiple(timeRangeCount).merge(interval), zoneId);
} else {
retEndTime = retStartTime + interval.nonMonthDuration;
}
Expand Down Expand Up @@ -213,11 +219,13 @@ public long getTotalIntervalNum() {
(double) queryRange
/ (slidingStep.getMaxTotalDuration(TimestampPrecisionUtils.currPrecision)));
long retStartTime =
DateTimeUtils.calcPositiveIntervalByMonth(startTime, slidingStep.multiple(intervalNum));
DateTimeUtils.calcPositiveIntervalByMonth(
startTime, slidingStep.multiple(intervalNum), zoneId);
while (retStartTime < endTime) {
intervalNum++;
retStartTime =
DateTimeUtils.calcPositiveIntervalByMonth(startTime, slidingStep.multiple(intervalNum));
DateTimeUtils.calcPositiveIntervalByMonth(
startTime, slidingStep.multiple(intervalNum), zoneId);
}
} else {
intervalNum = (long) Math.ceil(queryRange / (double) slidingStep.nonMonthDuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.utils.TimeDuration;

import java.time.ZoneId;

public class PreAggrWindowWithNaturalMonthIterator implements ITimeRangeIterator {

private static final int HEAP_MAX_SIZE = 100;
Expand All @@ -46,11 +48,13 @@ public PreAggrWindowWithNaturalMonthIterator(
TimeDuration interval,
TimeDuration slidingStep,
boolean isAscending,
boolean leftCRightO) {
boolean leftCRightO,
ZoneId zoneId) {
this.isAscending = isAscending;
this.timeBoundaryHeap = new TimeSelector(HEAP_MAX_SIZE, isAscending);
this.aggrWindowIterator =
new AggrWindowIterator(startTime, endTime, interval, slidingStep, isAscending, leftCRightO);
new AggrWindowIterator(
startTime, endTime, interval, slidingStep, isAscending, leftCRightO, zoneId);
this.leftCRightO = leftCRightO;
initHeap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import org.apache.tsfile.utils.TimeDuration;

import java.time.ZoneId;

public class TimeRangeIteratorFactory {

private TimeRangeIteratorFactory() {}
Expand All @@ -40,7 +42,8 @@ public static ITimeRangeIterator getTimeRangeIterator(
TimeDuration slidingStep,
boolean isAscending,
boolean leftCRightO,
boolean outputPartialTimeWindow) {
boolean outputPartialTimeWindow,
ZoneId zoneId) {
if (outputPartialTimeWindow
&& interval.getTotalDuration(TimestampPrecisionUtils.currPrecision)
> slidingStep.getTotalDuration(TimestampPrecisionUtils.currPrecision)) {
Expand All @@ -54,11 +57,11 @@ public static ITimeRangeIterator getTimeRangeIterator(
leftCRightO);
} else {
return new PreAggrWindowWithNaturalMonthIterator(
startTime, endTime, interval, slidingStep, isAscending, leftCRightO);
startTime, endTime, interval, slidingStep, isAscending, leftCRightO, zoneId);
}
} else {
return new AggrWindowIterator(
startTime, endTime, interval, slidingStep, isAscending, leftCRightO);
startTime, endTime, interval, slidingStep, isAscending, leftCRightO, zoneId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.tsfile.utils.Pair;

import java.time.ZoneId;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -73,7 +74,8 @@ private AggregationUtil() {
public static ITimeRangeIterator initTimeRangeIterator(
GroupByTimeParameter groupByTimeParameter,
boolean ascending,
boolean outputPartialTimeWindow) {
boolean outputPartialTimeWindow,
ZoneId zoneId) {
if (groupByTimeParameter == null) {
return new SingleTimeWindowIterator(Long.MIN_VALUE, Long.MAX_VALUE);
} else {
Expand All @@ -84,7 +86,8 @@ public static ITimeRangeIterator initTimeRangeIterator(
groupByTimeParameter.getSlidingStep(),
ascending,
groupByTimeParameter.isLeftCRightO(),
outputPartialTimeWindow);
outputPartialTimeWindow,
zoneId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.tsfile.read.common.block.TsBlockBuilder;
import org.apache.tsfile.utils.RamUsageEstimator;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -61,7 +62,8 @@ public SlidingWindowAggregationOperator(
boolean ascending,
boolean outputEndTime,
GroupByTimeParameter groupByTimeParameter,
long maxReturnSize) {
long maxReturnSize,
ZoneId zoneId) {
super(operatorContext, aggregators, child, ascending, maxReturnSize);
checkArgument(
groupByTimeParameter != null,
Expand All @@ -78,7 +80,8 @@ public SlidingWindowAggregationOperator(

this.timeRangeIterator = timeRangeIterator;
this.outputEndTime = outputEndTime;
this.subTimeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending, true);
this.subTimeRangeIterator =
initTimeRangeIterator(groupByTimeParameter, ascending, true, zoneId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,9 @@ public Analysis visitQuery(QueryStatement queryStatement, MPPQueryContext contex

if (canPushDownLimitOffsetInGroupByTimeForDevice(queryStatement)) {
// remove the device which won't appear in resultSet after limit/offset
deviceList = pushDownLimitOffsetInGroupByTimeForDevice(deviceList, queryStatement);
deviceList =
pushDownLimitOffsetInGroupByTimeForDevice(
deviceList, queryStatement, context.getZoneId());
}

outputExpressions =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ static boolean canBuildAggregationPlanUseTemplate(

if (canPushDownLimitOffsetInGroupByTimeForDevice(queryStatement)) {
// remove the device which won't appear in resultSet after limit/offset
deviceList = pushDownLimitOffsetInGroupByTimeForDevice(deviceList, queryStatement);
deviceList =
pushDownLimitOffsetInGroupByTimeForDevice(
deviceList, queryStatement, context.getZoneId());
}

List<Pair<Expression, String>> outputExpressions = new ArrayList<>();
Expand Down
Loading

0 comments on commit 5898668

Please sign in to comment.