Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add count cmd including quota usage #337

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/scripts/install-hdfs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ EOF
done
fi

URL="https://dlcdn.apache.org/hadoop/core/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz"
URL="https://archive.apache.org/dist/hadoop/core/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz"
echo "Downloading $URL"
curl -o hadoop.tar.gz $URL
tar zxf hadoop.tar.gz
Expand Down
34 changes: 19 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,25 @@ verbs:
The flags available are a subset of the POSIX ones, but should behave similarly.

Valid commands:
ls [-lah] [FILE]...
rm [-rf] FILE...
mv [-fT] SOURCE... DEST
mkdir [-p] FILE...
touch [-amc] FILE...
chmod [-R] OCTAL-MODE FILE...
chown [-R] OWNER[:GROUP] FILE...
cat SOURCE...
head [-n LINES | -c BYTES] SOURCE...
tail [-n LINES | -c BYTES] SOURCE...
du [-sh] FILE...
checksum FILE...
get SOURCE [DEST]
getmerge SOURCE DEST
put SOURCE DEST
ls [-lahR] [FILE]...
rm [-rf] FILE...
mv [-nT] SOURCE... DEST
mkdir [-p] FILE...
touch [-c] FILE...
chmod [-R] OCTAL-MODE FILE...
chown [-R] OWNER[:GROUP] FILE...
cat SOURCE...
head [-n LINES | -c BYTES] SOURCE...
tail [-n LINES | -c BYTES] SOURCE...
test [-defsz] FILE...
du [-sh] FILE...
checksum FILE...
get SOURCE [DEST]
getmerge SOURCE DEST
put SOURCE DEST
df [-h]
truncate SIZE FILE
count [-q] [-h] <path> ...

Since it doesn't have to wait for the JVM to start up, it's also a lot faster
than `hadoop -fs`:
Expand Down
121 changes: 121 additions & 0 deletions cmd/hdfs/count.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package main

import (
"fmt"
"os"
"text/tabwriter"
)

// Column titles for the count table. The quota columns are only shown when
// quotas are requested; allHeaderFields is the quota columns followed by the
// summary columns.
var (
	quotaHeaderFields   = []string{"QUOTA", "REM_QUOTA", "SPACE_QUOTA", "REM_SPACE_QUOTA"}
	summaryHeaderFields = []string{"DIR_COUNT", "FILE_COUNT", "CONTENT_SIZE", "PATHNAME"}
	allHeaderFields     = append(quotaHeaderFields, summaryHeaderFields...)
)

// Row formats matching the header sets above: eight cells when quotas are
// shown, four cells otherwise.
var (
	showQuotasFormat = "%v \t%v \t%v \t%v \t%v \t%v \t%v \t%s\n"
	summaryFormat    = "%v \t%v \t%v \t%s\n"
)

// Placeholders printed in the quota columns when no quota value is rendered.
var (
	quotaNone = "none"
	quotaInf  = "inf"
)

