Skip to content

Commit

Permalink
Enhancing FS stats to include read/write time, queue size and IO time (
Browse files Browse the repository at this point in the history
…opensearch-project#10541)

* Enhancing FS stats to include read / write time, io time and queue size

Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie authored Oct 17, 2023
1 parent 4112f8d commit 8bc6791
Show file tree
Hide file tree
Showing 6 changed files with 285 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Introduce new dynamic cluster setting to control slice computation for concurrent segment search ([#9107](https://github.com/opensearch-project/OpenSearch/pull/9107))
- Implement on behalf of token passing for extensions ([#8679](https://github.com/opensearch-project/OpenSearch/pull/8679))
- Provide service accounts tokens to extensions ([#9618](https://github.com/opensearch-project/OpenSearch/pull/9618))
- [Admission control] Add enhancements to FS stats to include read/write time, queue size and IO time ([#10541](https://github.com/opensearch-project/OpenSearch/pull/10541))
- [Admission control] Add Resource usage collector service and resource usage tracker ([#9890](https://github.com/opensearch-project/OpenSearch/pull/9890))

### Dependencies
Expand Down
178 changes: 174 additions & 4 deletions server/src/main/java/org/opensearch/monitor/fs/FsInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,14 @@ public static class DeviceStats implements Writeable, ToXContentFragment {
final long previousWritesCompleted;
final long currentSectorsWritten;
final long previousSectorsWritten;
final long currentReadTime;
final long previousReadTime;
final long currentWriteTime;
final long previousWriteTime;
final long currentQueueSize;
final long previousQueueSize;
final long currentIOTime;
final long previousIOTime;

public DeviceStats(
final int majorDeviceNumber,
Expand All @@ -244,6 +252,10 @@ public DeviceStats(
final long currentSectorsRead,
final long currentWritesCompleted,
final long currentSectorsWritten,
final long currentReadTime,
final long currentWriteTime,
final long currrentQueueSize,
final long currentIOTime,
final DeviceStats previousDeviceStats
) {
this(
Expand All @@ -257,7 +269,15 @@ public DeviceStats(
currentSectorsRead,
previousDeviceStats != null ? previousDeviceStats.currentSectorsRead : -1,
currentWritesCompleted,
previousDeviceStats != null ? previousDeviceStats.currentWritesCompleted : -1
previousDeviceStats != null ? previousDeviceStats.currentWritesCompleted : -1,
currentReadTime,
previousDeviceStats != null ? previousDeviceStats.currentReadTime : -1,
currentWriteTime,
previousDeviceStats != null ? previousDeviceStats.currentWriteTime : -1,
currrentQueueSize,
previousDeviceStats != null ? previousDeviceStats.currentQueueSize : -1,
currentIOTime,
previousDeviceStats != null ? previousDeviceStats.currentIOTime : -1
);
}

Expand All @@ -272,7 +292,15 @@ private DeviceStats(
final long currentSectorsRead,
final long previousSectorsRead,
final long currentWritesCompleted,
final long previousWritesCompleted
final long previousWritesCompleted,
final long currentReadTime,
final long previousReadTime,
final long currentWriteTime,
final long previousWriteTime,
final long currentQueueSize,
final long previousQueueSize,
final long currentIOTime,
final long previousIOTime
) {
this.majorDeviceNumber = majorDeviceNumber;
this.minorDeviceNumber = minorDeviceNumber;
Expand All @@ -285,6 +313,14 @@ private DeviceStats(
this.previousSectorsRead = previousSectorsRead;
this.currentSectorsWritten = currentSectorsWritten;
this.previousSectorsWritten = previousSectorsWritten;
this.currentReadTime = currentReadTime;
this.previousReadTime = previousReadTime;
this.currentWriteTime = currentWriteTime;
this.previousWriteTime = previousWriteTime;
this.currentQueueSize = currentQueueSize;
this.previousQueueSize = previousQueueSize;
this.currentIOTime = currentIOTime;
this.previousIOTime = previousIOTime;
}

public DeviceStats(StreamInput in) throws IOException {
Expand All @@ -299,6 +335,25 @@ public DeviceStats(StreamInput in) throws IOException {
previousSectorsRead = in.readLong();
currentSectorsWritten = in.readLong();
previousSectorsWritten = in.readLong();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
currentReadTime = in.readLong();
previousReadTime = in.readLong();
currentWriteTime = in.readLong();
previousWriteTime = in.readLong();
currentQueueSize = in.readLong();
previousQueueSize = in.readLong();
currentIOTime = in.readLong();
previousIOTime = in.readLong();
} else {
currentReadTime = 0;
previousReadTime = 0;
currentWriteTime = 0;
previousWriteTime = 0;
currentQueueSize = 0;
previousQueueSize = 0;
currentIOTime = 0;
previousIOTime = 0;
}
}

@Override
Expand All @@ -314,6 +369,16 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(previousSectorsRead);
out.writeLong(currentSectorsWritten);
out.writeLong(previousSectorsWritten);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeLong(currentReadTime);
out.writeLong(previousReadTime);
out.writeLong(currentWriteTime);
out.writeLong(previousWriteTime);
out.writeLong(currentQueueSize);
out.writeLong(previousQueueSize);
out.writeLong(currentIOTime);
out.writeLong(previousIOTime);
}
}

public long operations() {
Expand Down Expand Up @@ -346,6 +411,39 @@ public long writeKilobytes() {
return (currentSectorsWritten - previousSectorsWritten) / 2;
}

/**
* Total time taken for all read operations
*/
public long readTime() {
if (previousReadTime == -1) return -1;
return currentReadTime - previousReadTime;
}

/**
* Total time taken for all write operations
*/
public long writeTime() {
if (previousWriteTime == -1) return -1;
return currentWriteTime - previousWriteTime;
}

/**
* Queue size based on weighted time spent doing I/Os
*/
public long queueSize() {
if (previousQueueSize == -1) return -1;
return currentQueueSize - previousQueueSize;
}

/**
* Total time spent doing I/Os
*/
public long ioTimeInMillis() {
if (previousIOTime == -1) return -1;

return (currentIOTime - previousIOTime);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("device_name", deviceName);
Expand All @@ -354,9 +452,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(IoStats.WRITE_OPERATIONS, writeOperations());
builder.field(IoStats.READ_KILOBYTES, readKilobytes());
builder.field(IoStats.WRITE_KILOBYTES, writeKilobytes());
builder.field(IoStats.READ_TIME, readTime());
builder.field(IoStats.WRITE_TIME, writeTime());
builder.field(IoStats.QUEUE_SIZE, queueSize());
builder.field(IoStats.IO_TIME_MS, ioTimeInMillis());
return builder;
}

}

/**
Expand All @@ -371,13 +472,21 @@ public static class IoStats implements Writeable, ToXContentFragment {
private static final String WRITE_OPERATIONS = "write_operations";
private static final String READ_KILOBYTES = "read_kilobytes";
private static final String WRITE_KILOBYTES = "write_kilobytes";
private static final String READ_TIME = "read_time";
private static final String WRITE_TIME = "write_time";
private static final String QUEUE_SIZE = "queue_size";
private static final String IO_TIME_MS = "io_time_in_millis";

final DeviceStats[] devicesStats;
final long totalOperations;
final long totalReadOperations;
final long totalWriteOperations;
final long totalReadKilobytes;
final long totalWriteKilobytes;
final long totalReadTime;
final long totalWriteTime;
final long totalQueueSize;
final long totalIOTimeInMillis;

public IoStats(final DeviceStats[] devicesStats) {
this.devicesStats = devicesStats;
Expand All @@ -386,18 +495,30 @@ public IoStats(final DeviceStats[] devicesStats) {
long totalWriteOperations = 0;
long totalReadKilobytes = 0;
long totalWriteKilobytes = 0;
long totalReadTime = 0;
long totalWriteTime = 0;
long totalQueueSize = 0;
long totalIOTimeInMillis = 0;
for (DeviceStats deviceStats : devicesStats) {
totalOperations += deviceStats.operations() != -1 ? deviceStats.operations() : 0;
totalReadOperations += deviceStats.readOperations() != -1 ? deviceStats.readOperations() : 0;
totalWriteOperations += deviceStats.writeOperations() != -1 ? deviceStats.writeOperations() : 0;
totalReadKilobytes += deviceStats.readKilobytes() != -1 ? deviceStats.readKilobytes() : 0;
totalWriteKilobytes += deviceStats.writeKilobytes() != -1 ? deviceStats.writeKilobytes() : 0;
totalReadTime += deviceStats.readTime() != -1 ? deviceStats.readTime() : 0;
totalWriteTime += deviceStats.writeTime() != -1 ? deviceStats.writeTime() : 0;
totalQueueSize += deviceStats.queueSize() != -1 ? deviceStats.queueSize() : 0;
totalIOTimeInMillis += deviceStats.ioTimeInMillis() != -1 ? deviceStats.ioTimeInMillis() : 0;
}
this.totalOperations = totalOperations;
this.totalReadOperations = totalReadOperations;
this.totalWriteOperations = totalWriteOperations;
this.totalReadKilobytes = totalReadKilobytes;
this.totalWriteKilobytes = totalWriteKilobytes;
this.totalReadTime = totalReadTime;
this.totalWriteTime = totalWriteTime;
this.totalQueueSize = totalQueueSize;
this.totalIOTimeInMillis = totalIOTimeInMillis;
}

public IoStats(StreamInput in) throws IOException {
Expand All @@ -412,6 +533,17 @@ public IoStats(StreamInput in) throws IOException {
this.totalWriteOperations = in.readLong();
this.totalReadKilobytes = in.readLong();
this.totalWriteKilobytes = in.readLong();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.totalReadTime = in.readLong();
this.totalWriteTime = in.readLong();
this.totalQueueSize = in.readLong();
this.totalIOTimeInMillis = in.readLong();
} else {
this.totalReadTime = 0;
this.totalWriteTime = 0;
this.totalQueueSize = 0;
this.totalIOTimeInMillis = 0;
}
}

@Override
Expand All @@ -425,6 +557,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(totalWriteOperations);
out.writeLong(totalReadKilobytes);
out.writeLong(totalWriteKilobytes);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeLong(totalReadTime);
out.writeLong(totalWriteTime);
out.writeLong(totalQueueSize);
out.writeLong(totalIOTimeInMillis);
}
}

public DeviceStats[] getDevicesStats() {
Expand All @@ -451,6 +589,34 @@ public long getTotalWriteKilobytes() {
return totalWriteKilobytes;
}

/**
* Sum of read time across all devices
*/
public long getTotalReadTime() {
return totalReadTime;
}

/**
* Sum of write time across all devices
*/
public long getTotalWriteTime() {
return totalWriteTime;
}

/**
* Sum of queue size across all devices
*/
public long getTotalQueueSize() {
return totalQueueSize;
}

/**
* Sum of IO time across all devices
*/
public long getTotalIOTimeMillis() {
return totalIOTimeInMillis;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (devicesStats.length > 0) {
Expand All @@ -468,11 +634,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(WRITE_OPERATIONS, totalWriteOperations);
builder.field(READ_KILOBYTES, totalReadKilobytes);
builder.field(WRITE_KILOBYTES, totalWriteKilobytes);

builder.field(READ_TIME, totalReadTime);
builder.field(WRITE_TIME, totalWriteTime);
builder.field(QUEUE_SIZE, totalQueueSize);
builder.field(IO_TIME_MS, totalIOTimeInMillis);
builder.endObject();
}
return builder;
}

}

private final long timestamp;
Expand Down
29 changes: 29 additions & 0 deletions server/src/main/java/org/opensearch/monitor/fs/FsProbe.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,25 @@ final FsInfo.IoStats ioStats(final Set<Tuple<Integer, Integer>> devicesNumbers,

List<FsInfo.DeviceStats> devicesStats = new ArrayList<>();

/**
* The /proc/diskstats file displays the I/O statistics of block devices.
* Each line contains the following 14 fields: ( + additional fields )
*
* 1 major number
* 2 minor number
* 3 device name
* 4 reads completed successfully
* 5 reads merged
* 6 sectors read
* 7 time spent reading (ms)
* 8 writes completed
* 9 writes merged
* 10 sectors written
* 11 time spent writing (ms)
* 12 I/Os currently in progress
* 13 time spent doing I/Os (ms) ---- IO use percent
* 14 weighted time spent doing I/Os (ms) ---- Queue size
*/
List<String> lines = readProcDiskStats();
if (!lines.isEmpty()) {
for (String line : lines) {
Expand All @@ -123,6 +142,12 @@ final FsInfo.IoStats ioStats(final Set<Tuple<Integer, Integer>> devicesNumbers,
final long sectorsRead = Long.parseLong(fields[5]);
final long writesCompleted = Long.parseLong(fields[7]);
final long sectorsWritten = Long.parseLong(fields[9]);
// readTime and writeTime calculates the total read/write time taken for each request to complete
// ioTime calculates actual time queue and disks are busy
final long readTime = Long.parseLong(fields[6]);
final long writeTime = Long.parseLong(fields[10]);
final long ioTime = fields.length > 12 ? Long.parseLong(fields[12]) : 0;
final long queueSize = fields.length > 13 ? Long.parseLong(fields[13]) : 0;
final FsInfo.DeviceStats deviceStats = new FsInfo.DeviceStats(
majorDeviceNumber,
minorDeviceNumber,
Expand All @@ -131,6 +156,10 @@ final FsInfo.IoStats ioStats(final Set<Tuple<Integer, Integer>> devicesNumbers,
sectorsRead,
writesCompleted,
sectorsWritten,
readTime,
writeTime,
queueSize,
ioTime,
deviceMap.get(Tuple.tuple(majorDeviceNumber, minorDeviceNumber))
);
devicesStats.add(deviceStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ public void testSerialization() throws IOException {
assertEquals(ioStats.getTotalReadOperations(), deserializedIoStats.getTotalReadOperations());
assertEquals(ioStats.getTotalWriteKilobytes(), deserializedIoStats.getTotalWriteKilobytes());
assertEquals(ioStats.getTotalWriteOperations(), deserializedIoStats.getTotalWriteOperations());
assertEquals(ioStats.getTotalReadTime(), deserializedIoStats.getTotalReadTime());
assertEquals(ioStats.getTotalWriteTime(), deserializedIoStats.getTotalWriteTime());
assertEquals(ioStats.getTotalQueueSize(), deserializedIoStats.getTotalQueueSize());
assertEquals(ioStats.getTotalIOTimeMillis(), deserializedIoStats.getTotalIOTimeMillis());
assertEquals(ioStats.getDevicesStats().length, deserializedIoStats.getDevicesStats().length);
for (int i = 0; i < ioStats.getDevicesStats().length; i++) {
FsInfo.DeviceStats deviceStats = ioStats.getDevicesStats()[i];
Expand Down Expand Up @@ -645,6 +649,10 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) {
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
null
);
deviceStatsArray[i] = new FsInfo.DeviceStats(
Expand All @@ -655,6 +663,10 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) {
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
previousDeviceStats
);
}
Expand Down
Loading

0 comments on commit 8bc6791

Please sign in to comment.