From 3908bbe94637fee87bf250e2f2f210b3b2724c90 Mon Sep 17 00:00:00 2001 From: Cody Hiar Date: Fri, 8 Oct 2021 19:47:23 -0600 Subject: initial commit --- .gitignore | 12 ++++++++++++ Makefile | 19 +++++++++++++++++++ README.md | 5 +++++ consumer.py | 18 ++++++++++++++++++ docker-compose.yml | 37 +++++++++++++++++++++++++++++++++++++ docker/Dockerfile | 7 +++++++ producer.py | 14 ++++++++++++++ 7 files changed, 112 insertions(+) create mode 100644 .gitignore create mode 100644 Makefile create mode 100644 README.md create mode 100644 consumer.py create mode 100644 docker-compose.yml create mode 100644 docker/Dockerfile create mode 100644 producer.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..696c85c --- /dev/null +++ b/.gitignore @@ -0,0 +1,12 @@ +.*.swp +*.pyc +*.pyo +.DS_Store +tags +.ropeproject +*.actual +.vimcache +.idea +.mypy_cache +.envrc +*.sqlite diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..5fde356 --- /dev/null +++ b/Makefile @@ -0,0 +1,19 @@ +.PHONY: build + +help: + @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' + +build: ## Build the Docker image + docker-compose -p docker_kafka build + +up: build ## Bring the container up + docker-compose -p docker_kafka up -d + +down: ## Stop the container + docker-compose -p docker_kafka stop + +enter: ## Enter the running container + docker-compose -p docker_kafka exec backend /bin/bash + +clean: down ## Remove stoped containers + docker-compose -p docker_kafka rm -f diff --git a/README.md b/README.md new file mode 100644 index 0000000..e4712a2 --- /dev/null +++ b/README.md @@ -0,0 +1,5 @@ +# Docker Kafka + +A hello world example of how to use Kafka with python. You can run the consumer +in one terminal with `python consumer.py` and the producer in another with +`python producer.py` and watch some counters increment. diff --git a/consumer.py b/consumer.py new file mode 100644 index 0000000..955aba2 --- /dev/null +++ b/consumer.py @@ -0,0 +1,18 @@ +from kafka import KafkaConsumer +from json import loads +from time import sleep + +consumer = KafkaConsumer( + 'topic_test', + bootstrap_servers=['kafka:9092'], + auto_offset_reset='earliest', + enable_auto_commit=True, + group_id='my-group-id', + value_deserializer=lambda x: loads(x.decode('utf-8')) +) + +for event in consumer: + event_data = event.value + # Do whatever you want + print(event_data) + sleep(2) diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..2131982 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,37 @@ +version: '3' +services: + zookeeper: + image: 'bitnami/zookeeper:latest' + ports: + - '2181:2181' + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + kafka: + image: 'bitnami/kafka:latest' + ports: + - '9093:9093' + environment: + - KAFKA_BROKER_ID=1 + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 + - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 + - ALLOW_PLAINTEXT_LISTENER=yes + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT + - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093 + - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093 + - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT + depends_on: + - zookeeper + backend: + build: + context: ./ + dockerfile: ./docker/Dockerfile + container_name: docker_kafka + image: thornycrackers/docker_kafka + links: + - kafka + volumes: + - .:/usr/src/app + command: /bin/bash + tty: true + stdin_open: true diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 0000000..423c52c --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,7 @@ +FROM python:3.6-bullseye + +# Install kafka +RUN pip install kafka-python + +# The code is stored here +WORKDIR /usr/src/app diff --git a/producer.py b/producer.py new file mode 100644 index 0000000..2acae44 --- /dev/null +++ b/producer.py @@ -0,0 +1,14 @@ +from time import sleep +from json import dumps +from kafka import KafkaProducer + +producer = KafkaProducer( + bootstrap_servers=['kafka:9092'], + value_serializer=lambda x: dumps(x).encode('utf-8') +) + +for j in range(9999): + print("Iteration", j) + data = {'counter': j} + producer.send('topic_test', value=data) + sleep(0.5) -- cgit v1.2.3