Skip to content

Job cluster support and pytest update #82

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,9 @@ nutter run dataload/ --cluster_id 0123-12334-tonedabc --recursive --max_parallel

__Note:__ Running tests notebooks in parallel introduces the risk of data race conditions when two or more tests notebooks modify the same tables or files at the same time. Before increasing the level of parallelism make sure that your tests cases modify only tables or files that are used or referenced within the scope of the test notebook.

### Running using a job cluster
The CLUSTER_ID parameter only allows running tests on a cluster that is already running. It is also possible to use a job cluster created specifically for these tests. To do that, provide the cluster configuration as the _new-cluster-config_ parameter, e.g. `--new-cluster-config '{ "custom_tags": {"foo": "bar"}, "data_security_mode": "SINGLE_USER", "node_type_id": "Standard_D4ds_v5", "num_workers": 0, "policy_id": "123123", "runtime_engine": "STANDARD", "spark_version": "13.0.x-scala2.12" }'`. For the allowed values, see the tasks > new_cluster section in the [API docs](https://docs.databricks.com/api/workspace/jobs/create).

## Nutter CLI Syntax and Flags

### Run Command
Expand All @@ -369,6 +372,8 @@ POSITIONAL ARGUMENTS

``` bash
FLAGS
--cluster-id ID of an existing cluster to run the tests on. Same as passing the CLUSTER_ID positional argument.
--new_cluster_config Configuration for running the tests on a job cluster. See the documentation above for more details.
--timeout Execution timeout in seconds. Integer value. Default is 120
--junit_report Create a JUnit XML report from the test results.
--tags_report Create a CSV report from the test results that includes the test cases tags.
Expand Down Expand Up @@ -505,14 +510,6 @@ If using Visual Studio Code, you can use the `example_launch.json` file provided

### Contribution Tips

- There's a known issue with VS Code and the latest version of pytest.
- Please make sure that you install pytest 5.0.1
- If you installed pytest using VS Code, then you are likely using the incorrect version. Run the following command to fix it:

``` Python
pip install --force-reinstall pytest==5.0.1
```

Creating the wheel file and manually test wheel locally

1. Change directory to the root that contains setup.py
Expand Down
15 changes: 9 additions & 6 deletions cli/nuttercli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
import os
import datetime
import json

import common.api as api
from common.apiclient import DEFAULT_POLL_WAIT_TIME, InvalidConfigurationException
Expand Down Expand Up @@ -50,31 +51,33 @@ def __init__(self, debug=False, log_to_file=False, version=False):
self._set_nutter(debug)
super().__init__()

def run(self, test_pattern, cluster_id,
def run(self, test_pattern, cluster_id=None, new_cluster_config=None,
timeout=120, junit_report=False,
tags_report=False, max_parallel_tests=1,
recursive=False, poll_wait_time=DEFAULT_POLL_WAIT_TIME, notebook_params=None):
try:
logging.debug(""" Running tests. test_pattern: {} cluster_id: {} notebook_params: {} timeout: {}
logging.debug(""" Running tests. test_pattern: {} cluster_id: {} new_cluster_config={} notebook_params: {} timeout: {}
junit_report: {} max_parallel_tests: {}
tags_report: {} recursive:{} """
.format(test_pattern, cluster_id, timeout,
junit_report, max_parallel_tests,
.format(test_pattern, cluster_id, new_cluster_config,
timeout, junit_report, max_parallel_tests,
tags_report, recursive, notebook_params))

logging.debug("Executing test(s): {}".format(test_pattern))

new_cluster_config_dict = json.loads(new_cluster_config) if new_cluster_config is not None else None

if self._is_a_test_pattern(test_pattern):
logging.debug('Executing pattern')
results = self._nutter.run_tests(
test_pattern, cluster_id, timeout,
test_pattern, cluster_id, new_cluster_config, timeout,
max_parallel_tests, recursive, poll_wait_time, notebook_params)
self._nutter.events_processor_wait()
self._handle_results(results, junit_report, tags_report)
return

logging.debug('Executing single test')
result = self._nutter.run_test(test_pattern, cluster_id,
result = self._nutter.run_test(test_pattern, cluster_id, new_cluster_config_dict,
timeout, poll_wait_time)

self._handle_results([result], junit_report, tags_report)
Expand Down
1 change: 1 addition & 0 deletions cli/resultsvalidator.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def _validate_test_results(self, exit_output):


class TestCaseFailureException(Exception):
__test__ = False
def __init__(self, message):
super().__init__(message)

Expand Down
22 changes: 12 additions & 10 deletions common/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ def list_tests(self, path, recursive):
pass

@abstractmethod
def run_tests(self, pattern, cluster_id, timeout, max_parallel_tests):
def run_tests(self, pattern, cluster_id, new_cluster_config, timeout, max_parallel_tests):
pass

@abstractmethod
def run_test(self, testpath, cluster_id, timeout):
def run_test(self, testpath, cluster_id, new_cluster_config, timeout):
pass


Expand All @@ -87,20 +87,20 @@ def list_tests(self, path, recursive=False):

return tests

def run_test(self, testpath, cluster_id,
def run_test(self, testpath, cluster_id, new_cluster_config,
timeout=120, pull_wait_time=DEFAULT_POLL_WAIT_TIME, notebook_params=None):
self._add_status_event(NutterStatusEvents.TestExecutionRequest, testpath)
test_notebook = TestNotebook.from_path(testpath)
if test_notebook is None:
raise InvalidTestException

result = self.dbclient.execute_notebook(
test_notebook.path, cluster_id,
test_notebook.path, cluster_id, new_cluster_config,
timeout=timeout, pull_wait_time=pull_wait_time, notebook_params=notebook_params)

return result

def run_tests(self, pattern, cluster_id,
def run_tests(self, pattern, cluster_id, new_cluster_config,
timeout=120, max_parallel_tests=1, recursive=False,
poll_wait_time=DEFAULT_POLL_WAIT_TIME, notebook_params=None):

Expand All @@ -119,7 +119,7 @@ def run_tests(self, pattern, cluster_id,
NutterStatusEvents.TestsListingFiltered, len(filtered_notebooks))

return self._schedule_and_run(
filtered_notebooks, cluster_id, max_parallel_tests, timeout, poll_wait_time, notebook_params)
filtered_notebooks, cluster_id, new_cluster_config, max_parallel_tests, timeout, poll_wait_time, notebook_params)

def events_processor_wait(self):
if self._events_processor is None:
Expand Down Expand Up @@ -167,7 +167,7 @@ def _get_root_and_pattern(self, pattern):

return root, valid_pattern

def _schedule_and_run(self, test_notebooks, cluster_id,
def _schedule_and_run(self, test_notebooks, cluster_id, new_cluster_config,
max_parallel_tests, timeout, pull_wait_time, notebook_params=None):
func_scheduler = scheduler.get_scheduler(max_parallel_tests)
for test_notebook in test_notebooks:
Expand All @@ -176,12 +176,12 @@ def _schedule_and_run(self, test_notebooks, cluster_id,
logging.debug(
'Scheduling execution of: {}'.format(test_notebook.path))
func_scheduler.add_function(self._execute_notebook,
test_notebook.path, cluster_id, timeout, pull_wait_time, notebook_params)
test_notebook.path, cluster_id, new_cluster_config, timeout, pull_wait_time, notebook_params)
return self._run_and_await(func_scheduler)

def _execute_notebook(self, test_notebook_path, cluster_id, timeout, pull_wait_time, notebook_params=None):
def _execute_notebook(self, test_notebook_path, cluster_id, new_cluster_config, timeout, pull_wait_time, notebook_params=None):
result = self.dbclient.execute_notebook(test_notebook_path,
cluster_id, timeout, pull_wait_time, notebook_params)
cluster_id, new_cluster_config, timeout, pull_wait_time, notebook_params)
self._add_status_event(NutterStatusEvents.TestExecuted,
ExecutionResultEventData.from_execution_results(result))
logging.debug('Executed: {}'.format(test_notebook_path))
Expand Down Expand Up @@ -211,6 +211,7 @@ def _inspect_result(self, func_result):


class TestNotebook(object):
__test__ = False
def __init__(self, name, path):
if not self._is_valid_test_name(name):
raise InvalidTestException
Expand Down Expand Up @@ -250,6 +251,7 @@ def _get_notebook_name_from_path(cls, path):


class TestNamePatternMatcher(object):
__test__ = False
def __init__(self, pattern):
try:
# * is an invalid regex in python
Expand Down
8 changes: 4 additions & 4 deletions common/apiclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ def list_objects(self, path):

return workspace_path_obj

def execute_notebook(self, notebook_path, cluster_id, timeout=120,
def execute_notebook(self, notebook_path, cluster_id, new_cluster_config, timeout=120,
pull_wait_time=DEFAULT_POLL_WAIT_TIME,
notebook_params=None):
if not notebook_path:
raise ValueError("empty path")
if not cluster_id:
raise ValueError("empty cluster id")
if not cluster_id and not new_cluster_config:
raise ValueError("Either cluster id or config must be provided")
if timeout < self.min_timeout:
raise ValueError(
"Timeout must be greater than {}".format(self.min_timeout))
Expand All @@ -77,7 +77,7 @@ def execute_notebook(self, notebook_path, cluster_id, timeout=120,

runid = self._retrier.execute(self.inner_dbclient.jobs.submit_run,
run_name=name,
existing_cluster_id=cluster_id,
existing_cluster_id=cluster_id, new_cluster=new_cluster_config,
notebook_task=ntask,
)

Expand Down
1 change: 1 addition & 0 deletions common/resultsview.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ def failing_tests(self):


class TestCaseResultView(ResultsView):
__test__ = False
def __init__(self, nutter_test_results):

if not isinstance(nutter_test_results, TestResult):
Expand Down
1 change: 1 addition & 0 deletions common/testexecresults.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@


class TestExecResults():
__test__ = False
def __init__(self, test_results):
if not isinstance(test_results, TestResults):
raise TypeError("test_results must be of type TestResults")
Expand Down
2 changes: 2 additions & 0 deletions common/testresult.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def get_test_results():
return TestResults()

class TestResults(PickleSerializable):
__test__ = False
def __init__(self):
self.results = []
self.test_cases = 0
Expand Down Expand Up @@ -70,6 +71,7 @@ def __item_in_list_equalto(self, expected_item):
return False

class TestResult:
__test__ = False
def __init__(self, test_name, passed,
execution_time, tags, exception=None, stack_trace=""):

Expand Down
2 changes: 1 addition & 1 deletion dev_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pytest==5.0.1
pytest
mock
pytest-mock
pytest-cov
1 change: 1 addition & 0 deletions runtime/testcase.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def get_testcase(test_name):


class TestCase():
__test__ = False
ERROR_MESSAGE_ASSERTION_MISSING = """ TestCase does not contain an assertion function.
Please pass a function to set_assertion """

Expand Down
24 changes: 12 additions & 12 deletions tests/databricks/test_apiclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,35 +60,35 @@ def test__execute_notebook__emptypath__valueerrror(mocker):
db = __get_client(mocker)

with pytest.raises(ValueError):
db.execute_notebook('', 'cluster')
db.execute_notebook('', 'cluster', None)


def test__execute_notebook__nonepath__valueerror(mocker):
db = __get_client(mocker)

with pytest.raises(ValueError):
db.execute_notebook(None, 'cluster')
db.execute_notebook(None, 'cluster', None)


def test__execute_notebook__emptycluster__valueerror(mocker):
db = __get_client(mocker)

with pytest.raises(ValueError):
db.execute_notebook('/', '')
db.execute_notebook('/', '', None)


def test__execute_notebook__non_dict_params__valueerror(mocker):
db = __get_client(mocker)

with pytest.raises(ValueError):
db.execute_notebook('/', 'cluster', notebook_params='')
db.execute_notebook('/', 'cluster', None, notebook_params='')


def test__execute_notebook__nonecluster__valueerror(mocker):
db = __get_client(mocker)

with pytest.raises(ValueError):
db.execute_notebook('/', None)
db.execute_notebook('/', None, None)


def test__execute_notebook__success__executeresult_has_run_url(mocker):
Expand All @@ -99,7 +99,7 @@ def test__execute_notebook__success__executeresult_has_run_url(mocker):
run_id['run_id'] = 1
db = __get_client_for_execute_notebook(mocker, output_data, run_id)

result = db.execute_notebook('/mynotebook', 'clusterid')
result = db.execute_notebook('/mynotebook', 'clusterid', None)

assert result.notebook_run_page_url == run_page_url

Expand All @@ -111,7 +111,7 @@ def test__execute_notebook__failure__executeresult_has_run_url(mocker):
run_id['run_id'] = 1
db = __get_client_for_execute_notebook(mocker, output_data, run_id)

result = db.execute_notebook('/mynotebook', 'clusterid')
result = db.execute_notebook('/mynotebook', 'clusterid', None)

assert result.notebook_run_page_url == run_page_url

Expand All @@ -122,7 +122,7 @@ def test__execute_notebook__terminatestate__success(mocker):
run_id['run_id'] = 1
db = __get_client_for_execute_notebook(mocker, output_data, run_id)

result = db.execute_notebook('/mynotebook', 'clusterid')
result = db.execute_notebook('/mynotebook', 'clusterid', None)

assert result.task_result_state == 'TERMINATED'

Expand All @@ -133,7 +133,7 @@ def test__execute_notebook__skippedstate__resultstate_is_SKIPPED(mocker):
run_id['run_id'] = 1
db = __get_client_for_execute_notebook(mocker, output_data, run_id)

result = db.execute_notebook('/mynotebook', 'clusterid')
result = db.execute_notebook('/mynotebook', 'clusterid', None)

assert result.task_result_state == 'SKIPPED'

Expand All @@ -144,7 +144,7 @@ def test__execute_notebook__internal_error_state__resultstate_is_INTERNAL_ERROR(
run_id['run_id'] = 1
db = __get_client_for_execute_notebook(mocker, output_data, run_id)

result = db.execute_notebook('/mynotebook', 'clusterid')
result = db.execute_notebook('/mynotebook', 'clusterid', None)

assert result.task_result_state == 'INTERNAL_ERROR'

Expand All @@ -157,7 +157,7 @@ def test__execute_notebook__timeout_1_sec_lcs_isrunning__timeoutexception(mocker

with pytest.raises(client.TimeOutException):
db.min_timeout = 1
result = db.execute_notebook('/mynotebook', 'clusterid', timeout=1)
result = db.execute_notebook('/mynotebook', 'clusterid', None, timeout=1)


def test__execute_notebook__timeout_greater_than_min__valueerror(mocker):
Expand All @@ -168,7 +168,7 @@ def test__execute_notebook__timeout_greater_than_min__valueerror(mocker):

with pytest.raises(ValueError):
db.min_timeout = 10
result = db.execute_notebook('/mynotebook', 'clusterid', timeout=1)
result = db.execute_notebook('/mynotebook', 'clusterid', None, timeout=1)


default_run_page_url = 'https://westus2.azuredatabricks.net/?o=14702dasda6094293890#job/4/run/1'
Expand Down
Loading