Skip to content

Commit 075e4a8

Browse files
dimon222 authored and lresende committed
Code cleanup/refactoring (#31)
This is a cleanup based on flake8 and pylint recommendations, plus corrections to make the implementation more generic (for future extensibility to support POST/PUT/DELETE calls). Update: also added the new-application API to fix #30.
1 parent d245bd4 commit 075e4a8

13 files changed

+237
-67
lines changed

tests/test_base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def test_valid_request(self):
2424
requests_get_mock.get('/ololo', text=json.dumps(BaseYarnAPITestCase.success_response()))
2525

2626
client = self.get_client()
27-
response = client.request('/ololo', foo='bar')
27+
response = client.request('/ololo', params={"foo": 'bar'})
2828

2929
assert requests_get_mock.called
3030
self.assertIn(response.data['status'], 'success')
@@ -34,7 +34,7 @@ def test_valid_request_with_parameters(self):
3434
requests_get_mock.get('/ololo?foo=bar', text=json.dumps(BaseYarnAPITestCase.success_response()))
3535

3636
client = self.get_client()
37-
response = client.request('/ololo', foo='bar')
37+
response = client.request('/ololo', params={"foo": 'bar'})
3838

3939
assert requests_get_mock.called
4040
self.assertIn(response.data['status'], 'success')

tests/test_hadoop_conf.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,24 @@ def test_get_jobhistory_host_port(self):
165165
host_port = hadoop_conf.get_jobhistory_host_port()
166166
self.assertIsNone(host_port)
167167

168+
169+
def test_get_nodemanager_host_port(self):
170+
with patch('yarn_api_client.hadoop_conf.parse') as parse_mock:
171+
parse_mock.return_value = 'example.com:8022'
172+
173+
host_port = hadoop_conf.get_nodemanager_host_port()
174+
175+
self.assertEqual(('example.com', '8022'), host_port)
176+
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml',
177+
'yarn.nodemanager.webapp.address')
178+
179+
parse_mock.reset_mock()
180+
parse_mock.return_value = None
181+
182+
host_port = hadoop_conf.get_nodemanager_host_port()
183+
self.assertIsNone(host_port)
184+
185+
168186
def test_get_webproxy_host_port(self):
169187
with patch('yarn_api_client.hadoop_conf.parse') as parse_mock:
170188
parse_mock.return_value = 'example.com:8022'

tests/test_history_server.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,21 @@ def test_application_information(self, request_mock):
2323

2424
def test_jobs(self, request_mock):
2525
self.hs.jobs()
26-
request_mock.assert_called_with('/ws/v1/history/mapreduce/jobs')
26+
request_mock.assert_called_with('/ws/v1/history/mapreduce/jobs', params={})
2727

2828
self.hs.jobs(state='NEW', user='root', queue='high', limit=100,
2929
started_time_begin=1, started_time_end=2,
3030
finished_time_begin=3, finished_time_end=4)
3131

3232
request_mock.assert_called_with('/ws/v1/history/mapreduce/jobs',
33-
queue='high',
34-
state='NEW', user='root', limit=100,
35-
startedTimeBegin=1, startedTimeEnd=2,
36-
finishedTimeBegin=3, finishedTimeEnd=4)
33+
params={"queue": 'high',
34+
"state": 'NEW',
35+
"user": 'root',
36+
"limit": 100,
37+
"startedTimeBegin": 1,
38+
"startedTimeEnd": 2,
39+
"finishedTimeBegin": 3,
40+
"finishedTimeEnd": 4})
3741

3842
with self.assertRaises(IllegalArgumentError):
3943
self.hs.jobs(state='ololo')
@@ -56,12 +60,12 @@ def test_job_conf(self, request_mock):
5660

5761
def test_job_tasks(self, request_mock):
5862
self.hs.job_tasks('job_2')
59-
request_mock.assert_called_with('/ws/v1/history/mapreduce/jobs/job_2/tasks')
60-
self.hs.job_tasks('job_2', type='m')
61-
request_mock.assert_called_with('/ws/v1/history/mapreduce/jobs/job_2/tasks', type='m')
63+
request_mock.assert_called_with('/ws/v1/history/mapreduce/jobs/job_2/tasks', params={})
64+
self.hs.job_tasks('job_2', job_type='m')
65+
request_mock.assert_called_with('/ws/v1/history/mapreduce/jobs/job_2/tasks', params={"type": 'm'})
6266

6367
with self.assertRaises(IllegalArgumentError):
64-
self.hs.job_tasks('job_2', type='ololo')
68+
self.hs.job_tasks('job_2', job_type='ololo')
6569

6670
def test_job_task(self, request_mock):
6771
self.hs.job_task('job_2', 'task_3')

tests/test_node_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ def test_node_information(self, request_mock):
1818
def test_node_applications(self, request_mock):
1919
self.nm.node_applications('RUNNING', 'root')
2020
request_mock.assert_called_with('/ws/v1/node/apps',
21-
state='RUNNING', user='root')
21+
params={"state":'RUNNING', "user":'root'})
2222

