Skip to content

Commit

Permalink
TEZ-3331: Add operation specific HDFS counters for Tez UI
Browse files Browse the repository at this point in the history
  • Loading branch information
abstractdog committed Nov 7, 2024
1 parent 19b1423 commit 60db8f1
Show file tree
Hide file tree
Showing 7 changed files with 263 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,42 @@
package org.apache.tez.common.counters;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames;

@Private
public enum FileSystemCounter {
BYTES_READ,
BYTES_WRITTEN,
READ_OPS,
LARGE_READ_OPS,
WRITE_OPS,
HDFS_BYTES_READ,
HDFS_BYTES_WRITTEN,
FILE_BYTES_READ,
FILE_BYTES_WRITTEN
BYTES_READ("bytesRead"),
BYTES_WRITTEN("bytesWritten"),
READ_OPS("readOps"),
LARGE_READ_OPS("largeReadOps"),
WRITE_OPS("writeOps"),
HDFS_BYTES_READ("hdfsBytesRead"),
HDFS_BYTES_WRITTEN("hdfsBytesWritten"),
FILE_BYTES_READ("fileBytesRead"),
FILE_BYTES_WRITTEN("fileBytesWritten"),

// Additional counters from HADOOP-13305
OP_APPEND(CommonStatisticNames.OP_APPEND),
OP_CREATE(CommonStatisticNames.OP_CREATE),
OP_DELETE(CommonStatisticNames.OP_DELETE),
OP_GET_FILE_STATUS(CommonStatisticNames.OP_GET_FILE_STATUS),
OP_LIST_FILES(CommonStatisticNames.OP_LIST_FILES),
OP_LIST_LOCATED_STATUS(CommonStatisticNames.OP_LIST_LOCATED_STATUS),
OP_MKDIRS(CommonStatisticNames.OP_MKDIRS),
OP_OPEN(CommonStatisticNames.OP_OPEN),
OP_RENAME(CommonStatisticNames.OP_RENAME),
OP_SET_ACL(CommonStatisticNames.OP_SET_ACL),
OP_SET_OWNER(CommonStatisticNames.OP_SET_OWNER),
OP_SET_PERMISSION(CommonStatisticNames.OP_SET_PERMISSION),
OP_GET_FILE_BLOCK_LOCATIONS("op_get_file_block_locations");

private final String opName;

FileSystemCounter(String opName) {
this.opName = opName;
}

public String getOpName() {
return opName;
}
}
17 changes: 17 additions & 0 deletions tez-runtime-internals/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,23 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
Expand All @@ -17,9 +17,7 @@

package org.apache.tez.runtime.metrics;

