From 3908bbe94637fee87bf250e2f2f210b3b2724c90 Mon Sep 17 00:00:00 2001 From: Cody Hiar Date: Fri, 8 Oct 2021 19:47:23 -0600 Subject: initial commit --- consumer.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 consumer.py (limited to 'consumer.py') diff --git a/consumer.py b/consumer.py new file mode 100644 index 0000000..955aba2 --- /dev/null +++ b/consumer.py @@ -0,0 +1,18 @@ +from kafka import KafkaConsumer +from json import loads +from time import sleep + +consumer = KafkaConsumer( + 'topic_test', + bootstrap_servers=['kafka:9092'], + auto_offset_reset='earliest', + enable_auto_commit=True, + group_id='my-group-id', + value_deserializer=lambda x: loads(x.decode('utf-8')) +) + +for event in consumer: + event_data = event.value + # Do whatever you want + print(event_data) + sleep(2) -- cgit v1.2.3