
Commit 886b4ef

markheger committed
1.1.0, download_toolkit and configure_connection added
1 parent 0b20d3e commit 886b4ef

5 files changed: +108 additions, −14 deletions

package/docs/source/conf.py

Lines changed: 2 additions & 2 deletions
@@ -65,9 +65,9 @@
 # built documents.
 #
 # The short X.Y version.
-version = '1.0'
+version = '1.1'
 # The full version, including alpha/beta/rc tags.
-release = '1.0.2'
+release = '1.1.0'
 
 # The language for content autogenerated by Sphinx. Refer to documentation
 # for a list of supported languages.

package/setup.py

Lines changed: 1 addition & 1 deletion
@@ -19,7 +19,7 @@
         'Programming Language :: Python :: 3.5',
         'Programming Language :: Python :: 3.6',
     ],
-    install_requires=['streamsx'],
+    install_requires=['streamsx', 'streamsx.toolkits'],
 
     test_suite='nose.collector',
     tests_require=['nose']

package/streamsx/hdfs/__init__.py

Lines changed: 6 additions & 10 deletions
@@ -22,13 +22,9 @@
 The mandatory JSON elements are "user", "password" and "webhdfs"::
 
     {
-        "cluster": {
-            "password": "<PASSWORD>",
-            "service_endpoints": {
-                "webhdfs": "https://<HOST>:<PORT>/gateway/default/webhdfs/v1/"
-            },
-            "user": "<USER>"
-        },
+        "user": "<USER>",
+        "password": "<PASSWORD>",
+        "webhdfs": "https://<HOST>:<PORT>"
     }
 
 If you are using HDFS server(s) different from the "Analytics Engine" service,
@@ -69,7 +65,7 @@
 
 
 """
-__version__='1.0.2'
+__version__='1.1.0'
 
-__all__ = ['scan', 'read', 'write']
-from streamsx.hdfs._hdfs import scan, read, write
+__all__ = ['download_toolkit', 'configure_connection', 'scan', 'read', 'write']
+from streamsx.hdfs._hdfs import download_toolkit, configure_connection, scan, read, write
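With this change, credentials for a plain HDFS server are given as a flat JSON
object instead of the nested "cluster" structure of the "Analytics Engine"
service. A minimal sketch of building such a credentials dict and storing it
with the new configure_connection (all values are placeholders; a live Streams
instance is assumed, obtained as in the configure_connection docstring below)::

    import streamsx.hdfs as hdfs

    # Flat credentials format accepted as of 1.1.0 (placeholder values)
    credentials = {
        'user': '<USER>',
        'password': '<PASSWORD>',
        'webhdfs': 'https://<HOST>:<PORT>'
    }

    # With a streamsx.rest_primitives.Instance at hand (for example via
    # Instance.of_service(cfg)), the dict can be stored as an application
    # configuration named 'hdfs':
    # app_cfg_name = hdfs.configure_connection(instance, credentials=credentials)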

package/streamsx/hdfs/_hdfs.py

Lines changed: 98 additions & 1 deletion
@@ -4,20 +4,30 @@
 
 import datetime
 import os
+import json
 from tempfile import gettempdir
 import streamsx.spl.op
 import streamsx.spl.types
 from streamsx.topology.schema import CommonSchema, StreamSchema
 from streamsx.spl.types import rstring
 from urllib.parse import urlparse
+from streamsx.toolkits import download_toolkit
 
+_TOOLKIT_NAME = 'com.ibm.streamsx.hdfs'
 
 FileInfoSchema = StreamSchema('tuple<rstring fileName, uint64 fileSize>')
 """Structured schema of the file write response tuple. This schema is the output schema of the write method.
 
 ``'tuple<rstring fileName, uint64 fileSize>'``
 """
 
+
+def _add_toolkit_dependency(topo, version):
+    # IMPORTANT: Dependency of this python wrapper to a specific toolkit version
+    # This is important when toolkit is not set with streamsx.spl.toolkit.add_toolkit (selecting toolkit from remote build service)
+    streamsx.spl.toolkit.add_toolkit_dependency(topo, _TOOLKIT_NAME, version)
+
+
 def _read_ae_service_credentials(credentials):
     hdfs_uri = ""
     user = ""
@@ -29,7 +39,12 @@ def _read_ae_service_credentials(credentials):
             password = credentials.get('cluster').get('password')
             hdfs_uri = credentials.get('cluster').get('service_endpoints').get('webhdfs')
         else:
-            raise ValueError(credentials)
+            if 'webhdfs' in credentials:
+                user = credentials.get('user')
+                password = credentials.get('password')
+                hdfs_uri = credentials.get('webhdfs')
+            else:
+                raise ValueError(credentials)
     else:
         raise TypeError(credentials)
     # construct expected format for hdfs_uri: webhdfs://host:port
@@ -48,6 +63,88 @@ def _check_time_param(time_value, parameter_name):
         raise ValueError("Invalid "+parameter_name+" value. Value must be at least one second.")
     return result
 
+def configure_connection (instance, name = 'hdfs', credentials = None):
+    """Configures IBM Streams for a certain connection.
+
+    Creates or updates an application configuration object containing the required properties with connection information.
+
+    Example for creating a configuration for a Streams instance with connection details::
+
+        from streamsx.rest import Instance
+        import streamsx.topology.context
+        from icpd_core import icpd_util
+        import streamsx.hdfs as hdfs
+
+        cfg = icpd_util.get_service_instance_details (name='your-streams-instance')
+        cfg[context.ConfigParams.SSL_VERIFY] = False
+        instance = Instance.of_service (cfg)
+        app_cfg = hdfs.configure_connection (instance, credentials = 'my_credentials_json')
+
+    Args:
+        instance(streamsx.rest_primitives.Instance): IBM Streams instance object.
+        name(str): Name of the application configuration, default name is 'hdfs'.
+        credentials(str|dict): The service credentials, for example Analytics Engine service credentials.
+    Returns:
+        Name of the application configuration.
+    """
+
+    description = 'HDFS credentials'
+    properties = {}
+    if credentials is None:
+        raise TypeError (credentials)
+
+    if isinstance (credentials, dict):
+        properties ['credentials'] = json.dumps (credentials)
+    else:
+        properties ['credentials'] = credentials
+
+    # check if application configuration exists
+    app_config = instance.get_application_configurations (name = name)
+    if app_config:
+        print ('update application configuration: ' + name)
+        app_config[0].update (properties)
+    else:
+        print ('create application configuration: ' + name)
+        instance.create_application_configuration (name, properties, description)
+    return name
+
+
+def download_toolkit(url=None, target_dir=None):
+    r"""Downloads the latest HDFS toolkit from GitHub.
+
+    Example for updating the HDFS toolkit for your topology with the latest toolkit from GitHub::
+
+        import streamsx.hdfs as hdfs
+        # download HDFS toolkit from GitHub
+        hdfs_toolkit_location = hdfs.download_toolkit()
+        # add the toolkit to topology
+        streamsx.spl.toolkit.add_toolkit(topology, hdfs_toolkit_location)
+
+    Example for updating the topology with a specific version of the HDFS toolkit using a URL::
+
+        import streamsx.hdfs as hdfs
+        url500 = 'https://github.com/IBMStreams/streamsx.hdfs/releases/download/v5.0.0/streamx.hdfs.toolkits-5.0.0-20190902-1637.tgz'
+        hdfs_toolkit_location = hdfs.download_toolkit(url=url500)
+        streamsx.spl.toolkit.add_toolkit(topology, hdfs_toolkit_location)
+
+    Args:
+        url(str): Link to toolkit archive (\*.tgz) to be downloaded. Use this parameter to
+            download a specific version of the toolkit.
+        target_dir(str): the directory where the toolkit is unpacked to. If a relative path is given,
+            the path is appended to the system temporary directory, for example to /tmp on Unix/Linux systems.
+            If target_dir is ``None`` a location relative to the system temporary directory is chosen.
+
+    Returns:
+        str: the location of the downloaded HDFS toolkit
+
+    .. note:: This function requires an outgoing Internet connection
+    .. versionadded:: 1.1
+    """
+    _toolkit_location = streamsx.toolkits.download_toolkit (toolkit_name=_TOOLKIT_NAME, url=url, target_dir=target_dir)
+    return _toolkit_location
 
 
 def scan(topology, credentials, directory, pattern=None, init_delay=None, schema=CommonSchema.String, name=None):
     """Scans a Hadoop Distributed File System directory for new or modified files.

requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -1 +1,2 @@
 streamsx
+streamsx.toolkits
