diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..da94914 --- /dev/null +++ b/.gitignore @@ -0,0 +1,74 @@ +#Binario +# +package/files/flink-bin-1.8.1/flink.tar.gz +# Directories # +/build/ +/bin/ +**/bin/ +target/ +# OS Files # +.DS_Store +*.class +# Package Files # +*.war +*.ear +*.db +###################### +# Windows +###################### +# Windows image file caches +Thumbs.db +# Folder config file +Desktop.ini +###################### +# OSX +###################### +.DS_Store +.svn +# Thumbnails +._* +# Files that might appear on external disk +.Spotlight-V100 +.Trashes +###################### +# Eclipse +###################### +.project +.metadata +.classpath +.settings/ +.loadpath +*.pydevproject +bin/** +tmp/** +tmp/**/* +*.tmp +*.bak +*.swp +*~.nib +local.properties +/src/main/resources/rebel.xml +# External tool builders +.externalToolBuilders/ +# Locally stored "Eclipse launch configurations" +*.launch +# CDT-specific +.cproject +.project +# PDT-specific +.buildpath +#liferay sdk +build-common-theme.xml +build-common-portlet.xml +create.bat +create.sh +merge.bat +/build.xml +readme.txt +*/META-INF/* +*build.properties* +###################### +# IntelliJ +###################### +.idea +*.iml diff --git a/README.md b/README.md new file mode 100644 index 0000000..e50057e --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +## Flink ambari service + +### Install flink service diff --git a/configuration/flink-env.xml b/configuration/flink-env.xml new file mode 100755 index 0000000..d29e42f --- /dev/null +++ b/configuration/flink-env.xml @@ -0,0 +1,88 @@ + + + + + + flink_user + Flink User + flink + USER + + + user + false + + + cluster-env + user_group + + + + + + + flink_log_dir + Flink Log directory + /var/log/flink + Flink log directory where the daemon writes + + directory + + + + + flink_pid_dir + Flink PID directory + /var/run/flink + + + directory + + + + + flink_principal_name + Flink principal name + KERBEROS_PRINCIPAL + flink/_HOST + + + + flink_keytab + Flink keytab path + /etc/security/keytabs/flink.service.keytab + + + + hadoop_conf_dir + Haddop conf path + /etc/hadoop/conf + Haddop conf path (where is located core-site etc..) + + + directory + + + + + has_metric_collector + false + If true, Flink report metrics to ambari collector, by + default false. + + boolean + + + + + diff --git a/configuration/flink-site.xml b/configuration/flink-site.xml new file mode 100755 index 0000000..1884fa3 --- /dev/null +++ b/configuration/flink-site.xml @@ -0,0 +1,238 @@ + + + + + + + taskmanager.tmp.dirs: /tmp + Flink task manager temporary directory + /tmp + A directory on the local filesystem used by Flink + taskmanager as + temporary directory. + + + directory + + + + + jobmanager.rpc.port + 6123 + The port Flink to start the Job Manager service + + + int + + + + + parallelism.default + 1 + The defautl parallelism for running jobs + + int + + + + + + + flink_numcontainers + 1 + Number of YARN container to allocate (=Number of Task + Managers) + + + int + + + + + + flink_numberoftaskslots + 1 + Number of task slots in each container + + int + + + + + + flink_appname + flinkapp-from-ambari + Flink application name + + + + + flink_queue + default + YARN queue to schedule Flink job on + + + + + flink_streaming + false + If true, Flink will be started in streaming mode: to be + used when only streaming jobs will be executed on Flink + + + boolean + + + + + + flink_jobmanager_memory + 768 + Memory for JobManager Container [in MB]. 
Must be at least + 768 + + + + + + flink_container_memory + 1024 + Memory per TaskManager Container [in MB] + + + + + + content + flink-conf.yaml template + This is the jinja template for flink-conf.yaml file + + + #============================================================================== + # Common + #============================================================================== + + jobmanager.rpc.address: localhost + + #jobmanager.rpc.port: 6123 + + jobmanager.heap.mb: 256 + + taskmanager.heap.mb: 512 + + taskmanager.numberOfTaskSlots: 1 + + #parallelism.default: 1 + + #============================================================================== + # Web Frontend + #============================================================================== + # The port under which the web-based runtime monitor listens. + # A + value of -1 deactivates the web server. + jobmanager.web.port: 8081 + + # The + port uder which the standalone web client + # (for job upload and + submit) listens. + + webclient.port: 8080 + + #============================================================================== + # Streaming state checkpointing + #============================================================================== + + # The backend that will be used to store operator state checkpoints + if + # checkpointing is enabled. + # + # Supported backends: jobmanager, + filesystem + + state.backend: jobmanager + + # Directory for storing + checkpoints in a flink supported filesystem + # Note: State backend must + be accessible from the JobManager, use + file:// + # only for local setups. + # + # state.backend.fs.checkpointdir: hdfs://checkpoints + + #============================================================================== + # Advanced + #============================================================================== + # The number of buffers for the network stack. + # + # + taskmanager.network.numberOfBuffers: 2048 + + # Directories for temporary + files. + # + # Add a delimited list for multiple directories, using the + system + directory + # delimiter (colon ':' on unix) or a comma, e.g.: + # + /data1/tmp:/data2/tmp:/data3/tmp + # + # Note: Each directory entry is read + from and written to by a + different I/O + # thread. You can include the + same directory multiple times in order + to create + # multiple I/O threads + against that directory. This is for example + relevant for + # + high-throughput RAIDs. + # + # If not specified, the system-specific Java + temporary directory + (java.io.tmpdir + # property) is taken. + # + # + taskmanager.tmp.dirs: /tmp + + # Path to the Hadoop configuration + directory. + # + # This configuration is used when writing into HDFS. + Unless specified + otherwise, + # HDFS file creation will use HDFS default + settings with respect to + block-size, + # replication factor, etc. + # + # You + can also directly specify the paths to hdfs-default.xml and + hdfs-site.xml + # via keys 'fs.hdfs.hdfsdefault' and 'fs.hdfs.hdfssite'. 
+ # + # fs.hdfs.hadoopconf: /path/to/hadoop/conf/ + env.java.home: /usr/jdk64/jdk1.8.0_77/jre + taskmanager.memory.fraction: 0.6 + env.java.opts: -XX:+UseG1GC + + + content + + + + diff --git a/kerberos.json b/kerberos.json new file mode 100755 index 0000000..b19855b --- /dev/null +++ b/kerberos.json @@ -0,0 +1,17 @@ +{ + "services": [ + { + "name": "FLINK", + "identities": [ + { + "name": "/smokeuser" + } + ], + "components": [ + { + "name": "FLINK_MASTER" + } + ] + } + ] +} diff --git a/metainfo.xml b/metainfo.xml new file mode 100755 index 0000000..bc2b154 --- /dev/null +++ b/metainfo.xml @@ -0,0 +1,61 @@ + + + + + 2.0 + + + FLINK + Flink + Apache Hadoop Flink Stream processing framework (ambari package powerd by zylk) + 1.8.1 + + + FLINK_MASTER + Flink + MASTER + 1 + true + + + PYTHON + 1200 + + + + flink + true + + + + + + flink-site + flink-env + + + + + \ No newline at end of file diff --git a/package/.hash b/package/.hash new file mode 100755 index 0000000..2a7fb4d --- /dev/null +++ b/package/.hash @@ -0,0 +1 @@ +080aef8be92cc5135a66af018c33b211c3d6f7a4 \ No newline at end of file diff --git a/package/archive.zip b/package/archive.zip new file mode 100755 index 0000000..51de140 Binary files /dev/null and b/package/archive.zip differ diff --git a/package/files/wordCount.jar b/package/files/wordCount.jar new file mode 100755 index 0000000..aed64be Binary files /dev/null and b/package/files/wordCount.jar differ diff --git a/package/scripts/flink.py b/package/scripts/flink.py new file mode 100755 index 0000000..9fff86e --- /dev/null +++ b/package/scripts/flink.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
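+
+flink.py: writes the Flink configuration (flink.yaml) rendered from the flink-site
+properties, creates the log and PID directories, and, when metrics reporting or
+Kerberos is enabled, renders the metrics2 properties and JAAS files under the Flink
+conf directory.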
+ +""" + +from resource_management.core.exceptions import Fail +from resource_management.core.resources.service import ServiceConfig +from resource_management.core.resources.system import Directory, Execute, File, Link +from resource_management.core.source import Template, InlineTemplate +from resource_management.libraries.resources.template_config import TemplateConfig +from resource_management.libraries.functions.default import default +from resource_management.libraries.functions.format import format +from resource_management.libraries.script.script import Script +from resource_management.core.source import Template +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions import StackFeature +from flink_yaml_utils import yaml_config_template +from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl +from ambari_commons import OSConst +from resource_management.libraries.functions.generate_logfeeder_input_config import generate_logfeeder_input_config +from resource_management.libraries.functions.setup_atlas_hook import has_atlas_in_cluster, setup_atlas_hook, setup_atlas_jar_symlinks +from ambari_commons.constants import SERVICE + + +@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT) +def flink(name=None): + import params + import os + + Directory(params.log_dir, + owner=params.flink_user, + group=params.user_group, + mode=0777, + create_parents = True, + cd_access="a", + ) + + Directory([params.pid_dir], + owner=params.flink_user, + group=params.user_group, + create_parents = True, + cd_access="a", + mode=0755, + ) + + + configurations = params.config['configurations']['flink-site'] + + File(format("{conf_dir}/flink.yaml"), + content=yaml_config_template(configurations), + owner=params.flink_user, + group=params.user_group + ) + + generate_logfeeder_input_config('flink', Template("input.config-flink.json.j2", extra_imports=[default])) + + if params.has_metric_collector: + File(format("{conf_dir}/flink-metrics2.properties"), + owner=params.flink_user, + group=params.user_group, + content=Template("flink-metrics2.properties.j2") + ) + + if params.security_enabled: + TemplateConfig(format("{conf_dir}/flink_jaas.conf"), + owner=params.flink_user, + mode=0644 + ) + if params.stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.stack_version_formatted): + TemplateConfig(format("{conf_dir}/client_jaas.conf"), + owner=params.flink_user, + mode=0644 + ) + else: + File( + format("{conf_dir}/flink_jaas.conf"), + action="delete" + ) + File( + format("{conf_dir}/client_jaas.conf"), + action="delete" + ) + + +def _find_real_user_min_uid(): + """ + Finds minimal real user UID + """ + with open('/etc/login.defs') as f: + for line in f: + if line.strip().startswith('UID_MIN') and len(line.split()) == 2 and line.split()[1].isdigit(): + return int(line.split()[1]) + raise Fail("Unable to find UID_MIN in file /etc/login.defs. Expecting format e.g.: 'UID_MIN 500'") diff --git a/package/scripts/flink_server.py b/package/scripts/flink_server.py new file mode 100755 index 0000000..e3ed9d9 --- /dev/null +++ b/package/scripts/flink_server.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" + +import sys, os, pwd, grp, signal, time, glob, subprocess +from resource_management.libraries.functions import check_process_status +from resource_management.core.logger import Logger +from resource_management.libraries.script import Script +from resource_management.libraries.functions import stack_select +from resource_management.libraries.functions import format +from resource_management.core.resources.system import Execute +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions import StackFeature +from resource_management.core.resources.system import Directory, Execute, File, Link +from flink import flink +from service import service +from service_check import ServiceCheck +from resource_management.libraries.functions.security_commons import build_expectations, \ + cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \ + FILE_TYPE_JAAS_CONF + +class FlinkServer(Script): + + def install(self, env): + Logger.info('********************************') + Logger.info('* Installing Flink on the node *') + Logger.info('********************************') + import status_params + import params + env.set_params(status_params) + env.set_params(params) + Logger.info('*****status_params********************pid_dir: '+status_params.pid_dir) + Logger.info('*****params**********flink_component_home_dir: '+params.flink_component_home_dir) + Logger.info('*****params***************************log_dir: '+params.log_dir) + Logger.info('*****params************************flink_user: '+params.flink_user) + Logger.info('*****params************************user_group: '+params.user_group) + + service_packagedir = os.path.realpath(__file__).split('/scripts')[0] + Logger.info('***************************service_packagedir: '+service_packagedir) + + cmd = 'tar -xvzf '+service_packagedir+'/files/flink-bin-1.8.1/flink.tar.gz --strip-components=1 -C /opt/flink' + Logger.info('******************************************cmd: '+cmd) + Directory([status_params.pid_dir, params.log_dir, params.flink_component_home_dir], + owner=params.flink_user, + group=params.user_group + ) + Execute(cmd, user=params.flink_user) + self.configure(env) + + def configure(self, env): + import params + env.set_params(params) + flink() + + def pre_upgrade_restart(self, env, upgrade_type=None): + import params + env.set_params(params) + if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version): + stack_select.select_packages(params.version) + + def start(self, env, upgrade_type=None): + import params + env.set_params(params) + self.configure(env) + service("flink", action="start") + + def stop(self, env, upgrade_type=None): + import params + env.set_params(params) + service("flink", action="stop") + + def status(self, env): + import status_params + env.set_params(status_params) + check_process_status(status_params.pid_file) + + def get_log_folder(self): 
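+    """Return the Flink log directory configured in flink-env (flink_log_dir)."""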
+ import params + return params.log_dir + + def get_user(self): + import params + return params.flink_user + + def get_pid_files(self): + import status_params + return [status_params.pid_file] + +if __name__ == "__main__": + FlinkServer().execute() diff --git a/package/scripts/flink_upgrade.py b/package/scripts/flink_upgrade.py new file mode 100755 index 0000000..279eb88 --- /dev/null +++ b/package/scripts/flink_upgrade.py @@ -0,0 +1,49 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set. +import os + +from ambari_commons import yaml_utils +from resource_management.core.logger import Logger +from resource_management.core.exceptions import Fail +from resource_management.core.resources.system import Directory +from resource_management.core.resources.system import File +from resource_management.core.resources.system import Execute +from resource_management.libraries.script.script import Script +from resource_management.libraries.functions.default import default +from resource_management.libraries.functions.format import format + +class FlinkUpgrade(Script): + """ + Applies to Rolling/Express Upgrade from HDP 2.1 or 2.2 to 2.3 or higher. + + Requirements: Needs to run from a host with ZooKeeper Client. + + This class helps perform some of the upgrade tasks needed for Flink during + a Rolling or Express upgrade. Flink writes data to disk locally and to ZooKeeper. + If any HDP 2.1 or 2.2 bits exist in these directories when an HDP 2.3 instance + starts up, it will fail to start properly. Because the upgrade framework in + Ambari doesn't yet have a mechanism to say "stop all" before starting to + upgrade each component, we need to rely on a Flink trick to bring down + running daemons. By removing the ZooKeeper data with running daemons, those + daemons will die. + """ + +if __name__ == "__main__": + FlinkUpgrade().execute() \ No newline at end of file diff --git a/package/scripts/flink_yaml_utils.py b/package/scripts/flink_yaml_utils.py new file mode 100755 index 0000000..8136511 --- /dev/null +++ b/package/scripts/flink_yaml_utils.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +import os +import resource_management + +from ambari_commons.yaml_utils import escape_yaml_property +from resource_management.core.source import InlineTemplate +from resource_management.core.resources.system import File + +def replace_jaas_placeholder(name, security_enabled, conf_dir): + if name.find('_JAAS_PLACEHOLDER') > -1: + if security_enabled: + if name.find('Nimbus_JVM') > -1: + return name.replace('_JAAS_PLACEHOLDER', '-Djava.security.auth.login.config=' + conf_dir + '/flink_jaas.conf -Djavax.security.auth.useSubjectCredsOnly=false') + else: + return name.replace('_JAAS_PLACEHOLDER', '-Djava.security.auth.login.config=' + conf_dir + '/flink_jaas.conf') + else: + return name.replace('_JAAS_PLACEHOLDER', '') + else: + return name + +flink_yaml_template = """{% for key, value in configurations|dictsort if not key.startswith('_') %}{{key}} : {{ escape_yaml_property(replace_jaas_placeholder(resource_management.core.source.InlineTemplate(value).get_content().strip(), security_enabled, conf_dir)) }} +{% endfor %}""" + +def yaml_config_template(configurations): + return InlineTemplate(flink_yaml_template, configurations=configurations, + extra_imports=[escape_yaml_property, replace_jaas_placeholder, resource_management, + resource_management.core, resource_management.core.source]) diff --git a/package/scripts/params.py b/package/scripts/params.py new file mode 100755 index 0000000..21ba689 --- /dev/null +++ b/package/scripts/params.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +from ambari_commons import OSCheck +from resource_management.libraries.functions.default import default + +#if OSCheck.is_windows_family(): +# from params_windows import * +#else: +# from params_linux import * +from params_linux import * + +retryAble = default("/commandParams/command_retry_enabled", False) diff --git a/package/scripts/params_linux.py b/package/scripts/params_linux.py new file mode 100755 index 0000000..da9a85c --- /dev/null +++ b/package/scripts/params_linux.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +import os +import re +import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set. + +import status_params + +from ambari_commons.constants import AMBARI_SUDO_BINARY +from ambari_commons import yaml_utils +from resource_management.libraries.functions import format +from resource_management.libraries.functions.default import default +from resource_management.libraries.functions.get_bare_principal import get_bare_principal +from resource_management.libraries.script import Script +from resource_management.libraries.resources.hdfs_resource import HdfsResource +from resource_management.libraries.functions import stack_select +from resource_management.libraries.functions import conf_select +from resource_management.libraries.functions import get_kinit_path +from resource_management.libraries.functions.get_not_managed_resources import get_not_managed_resources +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions.stack_features import get_stack_feature_version +from resource_management.libraries.functions import StackFeature +from resource_management.libraries.functions.expect import expect +from resource_management.libraries.functions.setup_atlas_hook import has_atlas_in_cluster +from resource_management.libraries.functions import is_empty +from ambari_commons.ambari_metrics_helper import select_metric_collector_hosts_from_hostnames +from resource_management.libraries.functions.setup_ranger_plugin_xml import get_audit_configs, generate_ranger_service_config + +# server configurations +config = Script.get_config() +tmp_dir = Script.get_tmp_dir() +stack_root = status_params.stack_root +sudo = AMBARI_SUDO_BINARY + +limits_conf_dir = "/etc/security/limits.d" + +# Needed since this is an Atlas Hook service. 
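+# (The Atlas-hook remark above appears to be carried over from the Storm service scripts;
+# nothing in these Flink scripts actually registers an Atlas hook.)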
+cluster_name = config['clusterName'] + +stack_name = status_params.stack_name +upgrade_direction = default("/commandParams/upgrade_direction", None) +version = default("/commandParams/version", None) + +agent_stack_retry_on_unavailability = config['ambariLevelParams']['agent_stack_retry_on_unavailability'] +agent_stack_retry_count = expect("/ambariLevelParams/agent_stack_retry_count", int) + +flink_component_home_dir = status_params.flink_component_home_dir +conf_dir = status_params.conf_dir + +stack_version_unformatted = status_params.stack_version_unformatted +stack_version_formatted = status_params.stack_version_formatted +# get the correct version to use for checking stack features +version_for_stack_feature_checks = get_stack_feature_version(config) + +flink_user = config['configurations']['flink-env']['flink_user'] +log_dir = config['configurations']['flink-env']['flink_log_dir'] +pid_dir = status_params.pid_dir +user_group = config['configurations']['cluster-env']['user_group'] +java64_home = config['ambariLevelParams']['java_home'] +jps_binary = format("{java64_home}/bin/jps") +security_enabled = config['configurations']['cluster-env']['security_enabled'] + + +if security_enabled: + _hostname_lowercase = config['agentLevelParams']['hostname'].lower() + _flink_principal_name = config['configurations']['flink-env']['flink_principal_name'] + flink_jaas_principal = _flink_principal_name.replace('_HOST',_hostname_lowercase) + _ambari_principal_name = default('/configurations/cluster-env/ambari_principal_name', None) + flink_keytab_path = config['configurations']['flink-env']['flink_keytab'] + + if _ambari_principal_name: + ambari_bare_jaas_principal = get_bare_principal(_ambari_principal_name) + +jdk_location = config['ambariLevelParams']['jdk_location'] +namenode_hosts = default("/clusterHostInfo/namenode_hosts", []) +has_namenode = not len(namenode_hosts) == 0 + +hdfs_user = config['configurations']['hadoop-env']['hdfs_user'] if has_namenode else None +hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab'] if has_namenode else None +hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_name'] if has_namenode else None +hdfs_site = config['configurations']['hdfs-site'] if has_namenode else None +default_fs = config['configurations']['core-site']['fs.defaultFS'] if has_namenode else None +hadoop_bin_dir = stack_select.get_hadoop_dir("bin") if has_namenode else None +hadoop_conf_dir = conf_select.get_hadoop_conf_dir() if has_namenode else None +kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None)) +dfs_type = default("/clusterLevelParams/dfs_type", "") + + +# params from flink-ambari-config +flink_install_dir = "/opt/flink" +flink_bin_dir = "/opt/flink/bin" +flink_numcontainers = config['configurations']['flink-site']['flink_numcontainers'] +flink_numberoftaskslots= config['configurations']['flink-site']['flink_numberoftaskslots'] +flink_jobmanager_memory = config['configurations']['flink-site']['flink_jobmanager_memory'] +flink_container_memory = config['configurations']['flink-site']['flink_container_memory'] +flink_appname = config['configurations']['flink-site']['flink_appname'] +flink_queue = config['configurations']['flink-site']['flink_queue'] +flink_streaming = config['configurations']['flink-site']['flink_streaming'] +hadoop_conf_dir = config['configurations']['flink-env']['hadoop_conf_dir'] +has_metric_collector = config['configurations']['flink-env']['has_metric_collector'] + + 
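+# A minimal sketch (assumption, not part of the original scripts): flink-metrics2.properties.j2
+# references {{ams_collector_hosts}}, {{metric_collector_protocol}}, {{metric_collector_port}},
+# {{zookeeper_quorum}} and {{metrics_report_interval}}, none of which are defined in this file,
+# so rendering that template with has_metric_collector set would fail. Values along these lines
+# would be needed; the clusterHostInfo keys and ams-site property names below are borrowed from
+# other Ambari service scripts and may need adjusting for the target stack.
+#
+# if has_metric_collector:
+#   ams_collector_hosts = ",".join(default("/clusterHostInfo/metrics_collector_hosts", []))
+#   metric_collector_web_address = default("/configurations/ams-site/timeline.metrics.service.webapp.address", "localhost:6188")
+#   metric_collector_port = metric_collector_web_address.split(":")[-1]
+#   metric_collector_protocol = "https" if default("/configurations/ams-site/timeline.metrics.service.http.policy", "HTTP_ONLY") == "HTTPS_ONLY" else "http"
+#   metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
+#   zookeeper_quorum = ",".join(default("/clusterHostInfo/zookeeper_server_hosts", []))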
+import functools +#create partial functions with common arguments for every HdfsResource call +#to create/delete hdfs directory/file/copyfromlocal we need to call params.HdfsResource in code +#HdfsResource = functools.partial( +# HdfsResource, +# user=hdfs_user, +# hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore", +# security_enabled = security_enabled, +# keytab = hdfs_user_keytab, +# kinit_path_local = kinit_path_local, +# hadoop_bin_dir = hadoop_bin_dir, +# hadoop_conf_dir = hadoop_conf_dir, +# principal_name = hdfs_principal_name, +# hdfs_site = hdfs_site, +# default_fs = default_fs, +# immutable_paths = get_not_managed_resources(), +# dfs_type = dfs_type, +#) diff --git a/package/scripts/service.py b/package/scripts/service.py new file mode 100755 index 0000000..a194075 --- /dev/null +++ b/package/scripts/service.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" + +import sys +import os +import pwd +import grp +import signal +import time +import glob +import subprocess + +from resource_management.core.resources import Execute +from resource_management.core.resources import File +from resource_management.core.shell import as_user +from resource_management.core import shell +from resource_management.core.logger import Logger +from resource_management.libraries.functions.format import format +from resource_management.libraries.functions import get_user_call_output +from resource_management.libraries.functions.show_logs import show_logs +import time + + +def service(name, action='start'): + import params + import status_params + + pid_file = status_params.pid_files[name] + no_op_test = as_user(format( + "ls {pid_file} >/dev/null 2>&1 && ps -p `cat {pid_file}` >/dev/null 2>&1"), user=params.flink_user) + + flink_env = format( + "export PATH=$JAVA_HOME/bin:$PATH") + + if action == "start": + # get the hadoop classpath from command hadoop classpath, from the plugin https://github.com/abajwa-hw/ambari-flink-service/ + cmd_open = subprocess.Popen( + ["hadoop", "classpath"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + hadoop_classpath = cmd_open.communicate()[0].strip() + cmd = format("export HADOOP_CONF_DIR={hadoop_conf_dir}; export HADOOP_CLASSPATH={hadoop_classpath}; {flink_bin_dir}/yarn-session.sh -n {flink_numcontainers} -s {flink_numberoftaskslots} -jm {flink_jobmanager_memory} -tm {flink_container_memory} -qu {flink_queue} -nm {flink_appname} -d") + cmd = format("{cmd} &\n echo $! > {pid_file}") + Execute(cmd, not_if=no_op_test, user=params.flink_user, + path=params.flink_bin_dir) + File(pid_file, owner = params.flink_user, group = params.user_group) + + elif action == "stop": + process_dont_exist = format("! 
({no_op_test})") + if os.path.exists(pid_file): + pid = get_user_call_output.get_user_call_output(format("! test -f {pid_file} || cat {pid_file}"), user=params.flink_user)[1] + # if multiple processes are running (for example user can start logviewer from console) + # there can be more than one id + pid = pid.replace("\n", " ") + Execute(format("{sudo} kill {pid}"), not_if = process_dont_exist) + Execute(format("{sudo} kill -9 {pid}"), + not_if = format("sleep 2; {process_dont_exist} || sleep 20; {process_dont_exist}"),ignore_failures = True) + File(pid_file, action = "delete") diff --git a/package/scripts/service_check.py b/package/scripts/service_check.py new file mode 100755 index 0000000..66f4f60 --- /dev/null +++ b/package/scripts/service_check.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" + +import os + +from resource_management.libraries.functions.format import format +from resource_management.libraries.functions import get_unique_id_and_date +from resource_management.core.resources import File +from resource_management.core.resources import Execute +from resource_management.libraries.script import Script +from resource_management.core.source import StaticFile +from ambari_commons import OSCheck, OSConst +from ambari_commons.os_family_impl import OsFamilyImpl + +class ServiceCheck(Script): + pass + + +@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY) +class ServiceCheckWindows(ServiceCheck): + def service_check(self, env): + import params + env.set_params(params) + smoke_cmd = os.path.join(params.stack_root,"Run-SmokeTests.cmd") + service = "FLINK" + Execute(format("cmd /C {smoke_cmd} {service}", smoke_cmd=smoke_cmd, service=service), user=params.flink_user, logoutput=True) + + +@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT) +class ServiceCheckDefault(ServiceCheck): + def service_check(self, env): + import params + env.set_params(params) + + unique = get_unique_id_and_date() + + #File("/tmp/wordCount.jar", + # content=StaticFile("wordCount.jar"), + # owner=params.flink_user + #) + + cmd = "" + #cmd = format("flink jar /tmp/wordCount.jar flink.starter.WordCountTopology WordCount{unique} -c nimbus.host={nimbus_host}") + + #Execute(cmd, + # logoutput=True, + # path=params.flink_bin_dir, + # user=params.flink_user + #) + + #Execute(format("flink kill WordCount{unique}"), + # path=params.flink_bin_dir, + # user=params.flink_user + #) + +if __name__ == "__main__": + ServiceCheck().execute() diff --git a/package/scripts/status_params.py b/package/scripts/status_params.py new file mode 100755 index 0000000..80fbb48 --- /dev/null +++ b/package/scripts/status_params.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +from resource_management.libraries.script import Script +from resource_management.libraries.functions import get_kinit_path +from resource_management.libraries.functions import default, format +from resource_management.libraries.functions.version import format_stack_version +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions import StackFeature +from ambari_commons import OSCheck + +# a map of the Ambari role to the component name +# for use with /current/ +SERVER_ROLE_DIRECTORY_MAP = { + 'FLINK' : 'flink' +} + +#component_directory = Script.get_component_from_role(SERVER_ROLE_DIRECTORY_MAP, "FLINK_SERVICE") + +config = Script.get_config() +stack_root = Script.get_stack_root() +stack_version_unformatted = str(config['clusterLevelParams']['stack_version']) +stack_version_formatted = format_stack_version(stack_version_unformatted) + +pid_dir = config['configurations']['flink-env']['flink_pid_dir'] +pid_file = format("{pid_dir}/flink.pid") + +pid_files = { + "flink":pid_file +} + +# Security related/required params +hostname = config['agentLevelParams']['hostname'] +security_enabled = config['configurations']['cluster-env']['security_enabled'] +kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None)) +tmp_dir = Script.get_tmp_dir() + +flink_component_home_dir = "/opt/flink" +conf_dir = "/opt/flink/conf" + +flink_user = config['configurations']['flink-env']['flink_user'] +flink_ui_principal = default('/configurations/flink-env/flink_principal_name', None) +flink_ui_keytab = default('/configurations/flink-env/flink_keytab', None) + +stack_name = default("/clusterLevelParams/stack_name", None) \ No newline at end of file diff --git a/package/templates/client_jaas.conf.j2 b/package/templates/client_jaas.conf.j2 new file mode 100755 index 0000000..268982a --- /dev/null +++ b/package/templates/client_jaas.conf.j2 @@ -0,0 +1,24 @@ +{# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
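+#
+# client_jaas.conf is rendered by flink.py only when security is enabled on a stack that
+# supports rolling upgrade; FlinkClient authenticates from the user's ticket cache rather
+# than a keytab.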
+#} + +FlinkClient { + com.sun.security.auth.module.Krb5LoginModule required + useTicketCache=true + renewTicket=true + serviceName="{{flink_jaas_principal}}"; +}; diff --git a/package/templates/flink-metrics2.properties.j2 b/package/templates/flink-metrics2.properties.j2 new file mode 100755 index 0000000..79d0b89 --- /dev/null +++ b/package/templates/flink-metrics2.properties.j2 @@ -0,0 +1,35 @@ +{# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#} + +collector.hosts={{ams_collector_hosts}} +protocol={{metric_collector_protocol}} +port={{metric_collector_port}} +zookeeper.quorum={{zookeeper_quorum}} +maxRowCacheSize=10000 +sendInterval={{metrics_report_interval}}000 +clusterReporterAppId=nimbus +host_in_memory_aggregation = {{host_in_memory_aggregation}} +host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}} +{% if is_aggregation_https_enabled %} +host_in_memory_aggregation_protocol = {{host_in_memory_aggregation_protocol}} +{% endif %} + +# HTTPS properties +truststore.path = {{metric_truststore_path}} +truststore.type = {{metric_truststore_type}} +truststore.password = {{metric_truststore_password}} diff --git a/package/templates/flink_jaas.conf.j2 b/package/templates/flink_jaas.conf.j2 new file mode 100755 index 0000000..073b084 --- /dev/null +++ b/package/templates/flink_jaas.conf.j2 @@ -0,0 +1,45 @@ +{# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
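+#
+# Rendered by flink.py only when security is enabled; the login modules use the service
+# keytab and principal taken from flink-env (flink_keytab / flink_principal_name).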
+#} + +FlinkServer { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + keyTab="{{flink_keytab_path}}" + storeKey=true + useTicketCache=false + principal="{{flink_jaas_principal}}"; +}; +FlinkClient { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + keyTab="{{flink_keytab_path}}" + storeKey=true + useTicketCache=false + serviceName="FlinkServer" + principal="{{flink_jaas_principal}}"; +}; +com.sun.security.jgss.krb5.initiate { + com.sun.security.auth.module.Krb5LoginModule required + renewTGT=false + doNotPrompt=true + useKeyTab=true + keyTab="{{flink_keytab_path}}" + principal="{{flink_jaas_principal}}" + storeKey=true + useTicketCache=false; +}; diff --git a/package/templates/input.config-flink.json.j2 b/package/templates/input.config-flink.json.j2 new file mode 100755 index 0000000..3f69a31 --- /dev/null +++ b/package/templates/input.config-flink.json.j2 @@ -0,0 +1,49 @@ +{# + # Licensed to the Apache Software Foundation (ASF) under one + # or more contributor license agreements. See the NOTICE file + # distributed with this work for additional information + # regarding copyright ownership. The ASF licenses this file + # to you under the Apache License, Version 2.0 (the + # "License"); you may not use this file except in compliance + # with the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + #} +{ + "input":[ + { + "type":"flink_server", + "rowtype":"service", + "path":"{{default('/configurations/flink-env/flink_log_dir', '/var/log/flink')}}/flink.log" + } + ], + "filter":[ + { + "filter":"grok", + "sort_order":1, + "conditions":{ + "fields":{ + "type":[ + "flink_server" + ] + } + }, + "log4j_format":"", + "multiline_pattern":"^(%{TIMESTAMP_ISO8601:logtime})", + "message_pattern":"(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{JAVACLASS:logger_name}\\s%{GREEDYDATA:thread_name}\\s\\[%{LOGLEVEL:level}\\]\\s%{GREEDYDATA:log_message}", + "post_map_values":{ + "logtime":{ + "map_date":{ + "target_date_pattern":"yyyy-MM-dd HH:mm:ss.SSS" + } + } + } + } + ] +} diff --git a/quicklinks/quicklinks.json b/quicklinks/quicklinks.json new file mode 100755 index 0000000..d45f337 --- /dev/null +++ b/quicklinks/quicklinks.json @@ -0,0 +1,45 @@ +{ + "name": "default", + "description": "default quick links configuration", + "configuration": { + "protocol": + { + "type":"https", + "checks":[ + { + "property":"ui.https.keystore.path", + "desired":"EXIST", + "site":"storm-site" + }, + { + "property":"ui.https.key.password", + "desired":"EXIST", + "site":"storm-site" + }, + { + "property":"ui.https.port", + "desired":"EXIST", + "site":"storm-site" + } + ] + }, + + "links": [ + { + "name": "storm_ui", + "label": "Storm UI", + "requires_user_name": "false", + "component_name": "STORM_UI_SERVER", + "url":"%@://%@:%@/", + "port":{ + "http_property": "ui.port", + "http_default_port": "8744", + "https_property": "ui.https.port", + "https_default_port": "8740", + "regex": "^(\\d+)$", + "site": "storm-site" + } + } + ] + } +} diff --git a/role_command_order.json b/role_command_order.json new file mode 100755 index 0000000..7f2bb2e --- /dev/null +++ b/role_command_order.json @@ -0,0 +1,6 @@ +{ + 
"general_deps" : { + "_comment" : "dependencies for FLINK", + "FLINK-START" : ["YARN-START"] + } +} diff --git a/service_advisor.notpy b/service_advisor.notpy new file mode 100755 index 0000000..cf696c8 --- /dev/null +++ b/service_advisor.notpy @@ -0,0 +1,449 @@ +#!/usr/bin/env ambari-python-wrap +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +# Python imports +import imp +import os +import traceback +import re +import socket +import fnmatch + +from resource_management.libraries.functions.get_bare_principal import get_bare_principal +from resource_management.core.logger import Logger + +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +STACKS_DIR = os.path.join(SCRIPT_DIR, '../../../../../stacks/') +PARENT_FILE = os.path.join(STACKS_DIR, 'service_advisor.py') + +try: + if "BASE_SERVICE_ADVISOR" in os.environ: + PARENT_FILE = os.environ["BASE_SERVICE_ADVISOR"] + with open(PARENT_FILE, 'rb') as fp: + service_advisor = imp.load_module('service_advisor', fp, PARENT_FILE, ('.py', 'rb', imp.PY_SOURCE)) +except Exception as e: + traceback.print_exc() + print "Failed to load parent" + +class StormServiceAdvisor(service_advisor.ServiceAdvisor): + + def __init__(self, *args, **kwargs): + self.as_super = super(StormServiceAdvisor, self) + self.as_super.__init__(*args, **kwargs) + + # Always call these methods + self.modifyMastersWithMultipleInstances() + self.modifyCardinalitiesDict() + self.modifyHeapSizeProperties() + self.modifyNotValuableComponents() + self.modifyComponentsNotPreferableOnServer() + self.modifyComponentLayoutSchemes() + + def modifyMastersWithMultipleInstances(self): + """ + Modify the set of masters with multiple instances. + Must be overriden in child class. + """ + # Nothing to do + pass + + def modifyCardinalitiesDict(self): + """ + Modify the dictionary of cardinalities. + Must be overriden in child class. + """ + # Nothing to do + pass + + def modifyHeapSizeProperties(self): + """ + Modify the dictionary of heap size properties. + Must be overriden in child class. + """ + pass + + def modifyNotValuableComponents(self): + """ + Modify the set of components whose host assignment is based on other services. + Must be overriden in child class. + """ + # Nothing to do + pass + + def modifyComponentsNotPreferableOnServer(self): + """ + Modify the set of components that are not preferable on the server. + Must be overriden in child class. + """ + # Nothing to do + pass + + def modifyComponentLayoutSchemes(self): + """ + Modify layout scheme dictionaries for components. + The scheme dictionary basically maps the number of hosts to + host index where component should exist. + Must be overriden in child class. + """ + # Nothing to do + pass + + def getServiceComponentLayoutValidations(self, services, hosts): + """ + Get a list of errors. 
+ Must be overriden in child class. + """ + + return self.getServiceComponentCardinalityValidations(services, hosts, "STORM") + + def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts): + """ + Entry point. + Must be overriden in child class. + """ + #Logger.info("Class: %s, Method: %s. Recommending Service Configurations." % + # (self.__class__.__name__, inspect.stack()[0][3])) + + recommender = StormRecommender() + recommender.recommendStormConfigurationsFromHDP206(configurations, clusterData, services, hosts) + recommender.recommendStormConfigurationsFromHDP21(configurations, clusterData, services, hosts) + recommender.recommendStormConfigurationsFromHDP22(configurations, clusterData, services, hosts) + recommender.recommendStormConfigurationsFromHDP23(configurations, clusterData, services, hosts) + recommender.recommendStormConfigurationsFromHDP26(configurations, clusterData, services, hosts) + recommender.recommendStormConfigurationsFromHDP30(configurations, clusterData, services, hosts) + + def getServiceConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts): + """ + Entry point. + Validate configurations for the service. Return a list of errors. + The code for this function should be the same for each Service Advisor. + """ + #Logger.info("Class: %s, Method: %s. Validating Configurations." % + # (self.__class__.__name__, inspect.stack()[0][3])) + + validator = StormValidator() + # Calls the methods of the validator using arguments, + # method(siteProperties, siteRecommendations, configurations, services, hosts) + return validator.validateListOfConfigUsingMethod(configurations, recommendedDefaults, services, hosts, validator.validators) + + @staticmethod + def isKerberosEnabled(services, configurations): + """ + Determine if Kerberos is enabled for Storm. + + If storm-site/storm.thrift.transport exists and is set to kerberos sasl transport plugin, return True; + otherwise return false. + + The value of this property is first tested in the updated configurations (configurations) then + tested in the current configuration set (services) + + :type services: dict + :param services: the dictionary containing the existing configuration values + :type configurations: dict + :param configurations: the dictionary containing the updated configuration values + :rtype: bool + :return: True or False + """ + if configurations and "storm-site" in configurations and \ + "storm.thrift.transport" in configurations["storm-site"]["properties"]: + return configurations["storm-site"]["properties"]["storm.thrift.transport"] == "org.apache.storm.security.auth.kerberos.KerberosSaslTransportPlugin" + elif services and "storm-site" in services["configurations"] and \ + "storm.thrift.transport" in services["configurations"]["storm-site"]["properties"]: + return services["configurations"]["storm-site"]["properties"]["storm.thrift.transport"] == "org.apache.storm.security.auth.kerberos.KerberosSaslTransportPlugin" + else: + return False + + + +class StormRecommender(service_advisor.ServiceAdvisor): + """ + Storm Recommender suggests properties when adding the service for the first time or modifying configs via the UI. 
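+  (This advisor was copied from the Storm service; it still targets storm-site properties
+  and ships as service_advisor.notpy, presumably so Ambari does not load it for FLINK.)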
+ """ + + def __init__(self, *args, **kwargs): + self.as_super = super(StormRecommender, self) + self.as_super.__init__(*args, **kwargs) + + def appendToYamlString(self, yaml_string, list_classes): + updated_yaml_string = "" + try: + strip_yaml_str = re.sub('[\[\]\']', ' ', yaml_string) + klass_array = [x.strip() for x in strip_yaml_str.split(',')] + if yaml_string: + for klass in list_classes: + klass = klass.strip() + klass_array.append(klass) + klass_set = set(klass_array) + klass_list = [("'" + e + "'") for e in klass_set] + updated_yaml_string = "[" + ",".join(klass_list) + "]" + except Exception: + klass_list = [("'" + e + "'") for e in list_classes] + updated_yaml_string = "[" + ",".join(klass_list) + "]" + return updated_yaml_string + + def recommendStormConfigurationsFromHDP206(self, configurations, clusterData, services, hosts): + putStormSiteProperty = self.putProperty(configurations, "storm-site", services) + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + # Storm AMS integration + if 'AMBARI_METRICS' in servicesList: + putStormSiteProperty('metrics.reporter.register', 'org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter') + + def recommendStormConfigurationsFromHDP21(self, configurations, clusterData, services, hosts): + storm_mounts = [ + ("storm.local.dir", ["NODEMANAGER", "NIMBUS"], "/hadoop/storm", "single") + ] + + self.updateMountProperties("storm-site", storm_mounts, configurations, services, hosts) + + + def recommendStormConfigurationsFromHDP22(self, configurations, clusterData, services, hosts): + putStormSiteProperty = self.putProperty(configurations, "storm-site", services) + putStormSiteAttributes = self.putPropertyAttribute(configurations, "storm-site") + storm_site = self.getServicesSiteProperties(services, "storm-site") + security_enabled = StormServiceAdvisor.isKerberosEnabled(services, configurations) + if "ranger-env" in services["configurations"] and "ranger-storm-plugin-properties" in services["configurations"] and \ + "ranger-storm-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]: + putStormRangerPluginProperty = self.putProperty(configurations, "ranger-storm-plugin-properties", services) + rangerEnvStormPluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-storm-plugin-enabled"] + putStormRangerPluginProperty("ranger-storm-plugin-enabled", rangerEnvStormPluginProperty) + + rangerPluginEnabled = '' + if 'ranger-storm-plugin-properties' in configurations and 'ranger-storm-plugin-enabled' in configurations['ranger-storm-plugin-properties']['properties']: + rangerPluginEnabled = configurations['ranger-storm-plugin-properties']['properties']['ranger-storm-plugin-enabled'] + elif 'ranger-storm-plugin-properties' in services['configurations'] and 'ranger-storm-plugin-enabled' in services['configurations']['ranger-storm-plugin-properties']['properties']: + rangerPluginEnabled = services['configurations']['ranger-storm-plugin-properties']['properties']['ranger-storm-plugin-enabled'] + + nonRangerClass = 'org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer' + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + rangerServiceVersion='' + if 'RANGER' in servicesList: + rangerServiceVersion = [service['StackServices']['service_version'] for service in services["services"] if service['StackServices']['service_name'] == 'RANGER'][0] + + if rangerServiceVersion and rangerServiceVersion == '0.4.0': + 
rangerClass = 'com.xasecure.authorization.storm.authorizer.XaSecureStormAuthorizer' + else: + rangerClass = 'org.apache.ranger.authorization.storm.authorizer.RangerStormAuthorizer' + # Cluster is kerberized + if security_enabled: + if rangerPluginEnabled and (rangerPluginEnabled.lower() == 'Yes'.lower()): + putStormSiteProperty('nimbus.authorizer',rangerClass) + else: + putStormSiteProperty('nimbus.authorizer', nonRangerClass) + else: + putStormSiteAttributes('nimbus.authorizer', 'delete', 'true') + + + def recommendStormConfigurationsFromHDP23(self, configurations, clusterData, services, hosts): + putStormStartupProperty = self.putProperty(configurations, "storm-site", services) + putStormEnvProperty = self.putProperty(configurations, "storm-env", services) + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + + if "storm-site" in services["configurations"]: + # atlas + notifier_plugin_property = "storm.topology.submission.notifier.plugin.class" + if notifier_plugin_property in services["configurations"]["storm-site"]["properties"] and \ + services["configurations"]["storm-site"]["properties"][notifier_plugin_property] is not None: + + notifier_plugin_value = services["configurations"]["storm-site"]["properties"][notifier_plugin_property] + else: + notifier_plugin_value = " " + + atlas_is_present = "ATLAS" in servicesList + atlas_hook_class = "org.apache.atlas.storm.hook.StormAtlasHook" + atlas_hook_is_set = atlas_hook_class in notifier_plugin_value + enable_atlas_hook = False + enable_external_atlas_for_storm = False + + if 'storm-atlas-application.properties' in services['configurations'] and 'enable.external.atlas.for.storm' in services['configurations']['storm-atlas-application.properties']['properties']: + enable_external_atlas_for_storm = services['configurations']['storm-atlas-application.properties']['properties']['enable.external.atlas.for.storm'].lower() == "true" + + if atlas_is_present: + putStormEnvProperty("storm.atlas.hook", "true") + elif enable_external_atlas_for_storm: + putStormEnvProperty("storm.atlas.hook", "true") + else: + putStormEnvProperty("storm.atlas.hook", "false") + + if 'storm-env' in configurations and 'storm.atlas.hook' in configurations['storm-env']['properties']: + enable_atlas_hook = configurations['storm-env']['properties']['storm.atlas.hook'] == "true" + elif 'storm-env' in services['configurations'] and 'storm.atlas.hook' in services['configurations']['storm-env']['properties']: + enable_atlas_hook = services['configurations']['storm-env']['properties']['storm.atlas.hook'] == "true" + + if enable_atlas_hook and not atlas_hook_is_set: + notifier_plugin_value = atlas_hook_class if notifier_plugin_value == " " else ",".join([notifier_plugin_value, atlas_hook_class]) + + if not enable_atlas_hook and atlas_hook_is_set: + application_classes = [item for item in notifier_plugin_value.split(",") if item != atlas_hook_class and item != " "] + notifier_plugin_value = ",".join(application_classes) if application_classes else " " + + if notifier_plugin_value.strip() != "": + putStormStartupProperty(notifier_plugin_property, notifier_plugin_value) + else: + putStormStartupPropertyAttribute = self.putPropertyAttribute(configurations, "storm-site") + putStormStartupPropertyAttribute(notifier_plugin_property, 'delete', 'true') + + def recommendStormConfigurationsFromHDP26(self, configurations, clusterData, services, hosts): + """ + In HDF-2.6.1 we introduced a new way of doing Auto Credentials with services such as + HDFS, 
    HIVE, HBASE. This method will update the required configs for autocreds if the user installs
    the STREAMLINE service.
    """
    storm_site = self.getServicesSiteProperties(services, "storm-site")
    storm_env = self.getServicesSiteProperties(services, "storm-env")
    putStormSiteProperty = self.putProperty(configurations, "storm-site", services)
    putStormSiteAttributes = self.putPropertyAttribute(configurations, "storm-site")
    security_enabled = StormServiceAdvisor.isKerberosEnabled(services, configurations)
    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]

    if storm_env and storm_site and security_enabled and 'STREAMLINE' in servicesList:
      storm_nimbus_impersonation_acl = storm_site["nimbus.impersonation.acl"] if "nimbus.impersonation.acl" in storm_site else None
      if storm_nimbus_impersonation_acl is not None:
        storm_nimbus_impersonation_acl = "{ {{storm_bare_jaas_principal}} : {hosts: ['*'], groups: ['*']},{{streamline_bare_jaas_principal}} : {hosts: ['*'], groups: ['*']}}"
        putStormSiteProperty('nimbus.impersonation.acl', storm_nimbus_impersonation_acl)

      storm_nimbus_autocred_plugin_classes = storm_site["nimbus.autocredential.plugins.classes"] if "nimbus.autocredential.plugins.classes" in storm_site else None
      if storm_nimbus_autocred_plugin_classes is not None:
        new_storm_nimbus_autocred_plugin_classes = ['org.apache.storm.hdfs.security.AutoHDFS',
                                                    'org.apache.storm.hbase.security.AutoHBase',
                                                    'org.apache.storm.hive.security.AutoHive']
        new_conf = self.appendToYamlString(storm_nimbus_autocred_plugin_classes,
                                           new_storm_nimbus_autocred_plugin_classes)
        putStormSiteProperty("nimbus.autocredential.plugins.classes", new_conf)
      else:
        putStormSiteProperty("nimbus.autocredential.plugins.classes", "['org.apache.storm.hdfs.security.AutoHDFS', 'org.apache.storm.hbase.security.AutoHBase', 'org.apache.storm.hive.security.AutoHive']")

      storm_nimbus_credential_renewer_classes = storm_site["nimbus.credential.renewers.classes"] if "nimbus.credential.renewers.classes" in storm_site else None
      if storm_nimbus_credential_renewer_classes is not None:
        new_storm_nimbus_credential_renewer_classes_array = ['org.apache.storm.hdfs.security.AutoHDFS',
                                                             'org.apache.storm.hbase.security.AutoHBase',
                                                             'org.apache.storm.hive.security.AutoHive']
        new_conf = self.appendToYamlString(storm_nimbus_credential_renewer_classes,
                                           new_storm_nimbus_credential_renewer_classes_array)
        putStormSiteProperty("nimbus.credential.renewers.classes", new_conf)
      else:
        putStormSiteProperty("nimbus.credential.renewers.classes", "['org.apache.storm.hdfs.security.AutoHDFS', 'org.apache.storm.hbase.security.AutoHBase', 'org.apache.storm.hive.security.AutoHive']")
      putStormSiteProperty("nimbus.credential.renewers.freq.secs", "82800")

  def recommendStormConfigurationsFromHDP30(self, configurations, clusterData, services, hosts):
    storm_site = self.getServicesSiteProperties(services, "storm-site")
    security_enabled = StormServiceAdvisor.isKerberosEnabled(services, configurations)
    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
    putStormSiteProperty = self.putProperty(configurations, "storm-site", services)

    # Storm AMS integration
    if 'AMBARI_METRICS' in servicesList:
      putStormSiteProperty('storm.cluster.metrics.consumer.register', '[{"class": "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter"}]')
      putStormSiteProperty('topology.metrics.consumer.register',
"org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink", ' + '"parallelism.hint": 1, ' + '"whitelist": ["kafkaOffset\\\..+/", "__complete-latency", "__process-latency", ' + '"__execute-latency", ' + '"__receive\\\.population$", "__sendqueue\\\.population$", "__execute-count", "__emit-count", ' + '"__ack-count", "__fail-count", "memory/heap\\\.usedBytes$", "memory/nonHeap\\\.usedBytes$", ' + '"GC/.+\\\.count$", "GC/.+\\\.timeMs$"]}]') + else: + putStormSiteProperty('storm.cluster.metrics.consumer.register', 'null') + putStormSiteProperty('topology.metrics.consumer.register', 'null') + + if storm_site and not security_enabled: + putStormSitePropertyAttribute = self.putPropertyAttribute(configurations, "storm-site") + if 'nimbus.impersonation.acl' in storm_site: + putStormSitePropertyAttribute('nimbus.impersonation.acl', 'delete', 'true') + if 'nimbus.impersonation.authorizer' in storm_site: + putStormSitePropertyAttribute('nimbus.impersonation.authorizer', 'delete', 'true') + + +class StormValidator(service_advisor.ServiceAdvisor): + """ + Kafka Validator checks the correctness of properties whenever the service is first added or the user attempts to + change configs via the UI. + """ + + def __init__(self, *args, **kwargs): + self.as_super = super(StormValidator, self) + self.as_super.__init__(*args, **kwargs) + + self.validators = [("storm-site", self.validateStormConfigurationsFromHDP25), + ("ranger-storm-plugin-properties", self.validateStormConfigurationsFromHDP22)] + + def validateStormConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts): + validationItems = [] + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + # Storm AMS integration + if 'AMBARI_METRICS' in servicesList and "metrics.reporter.register" in properties and \ + "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter" not in properties.get("metrics.reporter.register"): + + validationItems.append({"config-name": 'metrics.reporter.register', + "item": self.getWarnItem( + "Should be set to org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter to report the metrics to Ambari Metrics service.")}) + + return self.toConfigurationValidationProblems(validationItems, "storm-site") + + + + def validateStormConfigurationsFromHDP22(self, properties, recommendedDefaults, configurations, services, hosts): + validationItems = [] + ranger_plugin_properties = self.getSiteProperties(configurations, "ranger-storm-plugin-properties") + ranger_plugin_enabled = ranger_plugin_properties['ranger-storm-plugin-enabled'] if ranger_plugin_properties else 'No' + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + security_enabled = StormServiceAdvisor.isKerberosEnabled(services, configurations) + if 'RANGER' in servicesList and ranger_plugin_enabled.lower() == 'yes': + # ranger-hdfs-plugin must be enabled in ranger-env + ranger_env = self.getServicesSiteProperties(services, 'ranger-env') + if not ranger_env or not 'ranger-storm-plugin-enabled' in ranger_env or \ + ranger_env['ranger-storm-plugin-enabled'].lower() != 'yes': + validationItems.append({"config-name": 'ranger-storm-plugin-enabled', + "item": self.getWarnItem( + "ranger-storm-plugin-properties/ranger-storm-plugin-enabled must correspond ranger-env/ranger-storm-plugin-enabled")}) + if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'Yes'.lower()) and not security_enabled: + 
      validationItems.append({"config-name": "ranger-storm-plugin-enabled",
                              "item": self.getWarnItem(
                                "Ranger Storm plugin should not be enabled in a non-Kerberos environment.")})

    return self.toConfigurationValidationProblems(validationItems, "ranger-storm-plugin-properties")

  def validateStormConfigurationsFromHDP25(self, properties, recommendedDefaults, configurations, services, hosts):
    parentValidationProblems = self.validateStormConfigurationsFromHDP206(properties, recommendedDefaults, configurations, services, hosts)
    validationItems = []

    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
    # Storm AMS integration
    if 'AMBARI_METRICS' in servicesList:
      if "storm.cluster.metrics.consumer.register" in properties and \
        'null' in properties.get("storm.cluster.metrics.consumer.register"):
        validationItems.append({"config-name": 'storm.cluster.metrics.consumer.register',
                                "item": self.getWarnItem(
                                  "Should be set to the recommended value to report metrics to the Ambari Metrics service.")})

      if "topology.metrics.consumer.register" in properties and \
        'null' in properties.get("topology.metrics.consumer.register"):
        validationItems.append({"config-name": 'topology.metrics.consumer.register',
                                "item": self.getWarnItem(
                                  "Should be set to the recommended value to report metrics to the Ambari Metrics service.")})

    validationProblems = self.toConfigurationValidationProblems(validationItems, "storm-site")
    validationProblems.extend(parentValidationProblems)
    return validationProblems
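
# Illustrative sketch only, not part of the advisor: the recommenders and validators
# above work against nested dictionaries supplied by Ambari. The field layout below
# mirrors the lookups used in this file (services["services"][i]["StackServices"]
# ["service_name"] and the per-config-type "properties" maps); the concrete values
# and the _example_* names are invented for the example and are not Ambari API.
if __name__ == "__main__":
  _example_services = {
    "services": [
      {"StackServices": {"service_name": "STORM"}},
      {"StackServices": {"service_name": "AMBARI_METRICS"}}
    ],
    "configurations": {
      "storm-site": {
        "properties": {
          "storm.cluster.metrics.consumer.register": "null",
          "topology.metrics.consumer.register": "null"
        }
      }
    }
  }
  _example_properties = _example_services["configurations"]["storm-site"]["properties"]
  # With AMBARI_METRICS in the service list and both *.metrics.consumer.register
  # properties left at 'null', validateStormConfigurationsFromHDP25 would emit one
  # warn item for each of the two properties against the storm-site config type.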