// count implements the "count" command. For every expanded path it prints one
// table row with the directory count, file count and content size taken from
// the path's content summary. When showQuotas is set, the row is prefixed with
// the name quota, remaining name quota, space quota and remaining space quota.
// When humanReadable is set, sizes and quotas go through the human-readable
// formatter instead of being printed as plain integers.
//
// A path whose lookup fails is reported on stderr, sets the process status to
// 1 and is skipped; the remaining paths are still processed.
func count(args []string, showQuotas, humanReadable bool) {
	if len(args) == 0 {
		fatalWithUsage()
	}

	paths, client, err := getClientAndExpandedPaths(args)
	if err != nil {
		fatal(err)
	}

	if len(paths) == 0 {
		fatalWithUsage()
	}

	tw := tabwriter.NewWriter(os.Stdout, 8, 8, 0, ' ', 0)
	defer tw.Flush()

	headers := summaryHeaderFields
	if showQuotas {
		headers = allHeaderFields
	}
	// Fprint rather than Fprintf: the header line is data, not a format string.
	fmt.Fprint(tw, joinHeaders(headers))

	for _, p := range paths {
		cs, err := client.GetContentSummary(p)
		if err != nil {
			// Errors go to stderr so they neither become table cells nor
			// pollute the command's stdout.
			fmt.Fprintln(os.Stderr, err)
			status = 1
			continue
		}

		sizeStr := formatSize(uint64(cs.Size()), humanReadable)

		if !showQuotas {
			fmt.Fprintf(tw, summaryFormat,
				cs.DirectoryCount(),
				cs.FileCount(),
				sizeStr,
				p,
			)
			continue
		}

		// The quota usage is only needed for the quota columns, so the extra
		// namenode round trip is skipped for the plain summary above.
		qu, err := client.GetQuotaUsage(p)
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			status = 1
			continue
		}

		quotaStr, quotaRemStr := quotaNone, quotaInf
		if nameQuota := int64(cs.NameQuota()); nameQuota > 0 {
			quotaStr = formatSize(uint64(nameQuota), humanReadable)
			quotaRemStr = formatRemaining(nameQuota-qu.FileAndDirectoryCount(), humanReadable)
		}

		spaceQuotaStr, spaceQuotaRemStr := quotaNone, quotaInf
		if spaceQuota := cs.SpaceQuota(); spaceQuota >= 0 {
			spaceQuotaStr = formatSize(uint64(spaceQuota), humanReadable)
			spaceQuotaRemStr = formatRemaining(spaceQuota-qu.SpaceConsumed(), humanReadable)
		}

		fmt.Fprintf(tw, showQuotasFormat,
			quotaStr,
			quotaRemStr,
			spaceQuotaStr,
			spaceQuotaRemStr,
			cs.DirectoryCount(),
			cs.FileCount(),
			sizeStr,
			p,
		)
	}
}