2323
self.nm.node_applications()
24-
request_mock.assert_called_with('/ws/v1/node/apps')
24+
request_mock.assert_called_with('/ws/v1/node/apps', params={})
2525

2626
with self.assertRaises(IllegalArgumentError):
2727
self.nm.node_applications('ololo', 'root')

tests/test_resource_manager.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,40 @@ def test_cluster_application_attempts(self, request_mock):
6464

6565
def test_cluster_nodes(self, request_mock):
6666
self.rm.cluster_nodes()
67-
request_mock.assert_called_with('/ws/v1/cluster/nodes')
67+
request_mock.assert_called_with('/ws/v1/cluster/nodes', params={})
6868

6969
self.rm.cluster_nodes(state='NEW', healthy='true')
7070
request_mock.assert_called_with('/ws/v1/cluster/nodes',
71-
state='NEW', healthy='true')
71+
params={"state": 'NEW', "healthy": 'true'})
7272

7373
with self.assertRaises(IllegalArgumentError):
7474
self.rm.cluster_nodes(state='NEW', healthy='ololo')
7575

7676
def test_cluster_node(self, request_mock):
7777
self.rm.cluster_node('node_1')
7878
request_mock.assert_called_with('/ws/v1/cluster/nodes/node_1')
79+
80+
# TODO
81+
# def test_cluster_submit_application(self, request_mock):
82+
# self.rm.cluster_submit_application()
83+
# request_mock.assert_called_with('/ws/v1/cluster/apps')
84+
85+
def test_cluster_new_application(self, request_mock):
86+
self.rm.cluster_new_application()
87+
request_mock.assert_called_with('/ws/v1/cluster/apps/new-application', 'POST')
88+
89+
def test_cluster_get_application_queue(self, request_mock):
90+
self.rm.cluster_get_application_queue('app_1')
91+
request_mock.assert_called_with('/ws/v1/cluster/apps/app_1/queue')
92+
93+
def test_cluster_change_application_queue(self, request_mock):
94+
self.rm.cluster_change_application_queue('app_1', 'queue_1')
95+
request_mock.assert_called_with('/ws/v1/cluster/apps/app_1/queue', 'PUT', data={"queue": 'queue_1'})
96+
97+
def test_cluster_get_application_priority(self, request_mock):
98+
self.rm.cluster_get_application_priority('app_1')
99+
request_mock.assert_called_with('/ws/v1/cluster/apps/app_1/priority')
100+
101+
def test_cluster_change_application_priority(self, request_mock):
102+
self.rm.cluster_change_application_priority('app_1', 'priority_1')
103+
request_mock.assert_called_with('/ws/v1/cluster/apps/app_1/priority', 'PUT', data={"priority": 'priority_1'})

yarn_api_client/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# -*- coding: utf-8 -*-
22
__version__ = '0.3.4.dev0'
3-
__all__ = ['ApplicationMaster', 'HistoryServer', 'NodeManager','ResourceManager']
3+
__all__ = ['ApplicationMaster', 'HistoryServer', 'NodeManager', 'ResourceManager']
44

55
from .application_master import ApplicationMaster
66
from .history_server import HistoryServer

yarn_api_client/application_master.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ class ApplicationMaster(BaseYarnAPI):
2121
:param boolean kerberos_enabled: Flag identifying is Kerberos Security has been enabled for YARN
2222
"""
2323
def __init__(self, address=None, port=8088, timeout=30, kerberos_enabled=False):
24-
self.address, self.port, self.timeout, self.kerberos_enabled = address, port, timeout, kerberos_enabled
2524
if address is None:
2625
self.logger.debug('Get configuration from hadoop conf dir')
2726
address, port = get_webproxy_host_port()
28-
self.address, self.port = address, port
27+
28+
super(ApplicationMaster, self).__init__(address, port, timeout, kerberos_enabled)
2929

3030
def application_information(self, application_id):
3131
"""

yarn_api_client/base.py

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
# -*- coding: utf-8 -*-
22
from __future__ import unicode_literals
33

4-
import json
54
import logging
65
import requests
76

@@ -17,46 +16,33 @@ class BaseYarnAPI(object):
1716
__logger = None
1817
response_class = Response
1918

19+
def __init__(self, address=None, port=None, timeout=None, kerberos_enabled=None):
20+
self.address, self.port, self.timeout, self.kerberos_enabled = address, port, timeout, kerberos_enabled
21+
2022
def _validate_configuration(self):
2123
if self.address is None:
2224
raise ConfigurationError('API address is not set')
2325
elif self.port is None:
2426
raise ConfigurationError('API port is not set')
2527

26-
def request(self, api_path, **query_args):
27-
params = query_args
28+
def request(self, api_path, method='GET', **kwargs):
2829
api_endpoint = 'http://{}:{}{}'.format(self.address, self.port, api_path)
2930

3031
self.logger.info('API Endpoint {}'.format(api_endpoint))
3132

3233
self._validate_configuration()
3334

