Skip to content

Commit

Permalink
Add support for handling multiple RMs sans Hadoop config
Browse files Browse the repository at this point in the history
Also added method `get_active_host_port()` to return the (address, port)
tuple currently in use by the ResourceManager instance.
  • Loading branch information
kevin-bates authored and lresende committed May 29, 2019
1 parent 075e4a8 commit 4c0612d
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 11 deletions.
10 changes: 5 additions & 5 deletions tests/test_hadoop_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def test_get_resource_host_port(self):

@mock.patch('yarn_api_client.hadoop_conf._get_rm_ids')
@mock.patch('yarn_api_client.hadoop_conf.parse')
@mock.patch('yarn_api_client.hadoop_conf._check_is_active_rm')
@mock.patch('yarn_api_client.hadoop_conf.check_is_active_rm')
def test_get_resource_host_port_with_ha(self, check_is_active_rm_mock, parse_mock, get_rm_ids_mock):
get_rm_ids_mock.return_value = ['rm1', 'rm2']
parse_mock.return_value = 'example.com:8022'
Expand Down Expand Up @@ -116,17 +116,17 @@ def getheader(self, header_key, default_return):

http_conn_request_mock.return_value = None
http_getresponse_mock.return_value = ResponseMock(OK, {})
self.assertTrue(hadoop_conf._check_is_active_rm('example2', '8022'))
self.assertTrue(hadoop_conf.check_is_active_rm('example2', '8022'))
http_getresponse_mock.reset_mock()
http_getresponse_mock.return_value = ResponseMock(OK, {'Refresh': "testing"})
self.assertFalse(hadoop_conf._check_is_active_rm('example2', '8022'))
self.assertFalse(hadoop_conf.check_is_active_rm('example2', '8022'))
http_getresponse_mock.reset_mock()
http_getresponse_mock.return_value = ResponseMock(NOT_FOUND, {'Refresh': "testing"})
self.assertFalse(hadoop_conf._check_is_active_rm('example2', '8022'))
self.assertFalse(hadoop_conf.check_is_active_rm('example2', '8022'))
http_conn_request_mock.side_effect = Exception('error')
http_conn_request_mock.reset_mock()
http_conn_request_mock.return_value = None
self.assertFalse(hadoop_conf._check_is_active_rm('example2', '8022'))
self.assertFalse(hadoop_conf.check_is_active_rm('example2', '8022'))
pass

def test_get_resource_manager(self):
Expand Down
4 changes: 2 additions & 2 deletions yarn_api_client/hadoop_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def _get_resource_manager(hadoop_conf_path, rm_id=None):
return None


def _check_is_active_rm(rm_web_host, rm_web_port):
def check_is_active_rm(rm_web_host, rm_web_port):
conn = HTTPConnection(rm_web_host, rm_web_port)
try:
conn.request('GET', '/cluster')
Expand All @@ -52,7 +52,7 @@ def get_resource_manager_host_port():
ret = _get_resource_manager(hadoop_conf_path, rm_id)
if ret is not None:
(host, port) = ret
if _check_is_active_rm(host, port):
if check_is_active_rm(host, port):
return host, port
return None
else:
Expand Down
26 changes: 22 additions & 4 deletions yarn_api_client/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from .base import BaseYarnAPI
from .constants import YarnApplicationState, FinalApplicationStatus
from .errors import IllegalArgumentError
from .hadoop_conf import get_resource_manager_host_port
from .hadoop_conf import get_resource_manager_host_port, check_is_active_rm, CONF_DIR


class ResourceManager(BaseYarnAPI):
Expand All @@ -14,20 +14,38 @@ class ResourceManager(BaseYarnAPI):
and information about applications on the cluster.
If `address` argument is `None` client will try to extract `address` and
`port` from Hadoop configuration files.
`port` from Hadoop configuration files. If both `address` and `alt_address`
are provided, the address corresponding to the ACTIVE HA Resource Manager will
be used.
:param str address: ResourceManager HTTP address
:param int port: ResourceManager HTTP port
:param str alt_address: Alternate ResourceManager HTTP address for HA configurations
:param int alt_port: Alternate ResourceManager HTTP port for HA configurations
:param int timeout: API connection timeout in seconds
:param boolean kerberos_enabled: Flag identifying whether Kerberos Security has been enabled for YARN
"""
def __init__(self, address=None, port=8088, timeout=30, kerberos_enabled=False):
def __init__(self, address=None, port=8088, alt_address=None, alt_port=8088, timeout=30, kerberos_enabled=False):
if address is None:
self.logger.debug('Get configuration from hadoop conf dir')
self.logger.debug('Get configuration from hadoop conf dir: {conf_dir}'.format(conf_dir=CONF_DIR))
address, port = get_resource_manager_host_port()
else:
if alt_address: # Determine active RM
if not check_is_active_rm(address, port):
# Default is not active, check alternate
if check_is_active_rm(alt_address, alt_port):
address, port = alt_address, alt_port

super(ResourceManager, self).__init__(address, port, timeout, kerberos_enabled)

def get_active_host_port(self):
"""
The active (address, port) tuple with which this instance is associated.
:return: Tuple (str, int) corresponding to the active address and port
"""
return self.address, self.port

def cluster_information(self):
"""
The cluster information resource provides overall information about
Expand Down

0 comments on commit 4c0612d

Please sign in to comment.