
Test test_partitions_notexisting fails unexpectedly with dataset validation error instead of invalid path error #167

Open
stijnmoreels opened this issue Jan 21, 2022 · 0 comments
Labels: automated-testing (All issues related to the automated testing of the library.), bug (Something isn't working)
Milestone: v1.2

stijnmoreels (Member) commented Jan 21, 2022

Describe the bug
The test_partitions_notexisting test in test_workenvironment_aml.py fails unexpectedly with the wrong kind of exception: loading a tabular partition for a non-existing path now raises a DatasetValidationError instead of the expected invalid-path error. As the trace below shows, the dataprep engine raises an ExecutionError (ScriptExecution.StreamAccess.NotFound), which azureml.data.dataset_error_handling._validate_has_data then wraps in a DatasetValidationError.

To Reproduce
Run test_partitions_notexisting from tests/test_workenvironment_aml.py locally.
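
The failure also reproduces without the Arcus wrapper by pointing the SDK at a wildcard that matches no files. A minimal sketch, assuming a workspace config file is available locally; the datastore name and path are taken from the failing test and trace below:

```python
from azureml.core import Dataset, Datastore, Workspace
from azureml.data.dataset_error_handling import DatasetValidationError

ws = Workspace.from_config()
datastore = Datastore.get(ws, 'arcus_partition_test')

try:
    # The wildcard matches no blobs, so from_delimited_files fails while
    # validating that the new dataset would produce records.
    Dataset.Tabular.from_delimited_files(
        path=(datastore, 'test-partitioning/stock_BE*.csv'))
except DatasetValidationError as exc:
    print(exc)  # "Failed to validate the data. ..."
```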

Expected behavior
The test succeeds: the non-existing partition surfaces the invalid-path error the test asserts on.

Additional context
Reproduced on the current master branch, after the v1.1.2.1 release.
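
Since the regression appears to come from the azureml SDK rather than from Arcus itself, it may help to record the exact SDK versions in play. A small sketch; both packages appear in the trace below:

```python
# Print the installed versions of the SDK packages involved in the failure.
from importlib.metadata import version

for pkg in ('azureml-core', 'azureml-dataprep'):
    print(pkg, version(pkg))
```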

./tests/test_workenvironment_aml.py::test_partitions_notexisting Failed: [undefined]azureml.data.dataset_error_handling.DatasetValidationError: DatasetValidationError:
	Message: Failed to validate the data.
ScriptExecutionException was caused by StreamAccessException.
  StreamAccessException was caused by NotFoundException.
    Found no resources for the input provided: '[REDACTED]'
| session_id=l_f1e7a55f-15bc-450a-97e4-d27711fd5a70
	InnerException None
	ErrorResponse 
{
    "error": {
        "code": "UserError",
        "message": "Failed to validate the data.\nScriptExecutionException was caused by StreamAccessException.\n  StreamAccessException was caused by NotFoundException.\n    Found no resources for the input provided: '[REDACTED]'\n| session_id=l_f1e7a55f-15bc-450a-97e4-d27711fd5a70"
    }
}
dataflow = Dataflow
  steps: [
    Step {
      id: 187cb135-15a0-4b34-ad14-a510ea28564a
      type: Microsoft.DPrep.GetDatastore...  },
    Step {
      id: 4d1470a1-3bf7-46cf-8a54-1e4b020ab470
      type: Microsoft.DPrep.DropColumnsBlock,
    },
  ]
error_message = 'Failed to validate the data.'

    def _validate_has_data(dataflow, error_message):
        ensure_dataflow(dataflow)
        try:
>           dataflow.verify_has_data()

.venv/lib/python3.8/site-packages/azureml/data/dataset_error_handling.py:65: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (Dataflow
  steps: [
    Step {
      id: 187cb135-15a0-4b34-ad14-a510ea28564a
      type: Microsoft.DPrep.GetDatastor...},
    Step {
      id: 4d1470a1-3bf7-46cf-8a54-1e4b020ab470
      type: Microsoft.DPrep.DropColumnsBlock,
    },
  ],)
kwargs = {}
logger = <Logger azureml.dataprep.api._loggerfactory.Dataflow (DEBUG)>
activityLogger = <Logger azureml.dataprep.api._loggerfactory.Dataflow (DEBUG)>

    @wraps(func)
    def wrapper(*args, **kwargs):
        logger = get_logger()
        with _LoggerFactory.track_activity(logger, func.__name__, DEFAULT_ACTIVITY_TYPE, custom_dimensions) as activityLogger:
            try:
>               return func(*args, **kwargs)

.venv/lib/python3.8/site-packages/azureml/dataprep/api/_loggerfactory.py:213: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Dataflow
  steps: [
    Step {
      id: 187cb135-15a0-4b34-ad14-a510ea28564a
      type: Microsoft.DPrep.GetDatastore...  },
    Step {
      id: 4d1470a1-3bf7-46cf-8a54-1e4b020ab470
      type: Microsoft.DPrep.DropColumnsBlock,
    },
  ]

    @track(get_logger)
    def verify_has_data(self):
        """
        Verifies that this Dataflow would produce records if executed. An exception will be thrown otherwise.
        """
        with tracer.start_as_current_span('Dataflow.verify_has_data', trace.get_current_span()):
            if len(self._steps) == 0:
                raise EmptyStepsError()
    
>           if len(self.take(1)._to_pyrecords()) == 0:

.venv/lib/python3.8/site-packages/azureml/dataprep/api/dataflow.py:852: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Dataflow
  steps: [
    Step {
      id: 187cb135-15a0-4b34-ad14-a510ea28564a
      type: Microsoft.DPrep.GetDatastore...ock,
    },
    Step {
      id: eaa7c8c5-ee1e-4369-a6f1-150416afd3c3
      type: Microsoft.DPrep.TakeBlock,
    },
  ]

    def _to_pyrecords(self):
        from azureml.dataprep.native import preppy_to_pyrecords
    
        with tracer.start_as_current_span('Dataflow._to_pyrecords', trace.get_current_span()) as span:
            self._raise_if_missing_secrets()
            try:
>               intermediate_files = _write_preppy_with_fallback('Dataflow.to_pyrecords', self, span_context=to_dprep_span_context(span.get_context()))

.venv/lib/python3.8/site-packages/azureml/dataprep/api/dataflow.py:776: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

activity = 'Dataflow.to_pyrecords'
dataflow = Dataflow
  steps: [
    Step {
      id: 187cb135-15a0-4b34-ad14-a510ea28564a
      type: Microsoft.DPrep.GetDatastore...ock,
    },
    Step {
      id: eaa7c8c5-ee1e-4369-a6f1-150416afd3c3
      type: Microsoft.DPrep.TakeBlock,
    },
  ]
force_clex = False
span_context = DPrepSpanContext({
  "isRemote": false,
  "spanId": "f710b6eea916a18d",
  "traceFlags": 1,
  "traceId": "798feb29824ca20f8eaa8f440b6e9d06",
  "tracestate": {}
})

    def _write_preppy_with_fallback(activity, dataflow, force_clex=False, span_context=None):
        import tempfile
        from pathlib import Path
    
        random_id = uuid4()
        intermediate_path = Path(os.path.join(tempfile.gettempdir(), str(random_id)))
        dataflow_to_execute = dataflow.add_step('Microsoft.DPrep.WritePreppyBlock', {
            'outputPath': {
                'target': 0,
                'resourceDetails': [{'path': str(intermediate_path)}]
            },
            'profilingFields': ['Kinds', 'MissingAndEmpty'],
            'ifDestinationExists': IfDestinationExists.REPLACE
        })
    
>       _execute_with_fallback(activity, dataflow_to_execute, force_clex=force_clex, span_context=span_context)

.venv/lib/python3.8/site-packages/azureml/dataprep/api/_dataframereader.py:190: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

activity = 'Dataflow.to_pyrecords'
dataflow_to_execute = Dataflow
  steps: [
    Step {
      id: 187cb135-15a0-4b34-ad14-a510ea28564a
      type: Microsoft.DPrep.GetDatastore...  },
    Step {
      id: 272b0c80-adc4-4e5e-805b-f2bba1a6fdfa
      type: Microsoft.DPrep.WritePreppyBlock,
    },
  ]
force_clex = False
span_context = DPrepSpanContext({
  "isRemote": false,
  "spanId": "f710b6eea916a18d",
  "traceFlags": 1,
  "traceId": "798feb29824ca20f8eaa8f440b6e9d06",
  "tracestate": {}
})

    def _execute_with_fallback(activity, dataflow_to_execute, force_clex=False, span_context=None):
        def rslex_execute():
            executor = get_rslex_executor()
            script = resolve_script(dataflow_to_execute)
            _ = executor.execute_script(script,
                                        False,
                                        # Following 3 arguments don't matter for execution that doesn't collect
                                        fail_on_error=False,
                                        fail_on_mixed_types=False,
                                        fail_on_out_of_range_datetime=False,
                                        traceparent=span_context.span_id if span_context else '')
    
        def clex_execute():
            from .dataflow import Dataflow
            activity_data = Dataflow._dataflow_to_anonymous_activity_data(dataflow_to_execute)
            dataflow_to_execute._engine_api.execute_anonymous_activity(
                ExecuteAnonymousActivityMessageArguments(
                    anonymous_activity=activity_data,
                    span_context=span_context
                )
            )
    
        rslex_error = None
        clex_error = None
        try:
            if not force_clex:
                try:
                    rslex_execute()
                except _RsLexDisabledException:
                    _LoggerFactory.trace(logger, "RusteLex disabled. Falling back to CLex.", {'activity': activity})
                    force_clex = True
                except Exception as e:
                    rslex_error = e
                    logger.info('rslex write preppy failed, falling back to clex.')
    
            if force_clex or rslex_error is not None:
                try:
>                   clex_execute()

.venv/lib/python3.8/site-packages/azureml/dataprep/api/_dataframereader.py:238: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    def clex_execute():
        from .dataflow import Dataflow
        activity_data = Dataflow._dataflow_to_anonymous_activity_data(dataflow_to_execute)
>       dataflow_to_execute._engine_api.execute_anonymous_activity(
            ExecuteAnonymousActivityMessageArguments(
                anonymous_activity=activity_data,
                span_context=span_context
            )
        )

.venv/lib/python3.8/site-packages/azureml/dataprep/api/_dataframereader.py:216: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

op_code = <azureml.dataprep.api.engineapi.api.EngineAPI object at 0x7f295bf01fd0>
message = ExecuteAnonymousActivityMessageArguments({
  "anonymousActivity": {
    "blocks": [
      {
        "arguments": {
   ..."f710b6eea916a18d",
    "traceFlags": 1,
    "traceId": "798feb29824ca20f8eaa8f440b6e9d06",
    "tracestate": {}
  }
})
cancellation_token = None

    @wraps(send_message_func)
    def wrapper(op_code: str, message: object = None, cancellation_token: CancellationToken = None) -> object:
        changed = aml_env_vars.get_changed()
        if len(changed) > 0:
            engine_api_func().update_environment_variable(changed)
>       return send_message_func(op_code, message, cancellation_token)

.venv/lib/python3.8/site-packages/azureml/dataprep/api/_aml_helper.py:38: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <azureml.dataprep.api.engineapi.api.EngineAPI object at 0x7f295bf01fd0>
message_args = ExecuteAnonymousActivityMessageArguments({
  "anonymousActivity": {
    "blocks": [
      {
        "arguments": {
   ..."f710b6eea916a18d",
    "traceFlags": 1,
    "traceId": "798feb29824ca20f8eaa8f440b6e9d06",
    "tracestate": {}
  }
})
cancellation_token = None

    @update_aml_env_vars(get_engine_api)
    def execute_anonymous_activity(self, message_args: typedefinitions.ExecuteAnonymousActivityMessageArguments, cancellation_token: CancellationToken = None) -> None:
>       response = self._message_channel.send_message('Engine.ExecuteActivity', message_args, cancellation_token)

.venv/lib/python3.8/site-packages/azureml/dataprep/api/engineapi/api.py:160: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <azureml.dataprep.api.engineapi.engine.MultiThreadMessageChannel object at 0x7f2960083b50>
op_code = 'Engine.ExecuteActivity'
message = {'event': <threading.Event object at 0x7f29e50890a0>, 'response': {'error': {'errorCode': 'ScriptExecution.StreamAcces...ed: 'https://aiincubators.blob.core.windows.net/arcus/test-partitioning/stock_BE*.csv'\n"}, 'id': 9, 'jsonrpc': '2.0'}}
cancellation_token = None

    def send_message(self, op_code: str, message: object, cancellation_token: CancellationToken = None) -> object:
        """Send a message to the engine, and wait for its response."""
        if self._relaunch_callback is not None and self._was_relaunched:
            with self._relaunch_lock:
                self._was_relaunched = False
                self._relaunch_callback()
    
        self._last_message_id += 1
        message_id = self._last_message_id
        is_done = False
    
        def cancel_message():
            if is_done:
                return
    
            self.send_message('CancelMessage', {'idToCancel': message_id})
    
        if cancellation_token is not None:
            cancellation_token.register(cancel_message)
    
        event = Event()
        with self._messages_lock:
            self._pending_messages[message_id] = {'event': event, 'response': None}
        self._write_line(json.dumps({
            'messageId': message_id,
            'opCode': op_code,
            'data': message
        }, cls=CustomEncoder))
    
        event.wait()
        with self._messages_lock:
            message = self._pending_messages.pop(message_id, None)
            if message.get('error') is not None:
                raise message['error']
            response = message['response']
            is_done = True
    
        def cancel_on_error():
            if cancellation_token is not None:
                cancellation_token.cancel()
    
        if response is None:
            if not self._closing:
                cancel_on_error()
                raise ExecutionError({'errorData': {'errorMessage': 'An unknown error has occurred.'}})
            return None
    
        if 'error' in response:
            cancel_on_error()
>           raise_engine_error(response['error'])

.venv/lib/python3.8/site-packages/azureml/dataprep/api/engineapi/engine.py:291: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

error_response = {'errorCode': 'ScriptExecution.StreamAccess.NotFound', 'errorData': {'0_MessageFormat': '{exceptionType} was caused by...esources for the input provided: 'https://aiincubators.blob.core.windows.net/arcus/test-partitioning/stock_BE*.csv'\n"}

    def raise_engine_error(error_response):
        error_code = error_response['errorCode']
        if 'ScriptExecution' in error_code:
>           raise ExecutionError(error_response)
E           azureml.dataprep.api.errorhandlers.ExecutionError: 
E           Error Code: ScriptExecution.StreamAccess.NotFound
E           Failed Step: 187cb135-15a0-4b34-ad14-a510ea28564a
E           Error Message: ScriptExecutionException was caused by StreamAccessException.
E             StreamAccessException was caused by NotFoundException.
E               Found no resources for the input provided: 'https://aiincubators.blob.core.windows.net/arcus/test-partitioning/stock_BE*.csv'
E           | session_id=l_f1e7a55f-15bc-450a-97e4-d27711fd5a70

.venv/lib/python3.8/site-packages/azureml/dataprep/api/errorhandlers.py:10: ExecutionError

During handling of the above exception, another exception occurred:

    def test_partitions_notexisting():
        work_env = fac.Create(connected=True, datastore_path='arcus_partition_test')
>       partition_df = work_env.load_tabular_partition('test-partitioning/stock_BE*', columns=['Close', 'High', 'Isin', 'ItemDate', 'Low', 'Market', 'Open', 'Ticker', 'Volume'])

tests/test_workenvironment_aml.py:94: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
arcus/azureml/environment/aml_environment.py:130: in load_tabular_partition
    _aml_dataset = Dataset.Tabular.from_delimited_files(header=_header,
.venv/lib/python3.8/site-packages/azureml/data/_loggerfactory.py:132: in wrapper
    return func(*args, **kwargs)
.venv/lib/python3.8/site-packages/azureml/data/dataset_factory.py:366: in from_delimited_files
    dataflow = _transform_and_validate(
.venv/lib/python3.8/site-packages/azureml/data/dataset_factory.py:1166: in _transform_and_validate
    _validate_has_data(dataflow, 'Failed to validate the data.')
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

dataflow = Dataflow
  steps: [
    Step {
      id: 187cb135-15a0-4b34-ad14-a510ea28564a
      type: Microsoft.DPrep.GetDatastore...  },
    Step {
      id: 4d1470a1-3bf7-46cf-8a54-1e4b020ab470
      type: Microsoft.DPrep.DropColumnsBlock,
    },
  ]
error_message = 'Failed to validate the data.'

    def _validate_has_data(dataflow, error_message):
        ensure_dataflow(dataflow)
        try:
            dataflow.verify_has_data()
        except (dataprep().api.dataflow.DataflowValidationError,
                dataprep().api.errorhandlers.ExecutionError) as e:
>           raise DatasetValidationError(error_message + '\n' + e.compliant_message, exception=e)
E           azureml.data.dataset_error_handling.DatasetValidationError: DatasetValidationError:
E           	Message: Failed to validate the data.
E           ScriptExecutionException was caused by StreamAccessException.
E             StreamAccessException was caused by NotFoundException.
E               Found no resources for the input provided: '[REDACTED]'
E           | session_id=l_f1e7a55f-15bc-450a-97e4-d27711fd5a70
E           	InnerException None
E           	ErrorResponse 
E           {
E               "error": {
E                   "code": "UserError",
E                   "message": "Failed to validate the data.\nScriptExecutionException was caused by StreamAccessException.\n  StreamAccessException was caused by NotFoundException.\n    Found no resources for the input provided: '[REDACTED]'\n| session_id=l_f1e7a55f-15bc-450a-97e4-d27711fd5a70"
E               }
E           }

.venv/lib/python3.8/site-packages/azureml/data/dataset_error_handling.py:68: DatasetValidationError
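
If the SDK's new eager validation is acceptable, one possible fix is to assert on the wrapped exception instead. A sketch only: fac refers to the work-environment factory already imported by tests/test_workenvironment_aml.py, and whether to accept DatasetValidationError or map it back to an invalid-path error is a design decision for v1.2.

```python
import pytest
from azureml.data.dataset_error_handling import DatasetValidationError

def test_partitions_notexisting():
    work_env = fac.Create(connected=True, datastore_path='arcus_partition_test')
    # Newer azureml-core releases validate the dataset eagerly and wrap the
    # ScriptExecution.StreamAccess.NotFound failure in a DatasetValidationError.
    with pytest.raises(DatasetValidationError):
        work_env.load_tabular_partition(
            'test-partitioning/stock_BE*',
            columns=['Close', 'High', 'Isin', 'ItemDate', 'Low',
                     'Market', 'Open', 'Ticker', 'Volume'])
```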
@stijnmoreels stijnmoreels added bug Something isn't working automated-testing All issues related to the automated testing of the library. labels Jan 21, 2022
@stijnmoreels stijnmoreels added this to the v1.2 milestone Jan 21, 2022