34-
response = None
35-
if self.kerberos_enabled:
36-
from requests_kerberos import HTTPKerberosAuth
37-
response = requests.get(api_endpoint, params, auth=HTTPKerberosAuth())
38-
else:
39-
response = requests.get(api_endpoint, params)
40-
41-
if response.status_code == requests.codes.ok:
42-
return self.response_class(response)
35+
if method == 'GET':
36+
headers = None
4337
else:
44-
msg = 'Response finished with status: %s. Details: %s' % (response.status_code, response.text)
45-
raise APIError(msg)
46-
47-
def update(self, api_path, data):
48-
api_endpoint = 'http://{}:{}{}'.format(self.address, self.port, api_path)
49-
50-
self.logger.info('API Endpoint {}'.format(api_endpoint))
51-
52-
self._validate_configuration()
38+
headers = {"Content-Type": "application/json"}
5339

5440
response = None
5541
if self.kerberos_enabled:
5642
from requests_kerberos import HTTPKerberosAuth
57-
response = requests.put(api_endpoint, data=data, auth=HTTPKerberosAuth(), headers={"Content-Type":"application/json"})
43+
response = requests.request(method=method, url=api_endpoint, auth=HTTPKerberosAuth(), headers=headers, **kwargs)
5844
else:
59-
response = requests.put(api_endpoint, data=data, headers={"Content-Type":"application/json"})
45+
response = requests.request(method=method, url=api_endpoint, headers=headers, **kwargs)
6046

6147
if response.status_code == requests.codes.ok:
6248
return self.response_class(response)

yarn_api_client/hadoop_conf.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ def _get_rm_ids(hadoop_conf_path):
1616
return rm_ids
1717

1818

19-
def _get_resource_manager(hadoop_conf_path, rm_id = None):
19+
def _get_resource_manager(hadoop_conf_path, rm_id=None):
2020
prop_name = 'yarn.resourcemanager.webapp.address'
2121
if rm_id is not None:
2222
rm_webapp_address = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), '%s.%s' % (prop_name, rm_id))
@@ -70,6 +70,17 @@ def get_jobhistory_host_port():
7070
return None
7171

7272

73+
def get_nodemanager_host_port():
74+
config_path = os.path.join(CONF_DIR, 'yarn-site.xml')
75+
prop_name = 'yarn.nodemanager.webapp.address'
76+
value = parse(config_path, prop_name)
77+
if value is not None:
78+
host, _, port = value.partition(':')
79+
return host, port
80+
else:
81+
return None
82+
83+
7384
def get_webproxy_host_port():
7485
config_path = os.path.join(CONF_DIR, 'yarn-site.xml')
7586
prop_name = 'yarn.web-proxy.address'

yarn_api_client/history_server.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ class HistoryServer(BaseYarnAPI):
2121
:param boolean kerberos_enabled: Flag identifying is Kerberos Security has been enabled for YARN
2222
"""
2323
def __init__(self, address=None, port=19888, timeout=30, kerberos_enabled=False):
24-
self.address, self.port, self.timeout, self.kerberos_enabled = address, port, timeout, kerberos_enabled
2524
if address is None:
2625
self.logger.debug('Get information from hadoop conf dir')
2726
address, port = get_jobhistory_host_port()
28-
self.address, self.port = address, port
27+
28+
super(HistoryServer, self).__init__(address, port, timeout, kerberos_enabled)
2929

3030
def application_information(self):
3131
"""
@@ -65,7 +65,7 @@ def jobs(self, state=None, user=None, queue=None, limit=None,
6565
"""
6666
path = '/ws/v1/history/mapreduce/jobs'
6767

68-
legal_states = set([s for s, _ in JobStateInternal])
68+
legal_states = {s for s, _ in JobStateInternal}
6969
if state is not None and state not in legal_states:
7070
msg = 'Job Internal State %s is illegal' % (state,)
7171
raise IllegalArgumentError(msg)
@@ -82,7 +82,7 @@ def jobs(self, state=None, user=None, queue=None, limit=None,
8282

8383
params = self.construct_parameters(loc_args)
8484

85-
return self.request(path, **params)
85+
return self.request(path, params=params)
8686

8787
def job(self, job_id):
8888
"""
@@ -134,7 +134,7 @@ def job_conf(self, job_id):
134134

135135
return self.request(path)
136136

137-
def job_tasks(self, job_id, type=None):
137+
def job_tasks(self, job_id, job_type=None):
138138
"""
139139
With the tasks API, you can obtain a collection of resources that
140140
represent a task within a job.
@@ -151,15 +151,15 @@ def job_tasks(self, job_id, type=None):
151151
# m - for map
152152
# r - for reduce
153153
valid_types = ['m', 'r']
154-
if type is not None and type not in valid_types:
155-
msg = 'Job type %s is illegal' % (type,)
154+
if job_type is not None and job_type not in valid_types:
155+
msg = 'Job type %s is illegal' % (job_type,)
156156
raise IllegalArgumentError(msg)
157157

158158
params = {}
159-
if type is not None:
160-
params['type'] = type
159+
if job_type is not None:
160+
params['type'] = job_type
161161

162-
return self.request(path, **params)
162+
return self.request(path, params=params)
163163

164164
def job_task(self, job_id, task_id):
165165
"""

0 commit comments

Comments
 (0)