aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCody Hiar <cody@hiar.ca>2021-10-10 11:13:47 -0600
committerCody Hiar <cody@hiar.ca>2021-10-10 11:13:47 -0600
commit5367edb8b37a6dac76ac7cd608ac3c005b7b225e (patch)
treeff2a381fba100dc1c7961e625f372f63e2998dcd
Work in progress
-rw-r--r--.gitignore1
-rw-r--r--Makefile19
-rw-r--r--README.md30
-rw-r--r--docker-compose.yml47
-rw-r--r--docker/Dockerfile26
-rw-r--r--wordcount/.gitignore4
-rw-r--r--wordcount/config.json20
-rw-r--r--wordcount/fabfile.py11
-rw-r--r--wordcount/project.clj13
-rw-r--r--wordcount/src/bolts/__init__.py0
-rw-r--r--wordcount/src/bolts/wordcount.py26
-rw-r--r--wordcount/src/spouts/__init__.py0
-rw-r--r--wordcount/src/spouts/words.py14
-rw-r--r--wordcount/topologies/wordcount.py13
-rw-r--r--wordcount/virtualenvs/wordcount.txt1
15 files changed, 225 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..bee8a64
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+__pycache__
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..ea21190
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,19 @@
+.PHONY: build
+
+help:
+ @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
+
+build: ## Build the Docker image
+ docker-compose -p docker_storm build
+
+up: build ## Bring the container up
+ docker-compose -p docker_storm up -d
+
+down: ## Stop the container
+ docker-compose -p docker_storm stop
+
+enter: ## Enter the running container
+ docker-compose -p docker_storm exec backend /bin/bash
+
+clean: down ## Remove stoped containers
+ docker-compose -p docker_storm rm -f
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..b84e6e4
--- /dev/null
+++ b/README.md
@@ -0,0 +1,30 @@
+# Dockerized Apache Storm
+
+Setting up a sample project with Apache Storm using stream parse. The
+`wordcount` project was created with `sparse quickstart wordcount` but it
+required some modifications to get running.
+
+## https://github.com/Parsely/streamparse/issues/479
+
+I don't know if this is the same error that was messing with me but I couldn't
+get the sample `wordcount` project to run when I installed storm `2.3.0` or
+`2.2.0`. Things started to work once I downgraded to `2.1.0`.
+
+## Upating dependancies
+
+```
+ :dependencies [[org.apache.storm/storm-core "2.1.0"]
+ [org.apache.storm/flux-core "2.1.0"]]
+```
+Updated these versions to match my local storm version
+
+## https://github.com/Parsely/streamparse/issues/472
+
+```
+(require 'cemerick.pomegranate.aether)
+(cemerick.pomegranate.aether/register-wagon-factory!
+ "http" #(org.apache.maven.wagon.providers.http.HttpWagon.))
+```
+
+Adding the snippet above to my project.clj seemed to fix this issue
+
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..67f7f6e
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,47 @@
+version: '3'
+services:
+ zookeeper:
+ image: zookeeper
+ container_name: zookeeper
+ restart: always
+
+ nimbus:
+ image: storm
+ container_name: nimbus
+ command: storm nimbus
+ depends_on:
+ - zookeeper
+ links:
+ - zookeeper
+ volumes:
+ - .:/usr/src/app
+ restart: always
+ ports:
+ - 6627:6627
+
+ supervisor:
+ image: storm
+ container_name: supervisor
+ command: storm supervisor
+ depends_on:
+ - nimbus
+ - zookeeper
+ links:
+ - nimbus
+ - zookeeper
+ restart: always
+
+ backend:
+ build:
+ context: ./
+ dockerfile: ./docker/Dockerfile
+ network_mode: "bridge"
+ container_name: docker_storm
+ image: thornycrackers/docker_storm
+ ports:
+ - "8000"
+ volumes:
+ - .:/usr/src/app
+ command: /bin/bash
+ tty: true
+ stdin_open: true
diff --git a/docker/Dockerfile b/docker/Dockerfile
new file mode 100644
index 0000000..c7ba950
--- /dev/null
+++ b/docker/Dockerfile
@@ -0,0 +1,26 @@
+FROM python:3.6-bullseye
+
+RUN apt-get update && \
+ apt-get install --no-install-recommends -y \
+ openjdk-11-jdk \
+ leiningen && \
+ rm -rf /var/lib/apt/lists/*
+
+RUN wget https://dlcdn.apache.org/storm/apache-storm-2.1.0/apache-storm-2.1.0.zip && \
+ unzip apache-storm-2.1.0.zip && \
+ mv apache-storm-2.1.0/bin/* /usr/lib/jvm/java-11-openjdk-amd64/bin && \
+ mv apache-storm-2.1.0/lib/* /usr/lib/jvm/java-11-openjdk-amd64/lib && \
+ rm -rf apache-storm-2.1.0
+
+RUN pip install streamparse
+
+ENV LEIN_ROOT=/usr/src/app
+
+# I don't know java best practices but this result in me being able to run things
+# so *shrug*
+ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
+RUN ln -s /usr/lib/jvm/java-11-openjdk-amd64/bin/storm /bin/storm
+
+
+# The code is stored here
+WORKDIR /usr/src/app
diff --git a/wordcount/.gitignore b/wordcount/.gitignore
new file mode 100644
index 0000000..37b28cc
--- /dev/null
+++ b/wordcount/.gitignore
@@ -0,0 +1,4 @@
+.DS_Store
+_build
+_resources
+logs
diff --git a/wordcount/config.json b/wordcount/config.json
new file mode 100644
index 0000000..778cb51
--- /dev/null
+++ b/wordcount/config.json
@@ -0,0 +1,20 @@
+{
+ "serializer": "json",
+ "topology_specs": "topologies/",
+ "virtualenv_specs": "virtualenvs/",
+ "envs": {
+ "prod": {
+ "user": "",
+ "ssh_password": "",
+ "nimbus": "",
+ "workers": [],
+ "log": {
+ "path": "",
+ "max_bytes": 1000000,
+ "backup_count": 10,
+ "level": "info"
+ },
+ "virtualenv_root": ""
+ }
+ }
+}
diff --git a/wordcount/fabfile.py b/wordcount/fabfile.py
new file mode 100644
index 0000000..8c89cbc
--- /dev/null
+++ b/wordcount/fabfile.py
@@ -0,0 +1,11 @@
+def pre_submit(topology_name, env_name, env_config, options):
+ """Override this function to perform custom actions prior to topology
+ submission. No SSH tunnels will be active when this function is called."""
+ pass
+
+
+def post_submit(topo_name, env_name, env_config, options):
+ """Override this function to perform custom actions after topology
+ submission. Note that the SSH tunnel to Nimbus will still be active
+ when this function is called."""
+ pass
diff --git a/wordcount/project.clj b/wordcount/project.clj
new file mode 100644
index 0000000..4ca974a
--- /dev/null
+++ b/wordcount/project.clj
@@ -0,0 +1,13 @@
+(defproject wordcount "0.0.1-SNAPSHOT"
+ :resource-paths ["_resources"]
+ :target-path "_build"
+ :min-lein-version "2.0.0"
+ :jvm-opts ["-client"]
+ :dependencies [[org.apache.storm/storm-core "2.1.0"]
+ [org.apache.storm/flux-core "2.1.0"]]
+ :jar-exclusions [#"log4j\.properties" #"org\.apache\.storm\.(?!flux)" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
+ :uberjar-exclusions [#"log4j\.properties" #"org\.apache\.storm\.(?!flux)" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
+ )
+(require 'cemerick.pomegranate.aether)
+(cemerick.pomegranate.aether/register-wagon-factory!
+ "http" #(org.apache.maven.wagon.providers.http.HttpWagon.))
diff --git a/wordcount/src/bolts/__init__.py b/wordcount/src/bolts/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wordcount/src/bolts/__init__.py
diff --git a/wordcount/src/bolts/wordcount.py b/wordcount/src/bolts/wordcount.py
new file mode 100644
index 0000000..85fbfb7
--- /dev/null
+++ b/wordcount/src/bolts/wordcount.py
@@ -0,0 +1,26 @@
+import os
+from collections import Counter
+
+from streamparse import Bolt
+
+
+class WordCountBolt(Bolt):
+ outputs = ["word", "count"]
+
+ def initialize(self, conf, ctx):
+ self.counter = Counter()
+ self.pid = os.getpid()
+ self.total = 0
+
+ def _increment(self, word, inc_by):
+ self.counter[word] += inc_by
+ self.total += inc_by
+
+ def process(self, tup):
+ word = tup.values[0]
+ self._increment(word, 10 if word == "dog" else 1)
+ if self.total % 1000 == 0:
+ self.logger.info(
+ f"counted [{self.total:,}] words [pid={self.pid}]"
+ )
+ self.emit([word, self.counter[word]])
diff --git a/wordcount/src/spouts/__init__.py b/wordcount/src/spouts/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wordcount/src/spouts/__init__.py
diff --git a/wordcount/src/spouts/words.py b/wordcount/src/spouts/words.py
new file mode 100644
index 0000000..6ba88c1
--- /dev/null
+++ b/wordcount/src/spouts/words.py
@@ -0,0 +1,14 @@
+from itertools import cycle
+
+from streamparse import Spout
+
+
+class WordSpout(Spout):
+ outputs = ["word"]
+
+ def initialize(self, stormconf, context):
+ self.words = cycle(["dog", "cat", "zebra", "elephant"])
+
+ def next_tuple(self):
+ word = next(self.words)
+ self.emit([word])
diff --git a/wordcount/topologies/wordcount.py b/wordcount/topologies/wordcount.py
new file mode 100644
index 0000000..d66e1b3
--- /dev/null
+++ b/wordcount/topologies/wordcount.py
@@ -0,0 +1,13 @@
+"""
+Word count topology
+"""
+
+from streamparse import Grouping, Topology
+
+from bolts.wordcount import WordCountBolt
+from spouts.words import WordSpout
+
+
+class WordCount(Topology):
+ word_spout = WordSpout.spec()
+ count_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields("word")}, par=2)
diff --git a/wordcount/virtualenvs/wordcount.txt b/wordcount/virtualenvs/wordcount.txt
new file mode 100644
index 0000000..9229811
--- /dev/null
+++ b/wordcount/virtualenvs/wordcount.txt
@@ -0,0 +1 @@
+streamparse # always required for streamparse projects