aboutsummaryrefslogtreecommitdiff
path: root/consumer.py
blob: 955aba238e75be0ae4c94f6ca88d60d80ff349d5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from kafka import KafkaConsumer
from json import loads
from time import sleep

consumer = KafkaConsumer(
    'topic_test',
    bootstrap_servers=['kafka:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group-id',
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)

for event in consumer:
    event_data = event.value
    # Do whatever you want
    print(event_data)
    sleep(2)