From 533177666d2f92684151a765aa8f1259d01c192c Mon Sep 17 00:00:00 2001 From: igorborgest Date: Sat, 26 Oct 2019 09:07:01 -0300 Subject: [PATCH] Bumping version to 0.0.13 --- README.md | 2 +- awswrangler/__version__.py | 2 +- awswrangler/emr.py | 196 ++++++++++++++++++--------- testing/run-tests.sh | 4 +- testing/test_awswrangler/test_emr.py | 167 ++++++++++++----------- testing/test_awswrangler/test_s3.py | 7 +- 6 files changed, 232 insertions(+), 146 deletions(-) diff --git a/README.md b/README.md index ca084e416..91f0eb144 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ > Utility belt to handle data on AWS. -[![Release](https://img.shields.io/badge/release-0.0.12-brightgreen.svg)](https://pypi.org/project/awswrangler/) +[![Release](https://img.shields.io/badge/release-0.0.13-brightgreen.svg)](https://pypi.org/project/awswrangler/) [![Downloads](https://img.shields.io/pypi/dm/awswrangler.svg)](https://pypi.org/project/awswrangler/) [![Python Version](https://img.shields.io/badge/python-3.6%20%7C%203.7-brightgreen.svg)](https://pypi.org/project/awswrangler/) [![Documentation Status](https://readthedocs.org/projects/aws-data-wrangler/badge/?version=latest)](https://aws-data-wrangler.readthedocs.io/en/latest/?badge=latest) diff --git a/awswrangler/__version__.py b/awswrangler/__version__.py index 88a7559e7..a62048e1f 100644 --- a/awswrangler/__version__.py +++ b/awswrangler/__version__.py @@ -1,4 +1,4 @@ __title__ = "awswrangler" __description__ = "Utility belt to handle data on AWS." -__version__ = "0.0.12" +__version__ = "0.0.13" __license__ = "Apache License 2.0" diff --git a/awswrangler/emr.py b/awswrangler/emr.py index 5340f85ea..910b2ef0c 100644 --- a/awswrangler/emr.py +++ b/awswrangler/emr.py @@ -2,7 +2,7 @@ Module to handle all utilities related to EMR (Elastic Map Reduce) https://aws.amazon.com/emr/ """ -from typing import Optional, List, Dict +from typing import Optional, List, Dict, Any, Union, Collection import logging import json @@ -29,8 +29,8 @@ def _build_cluster_args(**pars): "JobFlowRole": pars["emr_ec2_role"], "ServiceRole": pars["emr_role"], "Instances": { - "KeepJobFlowAliveWhenNoSteps": True, - "TerminationProtected": False, + "KeepJobFlowAliveWhenNoSteps": pars["keep_cluster_alive_when_no_steps"], + "TerminationProtected": pars["termination_protected"], "Ec2SubnetId": pars["subnet_id"], "InstanceFleets": [] } @@ -53,47 +53,68 @@ def _build_cluster_args(**pars): args["Instances"]["ServiceAccessSecurityGroup"] = pars["security_group_service_access"] # Configurations - if pars["python3"] or pars["spark_glue_catalog"] or pars["hive_glue_catalog"] or pars["presto_glue_catalog"]: - args["Configurations"]: List = [] - if pars["python3"]: - args["Configurations"].append({ - "Classification": - "spark-env", - "Properties": {}, - "Configurations": [{ - "Classification": "export", - "Properties": { - "PYSPARK_PYTHON": "/usr/bin/python3" - }, - "Configurations": [] - }] - }) - if pars["spark_glue_catalog"]: - args["Configurations"].append({ - "Classification": "spark-hive-site", - "Properties": { - "hive.metastore.client.factory.class": - "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory", - }, - "Configurations": [] - }) - if pars["hive_glue_catalog"]: - args["Configurations"].append({ - "Classification": "hive-site", - "Properties": { - "hive.metastore.client.factory.class": - "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory" - }, - "Configurations": [] - }) - if pars["presto_glue_catalog"]: - 
args["Configurations"].append({ - "Classification": "presto-connector-hive", + args["Configurations"]: List[Dict[str, Any]] = [{ + "Classification": "spark-log4j", + "Properties": { + "log4j.rootCategory": f"{pars['spark_log_level']}, console" + } + }] + if pars["python3"]: + args["Configurations"].append({ + "Classification": + "spark-env", + "Properties": {}, + "Configurations": [{ + "Classification": "export", "Properties": { - "hive.metastore.glue.datacatalog.enabled": "true" + "PYSPARK_PYTHON": "/usr/bin/python3" }, "Configurations": [] - }) + }] + }) + if pars["spark_glue_catalog"]: + args["Configurations"].append({ + "Classification": "spark-hive-site", + "Properties": { + "hive.metastore.client.factory.class": + "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory", + }, + "Configurations": [] + }) + if pars["hive_glue_catalog"]: + args["Configurations"].append({ + "Classification": "hive-site", + "Properties": { + "hive.metastore.client.factory.class": + "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory" + }, + "Configurations": [] + }) + if pars["presto_glue_catalog"]: + args["Configurations"].append({ + "Classification": "presto-connector-hive", + "Properties": { + "hive.metastore.glue.datacatalog.enabled": "true" + }, + "Configurations": [] + }) + if pars["maximize_resource_allocation"]: + args["Configurations"].append({ + "Classification": "spark", + "Properties": { + "maximizeResourceAllocation": "true" + } + }) + if (pars["spark_jars_path"] is not None) or (pars["spark_defaults"] is not None): + spark_defaults: Dict[str, Union[str, Dict[str, str]]] = { + "Classification": "spark-defaults", + "Properties": {} + } + if pars["spark_jars_path"] is not None: + spark_defaults["Properties"]["spark.jars"] = pars["spark_jars_path"] + for k, v in pars["spark_defaults"].items(): + spark_defaults["Properties"][k] = v + args["Configurations"].append(spark_defaults) # Applications if pars["applications"]: @@ -108,16 +129,20 @@ def _build_cluster_args(**pars): } } for x in pars["bootstraps_paths"]] - # Debugging - if pars["debugging"]: - args["Steps"]: List[Dict] = [{ - "Name": "Setup Hadoop Debugging", - "ActionOnFailure": "TERMINATE_CLUSTER", - "HadoopJarStep": { - "Jar": "command-runner.jar", - "Args": ["state-pusher-script"] - } - }] + # Debugging and Steps + if (pars["debugging"] is True) or (pars["steps"] is not None): + args["Steps"]: List[Dict[str, Collection[str]]] = [] + if pars["debugging"] is True: + args["Steps"].append({ + "Name": "Setup Hadoop Debugging", + "ActionOnFailure": "TERMINATE_CLUSTER", + "HadoopJarStep": { + "Jar": "command-runner.jar", + "Args": ["state-pusher-script"] + } + }) + if pars["steps"] is not None: + args["Steps"] += pars["steps"] # Master Instance Fleet timeout_action_master: str = "SWITCH_TO_ON_DEMAND" if pars[ @@ -161,7 +186,8 @@ def _build_cluster_args(**pars): # Core Instance Fleet if (pars["instance_num_spot_core"] > 0) or pars["instance_num_on_demand_core"] > 0: - timeout_action_core = "SWITCH_TO_ON_DEMAND" if pars["spot_timeout_to_on_demand_core"] else "TERMINATE_CLUSTER" + timeout_action_core = "SWITCH_TO_ON_DEMAND" if pars[ + "spot_timeout_to_on_demand_core"] else "TERMINATE_CLUSTER" fleet_core: Dict = { "Name": "CORE", @@ -284,7 +310,14 @@ def create_cluster(self, security_groups_master_additional: Optional[List[str]] = None, security_group_slave: Optional[str] = None, security_groups_slave_additional: Optional[List[str]] = None, - security_group_service_access: Optional[str] = None): + 
security_group_service_access: Optional[str] = None,
+                           spark_log_level: str = "WARN",
+                           spark_jars_path: Optional[str] = None,
+                           spark_defaults: Optional[Dict[str, str]] = None,
+                           maximize_resource_allocation: bool = False,
+                           steps: Optional[List[Dict[str, Collection[str]]]] = None,
+                           keep_cluster_alive_when_no_steps: bool = True,
+                           termination_protected: bool = False):
         """
         Create a EMR cluster with instance fleets configuration
         https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-instance-fleet.html
@@ -329,6 +362,13 @@ def create_cluster(self,
         :param security_group_slave: The identifier of the Amazon EC2 security group for the core and task nodes.
         :param security_groups_slave_additional: A list of additional Amazon EC2 security group IDs for the core and task nodes.
         :param security_group_service_access: The identifier of the Amazon EC2 security group for the Amazon EMR service to access clusters in VPC private subnets.
+        :param spark_log_level: log4j.rootCategory log level (ALL, DEBUG, INFO, WARN, ERROR, FATAL, OFF, TRACE)
+        :param spark_jars_path: spark.jars (https://spark.apache.org/docs/latest/configuration.html) (e.g. s3://...)
+        :param spark_defaults: Additional spark-defaults properties (https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html#spark-defaults)
+        :param maximize_resource_allocation: Configure your executors to utilize the maximum resources possible (https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html#emr-spark-maximizeresourceallocation)
+        :param steps: Steps definitions (Note: use EMR.build_step() to build them)
+        :param keep_cluster_alive_when_no_steps: Specifies whether the cluster should remain available after completing all steps
+        :param termination_protected: Specifies whether the Amazon EC2 instances in the cluster are protected from termination by API calls, user intervention, or in the event of a job-flow error.
         :return: Cluster ID (string)
         """
         args = EMR._build_cluster_args(**locals())
@@ -358,28 +398,60 @@ def terminate_cluster(self, cluster_id: str) -> None:
             ])
         logger.info(f"response: \n{json.dumps(response, default=str, indent=4)}")
 
-    def submit_step(self, cluster_id: str, name: str, cmd: str, action_on_failure: str = "CONTINUE") -> str:
+    def submit_steps(self, cluster_id: str, steps: List[Dict[str, Collection[str]]]) -> List[str]:
+        """
+        Submit a list of steps to the EMR cluster
+        :param cluster_id: EMR Cluster ID
+        :param steps: Steps definitions (Note: use EMR.build_step() to build them)
+        :return: List of step IDs
+        """
+        response: Dict = self._client_emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=steps)
+        logger.info(f"response: \n{json.dumps(response, default=str, indent=4)}")
+        return response["StepIds"]
+
+    def submit_step(self,
+                    cluster_id: str,
+                    name: str,
+                    command: str,
+                    action_on_failure: str = "CONTINUE",
+                    script: bool = False) -> str:
         """
         Submit new job in the EMR Cluster
         :param cluster_id: EMR Cluster ID
         :param name: Step name
-        :param cmd: Command to be executed
+        :param command: Command to run (e.g. 'echo "Hello!"') or, when script=True, a script path with arguments (e.g. 's3://.../script.sh arg1 arg2')
         :param action_on_failure: 'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE'
+        :param script: False to run the command through command-runner.jar, True to run it through the script-runner (https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html)
         :return: Step ID
         """
-        region: str = self._session.region_name
-        logger.info(f"region: {region}")
+        step = EMR.build_step(self, name=name, command=command, action_on_failure=action_on_failure, script=script)
+        response: Dict = self._client_emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
+        logger.info(f"response: \n{json.dumps(response, default=str, indent=4)}")
+        return response["StepIds"][0]
+
+    def build_step(self, name: str, command: str, action_on_failure: str = "CONTINUE",
+                   script: bool = False) -> Dict[str, Collection[str]]:
+        """
+        Build the Step dictionary
+        :param name: Step name
+        :param command: Command to run (e.g. 'echo "Hello!"') or, when script=True, a script path with arguments (e.g. 's3://.../script.sh arg1 arg2')
+        :param action_on_failure: 'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE'
+        :param script: False to run the command through command-runner.jar, True to run it through the script-runner (https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html)
+        :return: Step Dict
+        """
+        jar: str = "command-runner.jar"
+        if script is True:
+            region: str = self._session.region_name
+            jar = f"s3://{region}.elasticmapreduce/libs/script-runner/script-runner.jar"
         step = {
             "Name": name,
             "ActionOnFailure": action_on_failure,
             "HadoopJarStep": {
-                "Jar": f"s3://{region}.elasticmapreduce/libs/script-runner/script-runner.jar",
-                "Args": cmd.split(" ")
+                "Jar": jar,
+                "Args": command.split(" ")
             }
         }
-        response: Dict = self._client_emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
-        logger.info(f"response: \n{json.dumps(response, default=str, indent=4)}")
-        return response["StepIds"][0]
+        return step
 
     def get_step_state(self, cluster_id: str, step_id: str) -> str:
         """
diff --git a/testing/run-tests.sh b/testing/run-tests.sh
index efbb97000..d2d67104f 100755
--- a/testing/run-tests.sh
+++ b/testing/run-tests.sh
@@ -1,8 +1,8 @@
-#!/bin/#!/usr/bin/env bash
+#!/usr/bin/env bash
 set -e
 
 cd ..
-pip install -e .
+pip install --upgrade -e . 
yapf --in-place --recursive setup.py awswrangler testing/test_awswrangler mypy awswrangler flake8 setup.py awswrangler testing/test_awswrangler diff --git a/testing/test_awswrangler/test_emr.py b/testing/test_awswrangler/test_emr.py index 266ea2b26..eb0bbf35b 100644 --- a/testing/test_awswrangler/test_emr.py +++ b/testing/test_awswrangler/test_emr.py @@ -36,49 +36,54 @@ def bucket(session, cloudformation_outputs): def test_cluster(session, bucket, cloudformation_outputs): - cluster_id = session.emr.create_cluster( - cluster_name="wrangler_cluster", - logging_s3_path=f"s3://{bucket}/emr-logs/", - emr_release="emr-5.27.0", - subnet_id=cloudformation_outputs["SubnetId"], - emr_ec2_role="EMR_EC2_DefaultRole", - emr_role="EMR_DefaultRole", - instance_type_master="m5.xlarge", - instance_type_core="m5.xlarge", - instance_type_task="m5.xlarge", - instance_ebs_size_master=50, - instance_ebs_size_core=50, - instance_ebs_size_task=50, - instance_num_on_demand_master=1, - instance_num_on_demand_core=1, - instance_num_on_demand_task=1, - instance_num_spot_master=0, - instance_num_spot_core=1, - instance_num_spot_task=1, - spot_bid_percentage_of_on_demand_master=100, - spot_bid_percentage_of_on_demand_core=100, - spot_bid_percentage_of_on_demand_task=100, - spot_provisioning_timeout_master=5, - spot_provisioning_timeout_core=5, - spot_provisioning_timeout_task=5, - spot_timeout_to_on_demand_master=True, - spot_timeout_to_on_demand_core=True, - spot_timeout_to_on_demand_task=True, - python3=True, - spark_glue_catalog=True, - hive_glue_catalog=True, - presto_glue_catalog=True, - bootstraps_paths=None, - debugging=True, - applications=["Hadoop", "Spark", "Ganglia", "Hive"], - visible_to_all_users=True, - key_pair_name=None, - ) + steps = [] + for cmd in ['echo "Hello"', "ls -la"]: + steps.append(session.emr.build_step(name=cmd, command=cmd)) + cluster_id = session.emr.create_cluster(cluster_name="wrangler_cluster", + logging_s3_path=f"s3://{bucket}/emr-logs/", + emr_release="emr-5.27.0", + subnet_id=cloudformation_outputs["SubnetId"], + emr_ec2_role="EMR_EC2_DefaultRole", + emr_role="EMR_DefaultRole", + instance_type_master="m5.xlarge", + instance_type_core="m5.xlarge", + instance_type_task="m5.xlarge", + instance_ebs_size_master=50, + instance_ebs_size_core=50, + instance_ebs_size_task=50, + instance_num_on_demand_master=1, + instance_num_on_demand_core=1, + instance_num_on_demand_task=1, + instance_num_spot_master=0, + instance_num_spot_core=1, + instance_num_spot_task=1, + spot_bid_percentage_of_on_demand_master=100, + spot_bid_percentage_of_on_demand_core=100, + spot_bid_percentage_of_on_demand_task=100, + spot_provisioning_timeout_master=5, + spot_provisioning_timeout_core=5, + spot_provisioning_timeout_task=5, + spot_timeout_to_on_demand_master=True, + spot_timeout_to_on_demand_core=True, + spot_timeout_to_on_demand_task=True, + python3=True, + spark_glue_catalog=True, + hive_glue_catalog=True, + presto_glue_catalog=True, + bootstraps_paths=None, + debugging=True, + applications=["Hadoop", "Spark", "Ganglia", "Hive"], + visible_to_all_users=True, + key_pair_name=None, + steps=steps) sleep(10) cluster_state = session.emr.get_cluster_state(cluster_id=cluster_id) print(f"cluster_state: {cluster_state}") assert cluster_state == "STARTING" - step_id = session.emr.submit_step(cluster_id=cluster_id, name="step_test", cmd='echo "Hello World!"') + step_id = session.emr.submit_step(cluster_id=cluster_id, + name="step_test", + command="s3://...script.sh arg1 arg2", + script=True) sleep(10) step_state = 
session.emr.get_step_state(cluster_id=cluster_id, step_id=step_id) print(f"step_state: {step_state}") @@ -87,46 +92,54 @@ def test_cluster(session, bucket, cloudformation_outputs): def test_cluster_single_node(session, bucket, cloudformation_outputs): - cluster_id = session.emr.create_cluster( - cluster_name="wrangler_cluster", - logging_s3_path=f"s3://{bucket}/emr-logs/", - emr_release="emr-5.27.0", - subnet_id=cloudformation_outputs["SubnetId"], - emr_ec2_role="EMR_EC2_DefaultRole", - emr_role="EMR_DefaultRole", - instance_type_master="m5.xlarge", - instance_type_core="m5.xlarge", - instance_type_task="m5.xlarge", - instance_ebs_size_master=50, - instance_ebs_size_core=50, - instance_ebs_size_task=50, - instance_num_on_demand_master=1, - instance_num_on_demand_core=0, - instance_num_on_demand_task=0, - instance_num_spot_master=0, - instance_num_spot_core=0, - instance_num_spot_task=0, - spot_bid_percentage_of_on_demand_master=100, - spot_bid_percentage_of_on_demand_core=100, - spot_bid_percentage_of_on_demand_task=100, - spot_provisioning_timeout_master=5, - spot_provisioning_timeout_core=5, - spot_provisioning_timeout_task=5, - spot_timeout_to_on_demand_master=False, - spot_timeout_to_on_demand_core=False, - spot_timeout_to_on_demand_task=False, - python3=False, - spark_glue_catalog=False, - hive_glue_catalog=False, - presto_glue_catalog=False, - bootstraps_paths=None, - debugging=False, - applications=["Hadoop", "Spark", "Ganglia", "Hive"], - visible_to_all_users=True, - key_pair_name=None, - ) + cluster_id = session.emr.create_cluster(cluster_name="wrangler_cluster", + logging_s3_path=f"s3://{bucket}/emr-logs/", + emr_release="emr-5.27.0", + subnet_id=cloudformation_outputs["SubnetId"], + emr_ec2_role="EMR_EC2_DefaultRole", + emr_role="EMR_DefaultRole", + instance_type_master="m5.xlarge", + instance_type_core="m5.xlarge", + instance_type_task="m5.xlarge", + instance_ebs_size_master=50, + instance_ebs_size_core=50, + instance_ebs_size_task=50, + instance_num_on_demand_master=1, + instance_num_on_demand_core=0, + instance_num_on_demand_task=0, + instance_num_spot_master=0, + instance_num_spot_core=0, + instance_num_spot_task=0, + spot_bid_percentage_of_on_demand_master=100, + spot_bid_percentage_of_on_demand_core=100, + spot_bid_percentage_of_on_demand_task=100, + spot_provisioning_timeout_master=5, + spot_provisioning_timeout_core=5, + spot_provisioning_timeout_task=5, + spot_timeout_to_on_demand_master=False, + spot_timeout_to_on_demand_core=False, + spot_timeout_to_on_demand_task=False, + python3=False, + spark_glue_catalog=False, + hive_glue_catalog=False, + presto_glue_catalog=False, + bootstraps_paths=None, + debugging=False, + applications=["Hadoop", "Spark", "Ganglia", "Hive"], + visible_to_all_users=True, + key_pair_name=None, + spark_log_level="ERROR", + spark_jars_path=f"s3://{bucket}/jars/", + spark_defaults={"spark.default.parallelism": "400"}, + maximize_resource_allocation=True, + keep_cluster_alive_when_no_steps=False, + termination_protected=False) sleep(10) cluster_state = session.emr.get_cluster_state(cluster_id=cluster_id) print(f"cluster_state: {cluster_state}") assert cluster_state == "STARTING" + steps = [] + for cmd in ['echo "Hello"', "ls -la"]: + steps.append(session.emr.build_step(name=cmd, command=cmd)) + session.emr.submit_steps(cluster_id=cluster_id, steps=steps) session.emr.terminate_cluster(cluster_id=cluster_id) diff --git a/testing/test_awswrangler/test_s3.py b/testing/test_awswrangler/test_s3.py index 6942d2ffc..aa4435071 100644 --- 
a/testing/test_awswrangler/test_s3.py +++ b/testing/test_awswrangler/test_s3.py @@ -100,10 +100,11 @@ def test_delete_objects(session, bucket, objects_num): def test_delete_listed_objects(session, bucket, objects_num): path = f"s3://{bucket}/objs-listed-{objects_num}/" print("Starting deletes...") + sleep(10) # Waiting for eventual consistency session.s3.delete_objects(path=path) print("Starting writes...") write_fake_objects(bucket, f"objs-listed-{objects_num}/", objects_num) - sleep(3) # Waiting for eventual consistency + sleep(10) # Waiting for eventual consistency print("Starting list...") objects_paths = session.s3.list_objects(path=path) assert len(objects_paths) == objects_num @@ -129,7 +130,7 @@ def test_delete_not_listed_objects(session, bucket, objects_num): session.s3.delete_objects(path=path) print("Starting writes...") write_fake_objects(bucket, f"objs-not-listed-{objects_num}/", objects_num) - sleep(3) # Waiting for eventual consistency + sleep(10) # Waiting for eventual consistency print("Starting not listed deletes...") session.s3.delete_not_listed_objects(objects_paths=[f"{path}0"]) print("Starting checks...") @@ -195,7 +196,7 @@ def test_copy_listed_objects(session, bucket, database, mode, procs_io_bound): procs_io_bound=procs_io_bound, ) print("Asserting...") - sleep(1) + sleep(10) dataframe2 = session.pandas.read_sql_athena(sql="select * from test_move_objects_0", database=database) if mode == "append": assert 2 * len(dataframe.index) == len(dataframe2.index)
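
Example usage of the step helpers introduced in this patch (a minimal sketch, assuming an awswrangler Session configured with credentials and region as in the test suite and an already-running cluster; the cluster ID, bucket, and script path below are hypothetical placeholders):

    import awswrangler

    session = awswrangler.Session()  # credentials/region resolved from the environment
    cluster_id = "j-XXXXXXXXXXXXX"   # hypothetical ID of an existing EMR cluster

    # build_step() only assembles the step dict locally; submit_steps() sends a batch in one API call.
    steps = [
        session.emr.build_step(name="hello", command='echo "Hello"'),         # runs via command-runner.jar
        session.emr.build_step(name="script",
                               command="s3://my-bucket/job.sh arg1 arg2",     # hypothetical script path
                               script=True),                                  # runs via script-runner.jar
    ]
    step_ids = session.emr.submit_steps(cluster_id=cluster_id, steps=steps)

    # submit_step() wraps build_step() for a single step and returns its ID, which can then be polled.
    step_id = session.emr.submit_step(cluster_id=cluster_id, name="listing", command="ls -la")
    print(step_ids, session.emr.get_step_state(cluster_id=cluster_id, step_id=step_id))

Note that commands are tokenized with command.split(" "), so arguments containing spaces are not supported by these helpers.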