forked from colinmarc/hdfs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
97 lines (80 loc) · 2.16 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package hdfs
import (
"io"
"io/ioutil"
"os"
"os/user"
hdfs "github.com/colinmarc/hdfs/protocol/hadoop_hdfs"
"github.com/colinmarc/hdfs/rpc"
)
// A Client represents a connection to an HDFS cluster
type Client struct {
namenode *rpc.NamenodeConnection
defaults *hdfs.FsServerDefaultsProto
}
// Username returns the value of HADOOP_USER_NAME in the environment, or
// the current system user if it is not set.
func Username() (string, error) {
username := os.Getenv("HADOOP_USER_NAME")
if username != "" {
return username, nil
}
currentUser, err := user.Current()
if err != nil {
return "", err
}
return currentUser.Username, nil
}
// New returns a connected Client, or an error if it can't connect. The user
// will be the user the code is running under.
func New(address string) (*Client, error) {
username, err := Username()
if err != nil {
return nil, err
}
return NewForUser(address, username)
}
// NewForUser returns a connected Client with the user specified, or an error if
// it can't connect.
func NewForUser(address string, user string) (*Client, error) {
namenode, err := rpc.NewNamenodeConnection(address, user)
if err != nil {
return nil, err
}
return &Client{namenode: namenode}, nil
}
// ReadFile reads the file named by filename and returns the contents.
func (c *Client) ReadFile(filename string) ([]byte, error) {
f, err := c.Open(filename)
if err != nil {
return nil, err
}
return ioutil.ReadAll(f)
}
// CopyToLocal copies the HDFS file specified by src to the local file at dst.
// If dst already exists, it will be overwritten.
func (c *Client) CopyToLocal(src string, dst string) error {
remote, err := c.Open(src)
if err != nil {
return err
}
local, err := os.Create(dst)
if err != nil {
return err
}
_, err = io.Copy(local, remote)
return err
}
func (c *Client) fetchDefaults() (*hdfs.FsServerDefaultsProto, error) {
if c.defaults != nil {
return c.defaults, nil
}
req := &hdfs.GetServerDefaultsRequestProto{}
resp := &hdfs.GetServerDefaultsResponseProto{}
err := c.namenode.Execute("getServerDefaults", req, resp)
if err != nil {
return nil, err
}
c.defaults = resp.GetServerDefaults()
return c.defaults, nil
}