import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.tez.common.counters.FileSystemCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
Expand All @@ -30,50 +28,22 @@
*/
public class FileSystemStatisticUpdater {

private List<FileSystem.Statistics> stats;
private TezCounter readBytesCounter, writeBytesCounter, readOpsCounter, largeReadOpsCounter,
writeOpsCounter;
private String scheme;
private StorageStatistics stats;
private TezCounters counters;

FileSystemStatisticUpdater(TezCounters counters, List<FileSystem.Statistics> stats, String scheme) {
this.stats = stats;
this.scheme = scheme;
FileSystemStatisticUpdater(TezCounters counters, StorageStatistics storageStatistics) {
this.stats = storageStatistics;
this.counters = counters;
}

void updateCounters() {
if (readBytesCounter == null) {
readBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_READ);
}
if (writeBytesCounter == null) {
writeBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_WRITTEN);
}
if (readOpsCounter == null) {
readOpsCounter = counters.findCounter(scheme, FileSystemCounter.READ_OPS);
}
if (largeReadOpsCounter == null) {
largeReadOpsCounter = counters.findCounter(scheme, FileSystemCounter.LARGE_READ_OPS);
}
if (writeOpsCounter == null) {
writeOpsCounter = counters.findCounter(scheme, FileSystemCounter.WRITE_OPS);
}
long readBytes = 0;
long writeBytes = 0;
long readOps = 0;
long largeReadOps = 0;
long writeOps = 0;
for (FileSystem.Statistics stat : stats) {
readBytes = readBytes + stat.getBytesRead();
writeBytes = writeBytes + stat.getBytesWritten();
readOps = readOps + stat.getReadOps();
largeReadOps = largeReadOps + stat.getLargeReadOps();
writeOps = writeOps + stat.getWriteOps();
// loop through FileSystemCounter enums as it is a smaller set
for (FileSystemCounter fsCounter : FileSystemCounter.values()) {
Long val = stats.getLong(fsCounter.getOpName());
if (val != null && val != 0) {
TezCounter counter = counters.findCounter(stats.getScheme(), fsCounter);
counter.setValue(val);
}
}
readBytesCounter.setValue(readBytes);
writeBytesCounter.setValue(writeBytes);
readOpsCounter.setValue(readOps);
largeReadOpsCounter.setValue(largeReadOps);
writeOpsCounter.setValue(writeOps);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@

package org.apache.tez.runtime.metrics;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.tez.util.TezMxBeanResourceCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.tez.common.GcTimeUpdater;
import org.apache.tez.common.counters.TaskCounter;
Expand All @@ -48,11 +49,16 @@ public class TaskCounterUpdater {
private final TezCounters tezCounters;
private final Configuration conf;

// /**
// * A Map where Key-> URIScheme and value->FileSystemStatisticUpdater
// */
// private Map<String, FileSystemStatisticUpdater> statisticUpdaters =
// new HashMap<>();
/**
* A Map where Key-> URIScheme and value->FileSystemStatisticUpdater
* A Map where Key-> URIScheme and value->Map<Name, FileSystemStatisticUpdater>
*/
private Map<String, FileSystemStatisticUpdater> statisticUpdaters =
new HashMap<String, FileSystemStatisticUpdater>();
private Map<String, Map<String, FileSystemStatisticUpdater>> statisticUpdaters =
new HashMap<>();
protected final GcTimeUpdater gcUpdater;
private ResourceCalculatorProcessTree pTree;
private long initCpuCumulativeTime = 0;
Expand All @@ -69,32 +75,16 @@ public TaskCounterUpdater(TezCounters counters, Configuration conf, String pid)


public void updateCounters() {
// FileSystemStatistics are reset each time a new task is seen by the
// container.
// This doesn't remove the fileSystem, and does not clear all statistics -
// so there is a potential of an unused FileSystem showing up for a
// Container, and strange values for READ_OPS etc.
Map<String, List<FileSystem.Statistics>> map = new
HashMap<String, List<FileSystem.Statistics>>();
for(Statistics stat: FileSystem.getAllStatistics()) {
String uriScheme = stat.getScheme();
if (map.containsKey(uriScheme)) {
List<FileSystem.Statistics> list = map.get(uriScheme);
list.add(stat);
} else {
List<FileSystem.Statistics> list = new ArrayList<FileSystem.Statistics>();
list.add(stat);
map.put(uriScheme, list);
}
}
for (Map.Entry<String, List<FileSystem.Statistics>> entry: map.entrySet()) {
FileSystemStatisticUpdater updater = statisticUpdaters.get(entry.getKey());
if(updater==null) {//new FileSystem has been found in the cache
updater =
new FileSystemStatisticUpdater(tezCounters, entry.getValue(),
entry.getKey());
statisticUpdaters.put(entry.getKey(), updater);
GlobalStorageStatistics globalStorageStatistics = FileSystem.getGlobalStorageStatistics();
Iterator<StorageStatistics> iter = globalStorageStatistics.iterator();
while (iter.hasNext()) {
StorageStatistics stats = iter.next();
if (!statisticUpdaters.containsKey(stats.getScheme())) {
Map<String, FileSystemStatisticUpdater> updaterSet = new TreeMap<>();
statisticUpdaters.put(stats.getScheme(), updaterSet);
}
FileSystemStatisticUpdater updater = statisticUpdaters.get(stats.getScheme())
.computeIfAbsent(stats.getName(), k -> new FileSystemStatisticUpdater(tezCounters, stats));
updater.updateCounters();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.runtime.metrics;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.tez.common.counters.FileSystemCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestFileSystemStatisticUpdater {

private static final Logger LOG = LoggerFactory.getLogger(
TestFileSystemStatisticUpdater.class);

private static MiniDFSCluster dfsCluster;

private static Configuration conf = new Configuration();
private static FileSystem remoteFs;

private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR +
TestFileSystemStatisticUpdater.class.getName() + "-tmpDir";

@BeforeClass
public static void setup() throws IOException {
try {
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null)
.build();
remoteFs = dfsCluster.getFileSystem();
} catch (IOException io) {
throw new RuntimeException("problem starting mini dfs cluster", io);
}
}

@AfterClass
public static void tearDown() {
if (dfsCluster != null) {
dfsCluster.shutdown();
dfsCluster = null;
}
}

@Test
public void basicTest() throws IOException {
TezCounters counters = new TezCounters();
TaskCounterUpdater updater = new TaskCounterUpdater(counters, conf, "pid");

remoteFs.mkdirs(new Path("/tmp/foo/"));
FSDataOutputStream out = remoteFs.create(new Path("/tmp/foo/abc.txt"));
out.writeBytes("xyz");
out.close();

updater.updateCounters();

LOG.info("Counters: " + counters);
TezCounter mkdirCounter = counters.findCounter(remoteFs.getScheme(),
FileSystemCounter.OP_MKDIRS);
TezCounter createCounter = counters.findCounter(remoteFs.getScheme(),
FileSystemCounter.OP_CREATE);
Assert.assertNotNull(mkdirCounter);
Assert.assertNotNull(createCounter);
Assert.assertEquals(1, mkdirCounter.getValue());
Assert.assertEquals(1, createCounter.getValue());

FSDataOutputStream out1 = remoteFs.create(new Path("/tmp/foo/abc1.txt"));
out1.writeBytes("xyz");
out1.close();

long oldCreateVal = createCounter.getValue();
updater.updateCounters();

LOG.info("Counters: " + counters);
Assert.assertTrue("Counter not updated, old=" + oldCreateVal
+ ", new=" + createCounter.getValue(), createCounter.getValue() > oldCreateVal);

oldCreateVal = createCounter.getValue();
// Ensure all numbers are reset
remoteFs.clearStatistics();
updater.updateCounters();
LOG.info("Counters: " + counters);
Assert.assertEquals(oldCreateVal, createCounter.getValue());

}



}
Loading

0 comments on commit 60db8f1

Please sign in to comment.