// formatRemaining renders a remaining-quota value. The value is negative when
// usage exceeds the quota; formatSize only takes unsigned input, so the sign
// is emitted separately instead of letting the conversion wrap around to a
// huge positive number.
func formatRemaining(remaining int64, humanReadable bool) string {
	if remaining < 0 {
		return "-" + formatSize(uint64(-remaining), humanReadable)
	}
	return formatSize(uint64(remaining), humanReadable)
}
8 changes: 8 additions & 0 deletions cmd/hdfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Valid commands:
put SOURCE DEST
df [-h]
truncate SIZE FILE
count [-q] [-h] <path> ...
`, os.Args[0])

lsOpts = getopt.New()
Expand Down Expand Up @@ -83,6 +84,10 @@ Valid commands:
dus = duOpts.Bool('s')
duh = duOpts.Bool('h')

countOpts = getopt.New()
countq = countOpts.Bool('q')
counth = countOpts.Bool('h')

getmergeOpts = getopt.New()
getmergen = getmergeOpts.Bool('n')

Expand Down Expand Up @@ -146,6 +151,9 @@ func main() {
case "du":
duOpts.Parse(argv)
du(duOpts.Args(), *dus, *duh)
case "count":
countOpts.Parse(argv)
count(countOpts.Args(), *countq, *counth)
case "checksum":
checksum(argv[1:])
case "get":
Expand Down
25 changes: 21 additions & 4 deletions cmd/hdfs/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,36 @@ package main

import (
"fmt"
"strconv"
)

// formatBytes renders a byte count using binary (1024-based) units. Values
// below 1024 are printed as an integer with a "B" suffix; larger values are
// printed with one decimal place and a K, M, G or T suffix.
//
// The thresholds are inclusive, so an exact power of 1024 is shown in the
// larger unit (1024 is "1.0K", not "1024B").
func formatBytes(i uint64) string {
	const (
		kib = 1024
		mib = 1024 * kib
		gib = 1024 * mib
		tib = 1024 * gib
	)

	switch {
	case i >= tib:
		return fmt.Sprintf("%#.1fT", float64(i)/tib)
	case i >= gib:
		return fmt.Sprintf("%#.1fG", float64(i)/gib)
	case i >= mib:
		return fmt.Sprintf("%#.1fM", float64(i)/mib)
	case i >= kib:
		return fmt.Sprintf("%#.1fK", float64(i)/kib)
	default:
		return fmt.Sprintf("%dB", i)
	}
}

// formatSize renders size as a plain base-10 integer, or via formatBytes when
// humanReadable is set.
func formatSize(size uint64, humanReadable bool) string {
	if !humanReadable {
		return strconv.FormatUint(size, 10)
	}
	return formatBytes(size)
}

// joinHeaders builds a header line: every header is followed by a space and a
// tab (the cell terminator used by the row formats), and the line ends with a
// newline. An empty header list yields just the newline.
func joinHeaders(headers []string) string {
	line := ""
	for i := 0; i < len(headers); i++ {
		line = fmt.Sprintf("%s%s \t", line, headers[i])
	}
	return line + "\n"
}
3 changes: 3 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
fileAlreadyExistsException = "org.apache.hadoop.fs.FileAlreadyExistsException"
alreadyBeingCreatedException = "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException"
illegalArgumentException = "org.apache.hadoop.HadoopIllegalArgumentException"
nullPointerException = "java.lang.NullPointerException"
)

// Error represents a remote java exception from an HDFS namenode or datanode.
Expand Down Expand Up @@ -53,6 +54,8 @@ func interpretException(err error) error {
return os.ErrExist
case illegalArgumentException:
return os.ErrInvalid
case nullPointerException:
return os.ErrNotExist
default:
return err
}
Expand Down
70 changes: 70 additions & 0 deletions quota_usage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package hdfs

import (
"os"

hdfs "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_hdfs"
"google.golang.org/protobuf/proto"
)

// TODO: getTypesQuotaUsage
// https://github.com/apache/hadoop/blob/daafc8a0b849ffdf851c6a618684656925f1df76/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/QuotaUsage.java#L348C20-L348C38

// QuotaUsage represents quota usage about a file or directory in
// HDFS. It's provided directly by the namenode, and has no unix filesystem
// analogue.
//
// Values are obtained through Client.GetQuotaUsage. The field order matters:
// getQuotaUsage builds the struct with a positional composite literal.
type QuotaUsage struct {
// name is the path that was passed to GetQuotaUsage.
name string
// quotaUsage is the usage message taken from the namenode's
// getQuotaUsage response; the accessor methods read from it.
quotaUsage *hdfs.QuotaUsageProto
}

// GetQuotaUsage returns a QuotaUsage representing the named file or
// directory. The quota usage contains information about the entire tree rooted
// in the named file.
//
// On failure the error is an *os.PathError with Op "quota usage" whose Err is
// the result of interpretException on the underlying error.
func (c *Client) GetQuotaUsage(name string) (*QuotaUsage, error) {
	usage, err := c.getQuotaUsage(name)
	if err == nil {
		return usage, nil
	}

	return usage, &os.PathError{
		Op:   "quota usage",
		Path: name,
		Err:  interpretException(err),
	}
}

// getQuotaUsage issues the "getQuotaUsage" call against the namenode for name
// and wraps the usage message from the response. The error from the call is
// returned unmodified; GetQuotaUsage is responsible for wrapping it.
func (c *Client) getQuotaUsage(name string) (*QuotaUsage, error) {
	var (
		request  = &hdfs.GetQuotaUsageRequestProto{Path: proto.String(name)}
		response = &hdfs.GetQuotaUsageResponseProto{}
	)

	if err := c.namenode.Execute("getQuotaUsage", request, response); err != nil {
		return nil, err
	}

	return &QuotaUsage{name: name, quotaUsage: response.GetUsage()}, nil
}

// FileAndDirectoryCount returns the combined file and directory count that the
// namenode reported for the named path, converted to int64.
func (qu *QuotaUsage) FileAndDirectoryCount() int64 {
	count := qu.quotaUsage.GetFileAndDirectoryCount()
	return int64(count)
}

// Quota returns the HDFS configured "name quota" for the named path. The
// name quota is a hard limit on the number of directories and files inside a
// directory; see http://goo.gl/sOSJmJ for more information.
func (qu *QuotaUsage) Quota() int64 {
	nameQuota := qu.quotaUsage.GetQuota()
	return int64(nameQuota)
}

// SpaceQuota returns the HDFS configured "space quota" for the named path.
// The space quota is a hard limit on the number of bytes used by files in the
// tree rooted at that directory; see http://goo.gl/sOSJmJ for more
// information.
func (qu *QuotaUsage) SpaceQuota() int64 {
	spaceQuota := qu.quotaUsage.GetSpaceQuota()
	return int64(spaceQuota)
}

// SpaceConsumed returns the space consumed value that the namenode reported
// for the named path, converted to int64.
func (qu *QuotaUsage) SpaceConsumed() int64 {
	consumed := qu.quotaUsage.GetSpaceConsumed()
	return int64(consumed)
}
58 changes: 58 additions & 0 deletions quota_usage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package hdfs

import (
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestQuotaUsageDir checks the quota usage of a freshly built directory tree:
// the root, two subdirectories and two empty files.
func TestQuotaUsageDir(t *testing.T) {
	const root = "/_test/dirforcs"

	client := getClient(t)

	// Rebuild the tree from scratch so the counts below are deterministic.
	baleet(t, root)
	for _, dir := range []string{root + "/1", root + "/2"} {
		mkdirp(t, dir)
	}
	for _, file := range []string{root + "/foo", root + "/1/bar"} {
		touch(t, file)
	}

	usage, err := client.GetQuotaUsage(root)
	require.NoError(t, err)

	// root + 2 directories + 2 files.
	assert.EqualValues(t, 5, usage.FileAndDirectoryCount())
	// Quotas are expected to be negative here (presumably "not set").
	assert.True(t, usage.Quota() < 0)
	assert.True(t, usage.SpaceQuota() < 0)
	// Touched files are empty, so nothing is consumed.
	assert.EqualValues(t, 0, usage.SpaceConsumed())
}

// TestQuotaUsageFile checks the quota usage of a single file. It relies on
// /_test/foo.txt already existing with content (presumably a shared fixture).
func TestQuotaUsageFile(t *testing.T) {
	const path = "/_test/foo.txt"

	usage, err := getClient(t).GetQuotaUsage(path)
	require.NoError(t, err)

	// A single file counts as exactly one entry.
	assert.EqualValues(t, 1, usage.FileAndDirectoryCount())
	assert.True(t, usage.Quota() < 0)
	assert.True(t, usage.SpaceQuota() < 0)
	assert.True(t, usage.SpaceConsumed() > 0)
}

// TestQuotaUsageNonExistent checks that asking for the quota usage of a
// missing path yields a "quota usage" path error wrapping os.ErrNotExist and
// no result.
func TestQuotaUsageNonExistent(t *testing.T) {
	const missing = "/_test/nonexistent"

	usage, err := getClient(t).GetQuotaUsage(missing)

	assertPathError(t, err, "quota usage", missing, os.ErrNotExist)
	assert.Nil(t, usage)
}

// TestQuotaUsageDirWithoutPermission checks that a second user gets a
// "quota usage" path error wrapping os.ErrPermission for a file inside a
// directory created with mode 0700.
func TestQuotaUsageDirWithoutPermission(t *testing.T) {
	const (
		dir  = "/_test/accessdenied"
		file = dir + "/foo"
	)

	otherUser := getClientForUser(t, "gohdfs2")

	mkdirpMask(t, dir, 0700)
	touchMask(t, file, 0600)

	_, err := otherUser.GetQuotaUsage(file)
	assertPathError(t, err, "quota usage", file, os.ErrPermission)
}