Skip to content

Conversation

poorna2152
Copy link
Contributor

The following sample is not working in siddhi,
This gives a null pointer error because the default partition list is null inside kafka code and after fixing that even the message offset is not considered when passing the content to the sink.

@App:name("HelloKafka")

@App:description('Consume events from a Kafka Topic and log the messages on the console.')

@source(type='kafka',
        topic.list='productions',
        threading.option='single.thread',
        group.id="group1",
        bootstrap.servers='localhost:9092',
        partition.no.list='0',
        topic.offsets.map='productions=2',
        @map(type='json'))
define stream SweetProductionStream (name string, amount double);

@sink(type='log')
define stream OutputStream (name string, amount double);

from SweetProductionStream
select str:upper(name) as name, amount
insert into OutputStream;

@poorna2152 poorna2152 force-pushed the parition_list branch 2 times, most recently from 4410f23 to fb3db0f Compare March 15, 2025 10:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant