@@ -255,6 +255,25 @@ func TestNewParsePipeline_logfmt(t *testing.T) {
255255 },
256256 },
257257 },
258+ {
259+ name : "parsed keys with invalid characteres are sanitized" ,
260+ schema : arrow .NewSchema ([]arrow.Field {
261+ semconv .FieldFromFQN ("utf8.builtin.message" , true ),
262+ }, nil ),
263+ input : arrowtest.Rows {
264+ {colMsg : "level=info status=200 index-store=1234" },
265+ },
266+ requestedKeys : nil ,
267+ expectedFields : 4 , // 4 columns: message, level, status, index-store
268+ expectedOutput : arrowtest.Rows {
269+ {
270+ colMsg : "level=info status=200 index-store=1234" ,
271+ "utf8.parsed.level" : "info" ,
272+ "utf8.parsed.status" : "200" ,
273+ "utf8.parsed.index_store" : "1234" ,
274+ },
275+ },
276+ },
258277 } {
259278 t .Run (tt .name , func (t * testing.T ) {
260279 // Create input data with message column containing logfmt
@@ -660,6 +679,23 @@ func TestNewParsePipeline_JSON(t *testing.T) {
660679 },
661680 },
662681 },
682+ {
683+ name : "parsed keys with invalid characteres are sanitized" ,
684+ schema : arrow .NewSchema ([]arrow.Field {
685+ semconv .FieldFromFQN ("utf8.builtin.message" , true ),
686+ }, nil ),
687+ input : arrowtest.Rows {
688+ {colMsg : `{"index-store": "foo"}` },
689+ },
690+ requestedKeys : nil ,
691+ expectedFields : 2 , // 2 columns: message, index-store
692+ expectedOutput : arrowtest.Rows {
693+ {
694+ colMsg : `{"index-store": "foo"}` ,
695+ "utf8.parsed.index_store" : "foo" ,
696+ },
697+ },
698+ },
663699 } {
664700 t .Run (tt .name , func (t * testing.T ) {
665701 // Create input data with message column containing JSON
0 commit comments