From 01e5310b3d82591f813f2933dcd48633da386939 Mon Sep 17 00:00:00 2001 From: Cody Hiar Date: Mon, 9 May 2022 13:50:59 -0600 Subject: Initial Commit --- .gitignore | 12 ++++++++++++ Makefile | 19 +++++++++++++++++++ README.md | 14 ++++++++++++++ data_generator.py | 15 +++++++++++++++ docker-compose.yml | 37 +++++++++++++++++++++++++++++++++++++ docker/Dockerfile | 7 +++++++ hello_world.py | 15 +++++++++++++++ 7 files changed, 119 insertions(+) create mode 100644 .gitignore create mode 100644 Makefile create mode 100644 README.md create mode 100644 data_generator.py create mode 100644 docker-compose.yml create mode 100644 docker/Dockerfile create mode 100644 hello_world.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..696c85c --- /dev/null +++ b/.gitignore @@ -0,0 +1,12 @@ +.*.swp +*.pyc +*.pyo +.DS_Store +tags +.ropeproject +*.actual +.vimcache +.idea +.mypy_cache +.envrc +*.sqlite diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..062df78 --- /dev/null +++ b/Makefile @@ -0,0 +1,19 @@ +.PHONY: build + +help: + @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' + +build: ## Build the Docker image + docker-compose -p docker_faust build + +up: build ## Bring the container up + docker-compose -p docker_faust up -d + +down: ## Stop the container + docker-compose -p docker_faust stop + +enter: ## Enter the running container + docker-compose -p docker_faust exec backend /bin/bash + +clean: down ## Remove stoped containers + docker-compose -p docker_faust rm -f diff --git a/README.md b/README.md new file mode 100644 index 0000000..bc51738 --- /dev/null +++ b/README.md @@ -0,0 +1,14 @@ +# How to run + +Small sample project using faust to read from kafka + +``` +# Bring project up +make up +# In terminal 1, generator fake data +make enter +python data_generator.py +# In terminal 2, read from topic +make enter +faust -A hello_world worker -l info +``` diff --git a/data_generator.py b/data_generator.py new file mode 100644 index 0000000..7b787cc --- /dev/null +++ b/data_generator.py @@ -0,0 +1,15 @@ +"""Generate Fake data for kafka.""" +from time import sleep +from json import dumps +from kafka import KafkaProducer + +producer = KafkaProducer( + bootstrap_servers=['kafka:9092'], + value_serializer=lambda x: dumps(x).encode('utf-8') +) + +for j in range(9999): + print("Iteration", j) + data = {'counter': j} + producer.send('topic_test', value=data) + sleep(0.5) diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..1136f96 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,37 @@ +version: '3' +services: + zookeeper: + image: 'bitnami/zookeeper:latest' + ports: + - '2181:2181' + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + kafka: + image: 'bitnami/kafka:latest' + ports: + - '9093:9093' + environment: + - KAFKA_BROKER_ID=1 + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 + - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 + - ALLOW_PLAINTEXT_LISTENER=yes + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT + - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093 + - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093 + - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT + depends_on: + - zookeeper + backend: + build: + context: ./ + dockerfile: ./docker/Dockerfile + container_name: docker_faust + image: thornycrackers/docker_faust + links: + - kafka + volumes: + - .:/usr/src/app + command: /bin/bash + tty: true + stdin_open: true diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 0000000..cfdcc3e --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,7 @@ +FROM python:3.9 + +# Install faust +RUN pip install faust==1.10.4 kafka-python==1.4.7 + +# The code is stored here +WORKDIR /usr/src/app diff --git a/hello_world.py b/hello_world.py new file mode 100644 index 0000000..bf91499 --- /dev/null +++ b/hello_world.py @@ -0,0 +1,15 @@ +"""Use Faust to read data.""" +import faust + +app = faust.App( + 'hello-world', + broker='kafka://kafka:9092', + value_serializer='raw', +) + +test_topic = app.topic('topic_test') + +@app.agent(test_topic) +async def greet(counters): + async for counter in counters: + print(counter) -- cgit v1.2.3