4
4
5
5
import datetime
6
6
import os
7
+ import json
7
8
from tempfile import gettempdir
8
9
import streamsx .spl .op
9
10
import streamsx .spl .types
10
11
from streamsx .topology .schema import CommonSchema , StreamSchema
11
12
from streamsx .spl .types import rstring
12
13
from urllib .parse import urlparse
14
+ from streamsx .toolkits import download_toolkit
13
15
16
+ _TOOLKIT_NAME = 'com.ibm.streamsx.hdfs'
14
17
15
18
FileInfoSchema = StreamSchema ('tuple<rstring fileName, uint64 fileSize>' )
16
19
"""Structured schema of the file write response tuple. This schema is the output schema of the write method.
17
20
18
21
``'tuple<rstring fileName, uint64 fileSize>'``
19
22
"""
20
23
24
+
25
def _add_toolkit_dependency(topo, version):
    """Declare a dependency of *topo* on the HDFS toolkit at *version*.

    This matters when the toolkit is not registered explicitly with
    ``streamsx.spl.toolkit.add_toolkit`` — e.g. when a remote build
    service selects the toolkit — so the required version is still pinned.
    """
    streamsx.spl.toolkit.add_toolkit_dependency(topo, _TOOLKIT_NAME, version)
29
+
30
+
21
31
def _read_ae_service_credentials (credentials ):
22
32
hdfs_uri = ""
23
33
user = ""
@@ -29,7 +39,12 @@ def _read_ae_service_credentials(credentials):
29
39
password = credentials .get ('cluster' ).get ('password' )
30
40
hdfs_uri = credentials .get ('cluster' ).get ('service_endpoints' ).get ('webhdfs' )
31
41
else :
32
- raise ValueError (credentials )
42
+ if 'webhdfs' in credentials :
43
+ user = credentials .get ('user' )
44
+ password = credentials .get ('password' )
45
+ hdfs_uri = credentials .get ('webhdfs' )
46
+ else :
47
+ raise ValueError (credentials )
33
48
else :
34
49
raise TypeError (credentials )
35
50
# construct expected format for hdfs_uri: webhdfs://host:port
@@ -48,6 +63,88 @@ def _check_time_param(time_value, parameter_name):
48
63
raise ValueError ("Invalid " + parameter_name + " value. Value must be at least one second." )
49
64
return result
50
65
66
def configure_connection(instance, name='hdfs', credentials=None):
    """Configures IBM Streams for a certain connection.

    Creates a new application configuration object holding the connection
    properties, or updates the existing one of the same name.

    Example for creating a configuration for a Streams instance with
    connection details::

        from streamsx.rest import Instance
        import streamsx.topology.context
        from icpd_core import icpd_util
        import streamsx.hdfs as hdfs

        cfg = icpd_util.get_service_instance_details(name='your-streams-instance')
        cfg[context.ConfigParams.SSL_VERIFY] = False
        instance = Instance.of_service(cfg)
        app_cfg = hdfs.configure_connection(instance, credentials='my_credentials_json')

    Args:
        instance(streamsx.rest_primitives.Instance): IBM Streams instance object.
        name(str): Name of the application configuration, default name is 'hdfs'.
        credentials(str|dict): The service credentials, for example Analytics Engine service credentials.
    Returns:
        Name of the application configuration.
    """
    # Credentials are mandatory; fail fast before touching the instance.
    if credentials is None:
        raise TypeError(credentials)

    # A dict is serialized to JSON; a string is assumed to be JSON already
    # and stored verbatim.
    if isinstance(credentials, dict):
        creds_value = json.dumps(credentials)
    else:
        creds_value = credentials
    properties = {'credentials': creds_value}
    description = 'HDFS credentials'

    # Update in place when a configuration of this name already exists,
    # otherwise create a fresh one.
    existing = instance.get_application_configurations(name=name)
    if existing:
        print('update application configuration: ' + name)
        existing[0].update(properties)
    else:
        print('create application configuration: ' + name)
        instance.create_application_configuration(name, properties, description)
    return name
112
+
113
+
114
def download_toolkit(url=None, target_dir=None):
    r"""Downloads the latest HDFS toolkit from GitHub.

    Example for updating the HDFS toolkit for your topology with the latest toolkit from GitHub::

        import streamsx.hdfs as hdfs
        # download HDFS toolkit from GitHub
        hdfs_toolkit_location = hdfs.download_toolkit()
        # add the toolkit to topology
        streamsx.spl.toolkit.add_toolkit(topology, hdfs_toolkit_location)

    Example for updating the topology with a specific version of the HDFS toolkit using a URL::

        import streamsx.hdfs as hdfs
        url500 = 'https://github.com/IBMStreams/streamsx.hdfs/releases/download/v5.0.0/streamx.hdfs.toolkits-5.0.0-20190902-1637.tgz'
        hdfs_toolkit_location = hdfs.download_toolkit(url=url500)
        streamsx.spl.toolkit.add_toolkit(topology, hdfs_toolkit_location)

    Args:
        url(str): Link to toolkit archive (\*.tgz) to be downloaded. Use this parameter to
            download a specific version of the toolkit.
        target_dir(str): the directory where the toolkit is unpacked to. If a relative path is given,
            the path is appended to the system temporary directory, for example to /tmp on Unix/Linux systems.
            If target_dir is ``None`` a location relative to the system temporary directory is chosen.

    Returns:
        str: the location of the downloaded HDFS toolkit

    .. note:: This function requires an outgoing Internet connection
    .. versionadded:: 1.1
    """
    # Fully-qualified call on purpose: this function shadows the
    # download_toolkit name imported from streamsx.toolkits.
    return streamsx.toolkits.download_toolkit(
        toolkit_name=_TOOLKIT_NAME, url=url, target_dir=target_dir)
147
+
51
148
52
149
def scan (topology , credentials , directory , pattern = None , init_delay = None , schema = CommonSchema .String , name = None ):
53
150
"""Scans a Hadoop Distributed File System directory for new or modified files.
0 commit comments