
Forward structured application JSON logs, optionally enriched with ECS fields #258

Closed
pushred opened this issue Feb 23, 2023 · 3 comments

@pushred

pushred commented Feb 23, 2023

Describe the enhancement:

I am using the Kinesis input to collect structured logs from AWS CloudWatch that have been partly populated using the ecs-pino-format library. Inspecting the data output to Elasticsearch, I found that there are a couple of layers of structure around my application logs:

  • Elastic Serverless Forwarder's ECS fields
  • AWS log record structure, JSON stringified on the ECS message field
  • My application logs are further stringified on the log record's message field
{
  "@timestamp": "2023-02-14T21:26:04.387857Z",
  "message": "{\"id\":\"37385191276969030971785538657917125344955573022299717635\",\"timestamp\":1676409955997,\"message\":\"{\\\"log.level\\\":\\\"info\\\",\\\"@timestamp\\\":\\\"2023-02-14T21:25:55.996Z\\\",\\\"message\\\":\\\"beepboop\\\"}\"}",
}
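
For readability, here is the same document with each stringified layer decoded by hand (just an unescaped view of the nesting, not actual forwarder output):

{
  "@timestamp": "2023-02-14T21:26:04.387857Z",
  "message": {
    "id": "37385191276969030971785538657917125344955573022299717635",
    "timestamp": 1676409955997,
    "message": {
      "log.level": "info",
      "@timestamp": "2023-02-14T21:25:55.996Z",
      "message": "beepboop"
    }
  }
}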

I was expecting to see my application logs as-is. This could be achieved if:

  • The CloudWatch log record structure was parsed by ESF
  • ESF attempted to parse the log record message string as JSON
    • If message is valid JSON, merge the content into the root object that is forwarded
    • If message is not JSON, preserve it as-is
  • Optionally add (or overwrite/merge) ECS objects with data from ESF (a sketch of the resulting event follows below)
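
For the sample above, the forwarded event might then look something like this (a hypothetical sketch of the behaviour described, not current ESF output; where the CloudWatch record's own id and timestamp would end up is left open here):

{
  "@timestamp": "2023-02-14T21:25:55.996Z",
  "log.level": "info",
  "message": "beepboop"
}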

Describe a specific use case for the enhancement or feature:

When I initially saw this I thought that it was a reasonable preservation of the various layers my logs are passing through. I assumed that I could use Ingest Pipelines to parse each layer, extract my logs, and merge them into the ESF structure in order to leverage some of its ECS content while adding my own. For reference, here's a portion of my pipeline:

{
  "processors": [
    {
      "json": {
        "field": "message",
        "target_field": "parsed_cloudwatch_log_event"
      }
    },
    {
      "json": {
        "field": "parsed_cloudwatch_log_event.message",
        "add_to_root": true
      }
    },
    {
      "set": {
        "field": "@timestamp",
        "copy_from": "parsed_app_log.@timestamp",
        "ignore_failure": true
      }
    },
    {
      "set": {
        "field": "message",
        "copy_from": "parsed_app_log.message",
        "ignore_failure": true
      }
    },
    {
      "remove": {
        "field": "parsed_cloudwatch_log_event"
      }
    },
    {
      "remove": {
        "field": "parsed_app_log"
      }
    }
  ]
}

However I have encountered some limitations in the pipeline processors that are currently blocking me from accessing a log.level field that ecs-pino-format is adding.

In my own case I may need to stop using ecs-pino-format as I don't see the blocking issues in the data processors being resolved anytime soon. I don't really expect that this enhancement will be implemented either. But I wanted to at least document what I'm dealing with as a user.

@aspacca
Contributor

aspacca commented Mar 7, 2023

hello @pushred, sorry for the late reply

just to be sure I got your scenario right: you send the logs collected with ecs-pino-format to CloudWatch Logs and then have a subscription from the log group to a Kinesis data stream that you use as the input of the forwarder, is that right?

would you be able to use the CloudWatch Logs log group (or log stream) as the direct input of the forwarder? in that case the original JSON logs won't be wrapped in the subscription filter format sent to the Kinesis data stream, and you won't need any unfolding

I have the impression you have already added the expand_event_list_from_field: logEvents in your config, can you confirm?

I was expecting to see my application logs as-is. This could be achieved if:

  • The CloudWatch log record structure was parsed by ESF

  • ESF attempted to parse the log record message string as JSON

    • If message is valid JSON, merge the content into the root object that is forwarded
    • If message is not JSON, preserve it as-is
  • Optionally add (or overwrite/merge) ECS objects with data from ESF

I understand your expectation but that's not the current behaviour of the ESF

in your scenario each kinesis record has the following payload

{
    "owner": "111111111111",
    "logGroup": "CloudTrail/logs",
    "logStream": "111111111111_CloudTrail/logs_us-east-1",
    "subscriptionFilters": [
        "RecipientStream"
    ],
    "messageType": "DATA_MESSAGE",
    "logEvents": [
        {
            "id": "3195310660696698337880902507980421114328961542429EXAMPLE",
            "timestamp": 1432826855000,
            "message": "{\"log.level\":\"info\",\"@timestamp\":\"2023-02-14T21:25:55.996Z\",\"message\":\"beepboop\"}"
        },
        ...
    ]
}

this payload is already parsed as JSON, and on top of that you expand the entries in {"logEvents": [{forwarded event #1},{forwarded event #2}, etc]}

the problem is that {forwarded event #N} is another nested level of JSON, and what you would like to extract and forward as the event is the content of {"message": ... }

if you had the CloudWatch Logs log group/stream directly as the input of the forwarder, the scenario would be different:
in this case each log event message in the log group would have the following payload:

{"log.level":"info","@timestamp":"2023-02-14T21:25:55.996Z","message":"beepboop"}

and the forwarder would send the following event to ES:

{
  "@timestamp": "2023-02-14T21:26:04.387857Z",
  "message": "{\"log.level\":\"info\",\"@timestamp\":\"2023-02-14T21:25:55.996Z\",\"message\":\"beepboop\"}",
}

I would then expect you to be able to run the following processor (or something similar):

{
  "processors": [
    {
      "json": {
        "field": "message",
        "add_to_root": true,
        "add_to_root_conflict_strategy": "replace"
      }
    }
  ]
}
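
for the event above, that should yield something close to the following (a sketch, assuming the inner JSON parses as an object):

{
  "@timestamp": "2023-02-14T21:25:55.996Z",
  "log.level": "info",
  "message": "beepboop"
}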

it's unclear to me where the parsed_app_log field in your ingest pipeline comes from

even assuming you stick to the format of the subscription filter to the kinesis data stream you should be able to run something like the following ingest pipeline:

{
  "processors": [
    {
      "json": {
        "field": "message",
        "target_field": "parsed_cloudwatch_log_event"
      }
    },
    {
      "json": {
        "field": "parsed_cloudwatch_log_event.message",
        "add_to_root": true,
        "add_to_root_conflict_strategy": "replace"
      }
    },
    {
      "remove": {
        "field": "parsed_cloudwatch_log_event"
      }
    }
  ]
}
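
for reference, you could try this out with the simulate API before wiring it up; a sketch using the sample event from the issue description:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      { "json": { "field": "message", "target_field": "parsed_cloudwatch_log_event" } },
      { "json": { "field": "parsed_cloudwatch_log_event.message", "add_to_root": true, "add_to_root_conflict_strategy": "replace" } },
      { "remove": { "field": "parsed_cloudwatch_log_event" } }
    ]
  },
  "docs": [
    {
      "_source": {
        "@timestamp": "2023-02-14T21:26:04.387857Z",
        "message": "{\"id\":\"37385191276969030971785538657917125344955573022299717635\",\"timestamp\":1676409955997,\"message\":\"{\\\"log.level\\\":\\\"info\\\",\\\"@timestamp\\\":\\\"2023-02-14T21:25:55.996Z\\\",\\\"message\\\":\\\"beepboop\\\"}\"}"
      }
    }
  ]
}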

am I missing anything?

there's no plan at the moment to add a feature that merges the content of a JSON field into the root of the event directly in the forwarder: this kind of processing is intended to be handled in an ingest pipeline, once you are able to isolate the single event to be forwarded (as expand_event_list_from_field allows in this case)

@pushred
Author

pushred commented Mar 7, 2023

just to be sure I got your scenario right: you send the logs collected with ecs-pino-format to CloudWatch Logs and then have a subscription from the log group to a Kinesis data stream that you use as the input of the forwarder

Exactly

would you be able to use the CloudWatch Logs log group (or log stream) as the direct input of the forwarder? in that case the original JSON logs won't be wrapped in the subscription filter format sent to the Kinesis data stream, and you won't need any unfolding

We pursued this option originally but the issues noted in #219 led us to Kinesis.

I have the impression you have already added the expand_event_list_from_field: logEvents in your config, can you confirm?

Correct, we are using this and it does remove the Kinesis structure.

even assuming you stick to the format of the subscription filter to the kinesis data stream you should be able to run something like the following ingest pipeline

I actually tried a pipeline like this and ran into this error:

{
  "root_cause": [
    {
      "type": "illegal_argument_exception",
      "reason": "cannot add non-map fields to root of document"
    }
  ],
  "type": "illegal_argument_exception",
  "reason": "cannot add non-map fields to root of document"
}

We weren't able to make sense of it, however, and have ended up with a very verbose pipeline using set processors to copy values from parsed_app_log into the root individually. Perhaps this is a bug I should file an issue for somewhere; not sure where, though.
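
For reference, the workaround looks roughly like this (a sketch of the set-based approach; the second json processor targets parsed_app_log, and only a couple of the copied fields are shown):

{
  "processors": [
    { "json": { "field": "message", "target_field": "parsed_cloudwatch_log_event" } },
    { "json": { "field": "parsed_cloudwatch_log_event.message", "target_field": "parsed_app_log" } },
    { "set": { "field": "@timestamp", "copy_from": "parsed_app_log.@timestamp", "ignore_failure": true } },
    { "set": { "field": "message", "copy_from": "parsed_app_log.message", "ignore_failure": true } },
    { "remove": { "field": "parsed_cloudwatch_log_event" } },
    { "remove": { "field": "parsed_app_log" } }
  ]
}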

there's no plan at the moment to add a feature that merges the content of a JSON field into the root of the event directly in the forwarder: this kind of processing is intended to be handled in an ingest pipeline

Understood. In the end the ingest pipeline did meet our needs and I can see the benefit of not abstracting this away and preserving flexibility. Also understood the decision noted elsewhere to delegate data transformation to ingest pipelines.

This was just not a trivial "out of the box" experience, especially for someone like me who hasn't worked with the Elastic stack much before. The number of Lambda functions in our project is definitely a complicating factor and maybe not so common. I thought I should file an issue to provide the feedback and as a reference for anyone else who might face similar obstacles in the future.

@pushred pushred closed this as completed Mar 7, 2023
@aspacca
Contributor

aspacca commented Mar 8, 2023

We weren't able to make sense of it, however, and have ended up with a very verbose pipeline using set processors to copy values from parsed_app_log into the root individually. Perhaps this is a bug I should file an issue for somewhere; not sure where, though.

I'd suggest you open a thread at https://discuss.elastic.co/c/elastic-stack/elasticsearch/6 about that.
not sure if it's a bug or a missing feature, or if there is any workaround. in case it's a bug, the repo should be the elasticsearch one

This was just not a trivial "out of the box" experience, especially for someone like me who hasn't worked with the Elastic stack much before. The number of Lambda functions in our project is definitely a complicating factor and maybe not so common. I thought I should file an issue to provide the feedback and as a reference for anyone else who might face similar obstacles in the future.

sure, you did the right thing filing an issue: the knowledge will stay here for future users facing the same problems.
and we plan to collect this kind of feedback in a knowledge base related to the forwarder
