aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCody Hiar <cody@hiar.ca>2022-05-09 13:50:59 -0600
committerCody Hiar <cody@hiar.ca>2022-05-09 13:50:59 -0600
commit01e5310b3d82591f813f2933dcd48633da386939 (patch)
tree54de2907b5ec427848620cd5ae8ef119dcb90495
Initial CommitHEADmaster
-rw-r--r--.gitignore12
-rw-r--r--Makefile19
-rw-r--r--README.md14
-rw-r--r--data_generator.py15
-rw-r--r--docker-compose.yml37
-rw-r--r--docker/Dockerfile7
-rw-r--r--hello_world.py15
7 files changed, 119 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..696c85c
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,12 @@
+.*.swp
+*.pyc
+*.pyo
+.DS_Store
+tags
+.ropeproject
+*.actual
+.vimcache
+.idea
+.mypy_cache
+.envrc
+*.sqlite
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..062df78
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,19 @@
+.PHONY: build
+
+help:
+ @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
+
+build: ## Build the Docker image
+ docker-compose -p docker_faust build
+
+up: build ## Bring the container up
+ docker-compose -p docker_faust up -d
+
+down: ## Stop the container
+ docker-compose -p docker_faust stop
+
+enter: ## Enter the running container
+ docker-compose -p docker_faust exec backend /bin/bash
+
+clean: down ## Remove stoped containers
+ docker-compose -p docker_faust rm -f
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..bc51738
--- /dev/null
+++ b/README.md
@@ -0,0 +1,14 @@
+# How to run
+
+Small sample project using faust to read from kafka
+
+```
+# Bring project up
+make up
+# In terminal 1, generator fake data
+make enter
+python data_generator.py
+# In terminal 2, read from topic
+make enter
+faust -A hello_world worker -l info
+```
diff --git a/data_generator.py b/data_generator.py
new file mode 100644
index 0000000..7b787cc
--- /dev/null
+++ b/data_generator.py
@@ -0,0 +1,15 @@
+"""Generate Fake data for kafka."""
+from time import sleep
+from json import dumps
+from kafka import KafkaProducer
+
+producer = KafkaProducer(
+ bootstrap_servers=['kafka:9092'],
+ value_serializer=lambda x: dumps(x).encode('utf-8')
+)
+
+for j in range(9999):
+ print("Iteration", j)
+ data = {'counter': j}
+ producer.send('topic_test', value=data)
+ sleep(0.5)
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..1136f96
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,37 @@
+version: '3'
+services:
+ zookeeper:
+ image: 'bitnami/zookeeper:latest'
+ ports:
+ - '2181:2181'
+ environment:
+ - ALLOW_ANONYMOUS_LOGIN=yes
+ kafka:
+ image: 'bitnami/kafka:latest'
+ ports:
+ - '9093:9093'
+ environment:
+ - KAFKA_BROKER_ID=1
+ - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
+ - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
+ - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
+ - ALLOW_PLAINTEXT_LISTENER=yes
+ - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
+ - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
+ - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
+ - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
+ depends_on:
+ - zookeeper
+ backend:
+ build:
+ context: ./
+ dockerfile: ./docker/Dockerfile
+ container_name: docker_faust
+ image: thornycrackers/docker_faust
+ links:
+ - kafka
+ volumes:
+ - .:/usr/src/app
+ command: /bin/bash
+ tty: true
+ stdin_open: true
diff --git a/docker/Dockerfile b/docker/Dockerfile
new file mode 100644
index 0000000..cfdcc3e
--- /dev/null
+++ b/docker/Dockerfile
@@ -0,0 +1,7 @@
+FROM python:3.9
+
+# Install faust
+RUN pip install faust==1.10.4 kafka-python==1.4.7
+
+# The code is stored here
+WORKDIR /usr/src/app
diff --git a/hello_world.py b/hello_world.py
new file mode 100644
index 0000000..bf91499
--- /dev/null
+++ b/hello_world.py
@@ -0,0 +1,15 @@
+"""Use Faust to read data."""
+import faust
+
+app = faust.App(
+ 'hello-world',
+ broker='kafka://kafka:9092',
+ value_serializer='raw',
+)
+
+test_topic = app.topic('topic_test')
+
+@app.agent(test_topic)
+async def greet(counters):
+ async for counter in counters:
+ print(counter)