
Simple long running task finishes but status continues to return 'Running' #2972

Open
TXAggie2000 opened this issue Nov 22, 2024 · 5 comments


@TXAggie2000

TXAggie2000 commented Nov 22, 2024

I have a fairly simple process we are using Durable Functions for: we read in a JSON file, flatten the inner arrays per a specification, and write a new file back to blob storage. There is just one activity. What I have noticed when calling this from ADF is that the file gets written (the write is the last line of the task before the return), yet the function continues to report a status of 'Running' for quite a while afterward. This function does NOT return a large payload; it returns this:

return { "status": "Success", "message": f"Successfully unnested array. File created at {container_name}{blob_name}" }

As an example, here are the timestamps when the function finally returned 'Completed':

"createdTime": "2024-11-21T20:15:19Z",
"lastUpdatedTime": "2024-11-21T20:56:51Z"
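Diffing those two fields shows how long the instance sat in 'Running' overall; a small helper, assuming the UTC timestamp format shown above:

```python
from datetime import datetime

def minutes_between(created: str, last_updated: str) -> float:
    """Minutes between createdTime and lastUpdatedTime from the status payload.

    Assumes the UTC format the status endpoint returns,
    e.g. "2024-11-21T20:15:19Z".
    """
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    delta = datetime.strptime(last_updated, fmt) - datetime.strptime(created, fmt)
    return delta.total_seconds() / 60
```

For the timestamps above that works out to roughly 41.5 minutes end to end.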

The file was written at 2024-11-21T20:26:00Z. My ADF pipeline sleeps for 30 seconds and checks the status again. Why would the function continue for 30 minutes after the actual task was complete?
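For context, the check-status loop the pipeline performs can be sketched like this; `fetch_status` stands in for an HTTP GET against the `statusQueryGetUri` that Durable Functions hands back, and the terminal-state names follow the instance-management HTTP API:

```python
import time

# Runtime states after which the orchestration status will not change again.
TERMINAL_STATES = {"Completed", "Failed", "Terminated"}

def wait_for_completion(fetch_status, interval_seconds=30, max_polls=120):
    """Poll until the orchestration reaches a terminal runtimeStatus.

    fetch_status is any callable returning the status JSON as a dict,
    e.g. a wrapper around an HTTP GET of statusQueryGetUri.
    """
    for _ in range(max_polls):
        status = fetch_status()
        if status["runtimeStatus"] in TERMINAL_STATES:
            return status
        time.sleep(interval_seconds)
    raise TimeoutError("orchestration still 'Running' after max_polls checks")
```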

Thanks,
Scott

@cgillum
Member

cgillum commented Nov 22, 2024

A few questions:

  • In this context, does "ADF" stand for "Azure Durable Functions" or "Azure Data Factory"?
  • Can you share the orchestration and activity code that you're using, or at least relevant snippets?
  • Are you seeing this in Azure? If so, can you share the orchestration instance ID and the region you're running in?

@cgillum cgillum added Needs: Author Feedback Waiting for the author of the issue to respond to a question and removed Needs: Triage 🔍 labels Nov 22, 2024
@TXAggie2000
Author

TXAggie2000 commented Nov 22, 2024

Thanks for responding, Chris. Yes, in this case ADF is Azure Data Factory. Here is the relevant code you asked for; it's pretty vanilla and taken straight from the sample:

# Imports and app object implied by the snippet (standard v2 programming-model setup)
import azure.functions as func
import azure.durable_functions as df

myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

@myApp.route(route="orchestrators/unnest_json_async")
@myApp.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client):
    try:
        mapping_payload = req.get_json()
    except Exception as e:
        return func.HttpResponse(str(e), status_code=400)
    instance_id = await client.start_new('unnest_orchestrator', None, mapping_payload)
    response = client.create_check_status_response(req, instance_id)
    return response

# Orchestrator
@myApp.orchestration_trigger(context_name="context")
def unnest_orchestrator(context):
    print(context)
    mapping = context.get_input()
    result1 = yield context.call_activity("unnest", mapping)
    return result1

I am running this in South Central US and here is an instance id: d48accb6e91344f6a16606d642cccec4

@microsoft-github-policy-service microsoft-github-policy-service bot added Needs: Attention 👋 and removed Needs: Author Feedback Waiting for the author of the issue to respond to a question labels Nov 22, 2024
@TXAggie2000
Author

Sorry Chris, here is the activity code as well:

# Imports implied by the snippet
import json
import logging
import uuid

from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient

# Activity
@myApp.activity_trigger(input_name="mapping")
def unnest(mapping: dict):
    logging.info('Python Unnest JSON function processing a request.')
    logging.getLogger("azure").setLevel(logging.WARNING)

    if 'storage_account' in mapping and \
        'container' in mapping and \
        'source_directory' in mapping and \
        'target_directory' in mapping and \
        'nesting_spec' in mapping:

        storage_account = mapping['storage_account']
        container_name = mapping['container']
        source_directory = mapping['source_directory']
        target_directory = mapping['target_directory']
        nesting_spec = mapping['nesting_spec']

        default_credential = DefaultAzureCredential(logging_enable=False)
        account_url = f"https://{storage_account}.blob.core.windows.net"
        
        blob_service_client = BlobServiceClient(account_url, credential=default_credential, logging_enable=False)
        container_client = blob_service_client.get_container_client(container_name)
        blob_list = container_client.list_blobs(name_starts_with=source_directory)
        
        
        data = []
        for blob in blob_list:
            blob_name = blob.name
            if blob_name.endswith(".json"):
                blob_client = container_client.get_blob_client(blob_name)
                blob_data = blob_client.download_blob().readall()
                content = blob_data.decode('utf-8-sig')
                for line_number, line in enumerate(content.splitlines(), start=1):
                    line = line.strip()
                    if line: 
                        try: 
                            json_object = json.loads(line)
                            data.append(json_object)
                        except json.JSONDecodeError as e: 
                            logging.error(f'JSON Decoding error while processing file {blob_name}')
                            return {
                                "status": "Error",
                                "message": f"JSON Decoding error while processing file {blob_name}: {e.msg}"
                            }

        if len(data) >= 1:
            new_spec = {
                "collection_reference": "",
                "nesting_spec": nesting_spec
            }
            unnested_result = flatten_json_by_spec(data, new_spec)
        else:
            logging.error('Not able to unnest JSON based on the specs.  Returned empty result.')
            return {
                "status": "Error",
                "message": "Not able to unnest JSON based on the specs.  Returned empty result."
            }
        
        output = {
            "items": unnested_result
        }
        
        if target_directory.endswith('/'): target_directory = target_directory.rstrip('/')
        blob_name = f"{target_directory}/{uuid.uuid4()}.json"

        blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
        blob_client.upload_blob(json.dumps(output), blob_type="BlockBlob", overwrite=True)

        return {
            "status": "Success",
            "message": f"Successfully unnested array. File created at {container_name}{blob_name}"
        }
    else:
        return {
            "status": "Error",
            "message": "Missing required values in order to parse and unnest json..."
        }

@cgillum
Member

cgillum commented Nov 22, 2024

Hmm...yeah, everything looks good from a code perspective. I wonder if something else is afoot. Have you had a look through the Durable Functions troubleshooting guide? https://learn.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-troubleshooting-guide. There is some automated analysis there that might be able to help root-cause the problem. Take a look at the suggestions in the guide and respond back if you're still stuck.
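If the history shows the activity finishing long before the orchestration completes, something like this pinpoints the gap; the `EventType`/`Timestamp` names follow the Durable Task history schema, but treat the exact payload shape as an assumption:

```python
from datetime import datetime

def completion_gap_seconds(history):
    """Seconds between the last TaskCompleted event and ExecutionCompleted.

    `history` is the event list returned when querying an instance with
    showHistory=true; EventType/Timestamp field names are assumed to match
    the Durable Task history schema.
    """
    def ts(event):
        return datetime.strptime(event["Timestamp"], "%Y-%m-%dT%H:%M:%SZ")

    task_done = [e for e in history if e.get("EventType") == "TaskCompleted"]
    finished = [e for e in history if e.get("EventType") == "ExecutionCompleted"]
    if not task_done or not finished:
        return None  # orchestration not finished, or no activity ran
    return (ts(finished[-1]) - ts(task_done[-1])).total_seconds()
```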

@cgillum cgillum added Needs: Author Feedback Waiting for the author of the issue to respond to a question and removed Needs: Attention 👋 labels Nov 22, 2024
@TXAggie2000
Copy link
Author

Thanks Chris. Following your link, specifically the section "Orchestration doesn't complete / is stuck in the Running state", nothing indicates an issue. I also deployed this to another client's Azure account, and that pipeline is doing the same thing. I am going to write a standalone Python script that accomplishes the same task so I can isolate whether this is an issue with Data Factory. If so, I will open a ticket with that team. I will keep you posted.

Thanks,
Scott

@microsoft-github-policy-service microsoft-github-policy-service bot added Needs: Attention 👋 and removed Needs: Author Feedback Waiting for the author of the issue to respond to a question labels Nov 25, 2024