diff options
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | Makefile | 19 | ||||
-rw-r--r-- | README.md | 30 | ||||
-rw-r--r-- | docker-compose.yml | 47 | ||||
-rw-r--r-- | docker/Dockerfile | 26 | ||||
-rw-r--r-- | wordcount/.gitignore | 4 | ||||
-rw-r--r-- | wordcount/config.json | 20 | ||||
-rw-r--r-- | wordcount/fabfile.py | 11 | ||||
-rw-r--r-- | wordcount/project.clj | 13 | ||||
-rw-r--r-- | wordcount/src/bolts/__init__.py | 0 | ||||
-rw-r--r-- | wordcount/src/bolts/wordcount.py | 26 | ||||
-rw-r--r-- | wordcount/src/spouts/__init__.py | 0 | ||||
-rw-r--r-- | wordcount/src/spouts/words.py | 14 | ||||
-rw-r--r-- | wordcount/topologies/wordcount.py | 13 | ||||
-rw-r--r-- | wordcount/virtualenvs/wordcount.txt | 1 |
15 files changed, 225 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bee8a64 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__ diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..ea21190 --- /dev/null +++ b/Makefile @@ -0,0 +1,19 @@ +.PHONY: build + +help: + @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' + +build: ## Build the Docker image + docker-compose -p docker_storm build + +up: build ## Bring the container up + docker-compose -p docker_storm up -d + +down: ## Stop the container + docker-compose -p docker_storm stop + +enter: ## Enter the running container + docker-compose -p docker_storm exec backend /bin/bash + +clean: down ## Remove stoped containers + docker-compose -p docker_storm rm -f diff --git a/README.md b/README.md new file mode 100644 index 0000000..b84e6e4 --- /dev/null +++ b/README.md @@ -0,0 +1,30 @@ +# Dockerized Apache Storm + +Setting up a sample project with Apache Storm using stream parse. The +`wordcount` project was created with `sparse quickstart wordcount` but it +required some modifications to get running. + +## https://github.com/Parsely/streamparse/issues/479 + +I don't know if this is the same error that was messing with me but I couldn't +get the sample `wordcount` project to run when I installed storm `2.3.0` or +`2.2.0`. Things started to work once I downgraded to `2.1.0`. + +## Upating dependancies + +``` + :dependencies [[org.apache.storm/storm-core "2.1.0"] + [org.apache.storm/flux-core "2.1.0"]] +``` +Updated these versions to match my local storm version + +## https://github.com/Parsely/streamparse/issues/472 + +``` +(require 'cemerick.pomegranate.aether) +(cemerick.pomegranate.aether/register-wagon-factory! + "http" #(org.apache.maven.wagon.providers.http.HttpWagon.)) +``` + +Adding the snippet above to my project.clj seemed to fix this issue + diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..67f7f6e --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,47 @@ +version: '3' +services: + zookeeper: + image: zookeeper + container_name: zookeeper + restart: always + + nimbus: + image: storm + container_name: nimbus + command: storm nimbus + depends_on: + - zookeeper + links: + - zookeeper + volumes: + - .:/usr/src/app + restart: always + ports: + - 6627:6627 + + supervisor: + image: storm + container_name: supervisor + command: storm supervisor + depends_on: + - nimbus + - zookeeper + links: + - nimbus + - zookeeper + restart: always + + backend: + build: + context: ./ + dockerfile: ./docker/Dockerfile + network_mode: "bridge" + container_name: docker_storm + image: thornycrackers/docker_storm + ports: + - "8000" + volumes: + - .:/usr/src/app + command: /bin/bash + tty: true + stdin_open: true diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 0000000..c7ba950 --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,26 @@ +FROM python:3.6-bullseye + +RUN apt-get update && \ + apt-get install --no-install-recommends -y \ + openjdk-11-jdk \ + leiningen && \ + rm -rf /var/lib/apt/lists/* + +RUN wget https://dlcdn.apache.org/storm/apache-storm-2.1.0/apache-storm-2.1.0.zip && \ + unzip apache-storm-2.1.0.zip && \ + mv apache-storm-2.1.0/bin/* /usr/lib/jvm/java-11-openjdk-amd64/bin && \ + mv apache-storm-2.1.0/lib/* /usr/lib/jvm/java-11-openjdk-amd64/lib && \ + rm -rf apache-storm-2.1.0 + +RUN pip install streamparse + +ENV LEIN_ROOT=/usr/src/app + +# I don't know java best practices but this result in me being able to run things +# so *shrug* +ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64 +RUN ln -s /usr/lib/jvm/java-11-openjdk-amd64/bin/storm /bin/storm + + +# The code is stored here +WORKDIR /usr/src/app diff --git a/wordcount/.gitignore b/wordcount/.gitignore new file mode 100644 index 0000000..37b28cc --- /dev/null +++ b/wordcount/.gitignore @@ -0,0 +1,4 @@ +.DS_Store +_build +_resources +logs diff --git a/wordcount/config.json b/wordcount/config.json new file mode 100644 index 0000000..778cb51 --- /dev/null +++ b/wordcount/config.json @@ -0,0 +1,20 @@ +{ + "serializer": "json", + "topology_specs": "topologies/", + "virtualenv_specs": "virtualenvs/", + "envs": { + "prod": { + "user": "", + "ssh_password": "", + "nimbus": "", + "workers": [], + "log": { + "path": "", + "max_bytes": 1000000, + "backup_count": 10, + "level": "info" + }, + "virtualenv_root": "" + } + } +} diff --git a/wordcount/fabfile.py b/wordcount/fabfile.py new file mode 100644 index 0000000..8c89cbc --- /dev/null +++ b/wordcount/fabfile.py @@ -0,0 +1,11 @@ +def pre_submit(topology_name, env_name, env_config, options): + """Override this function to perform custom actions prior to topology + submission. No SSH tunnels will be active when this function is called.""" + pass + + +def post_submit(topo_name, env_name, env_config, options): + """Override this function to perform custom actions after topology + submission. Note that the SSH tunnel to Nimbus will still be active + when this function is called.""" + pass diff --git a/wordcount/project.clj b/wordcount/project.clj new file mode 100644 index 0000000..4ca974a --- /dev/null +++ b/wordcount/project.clj @@ -0,0 +1,13 @@ +(defproject wordcount "0.0.1-SNAPSHOT" + :resource-paths ["_resources"] + :target-path "_build" + :min-lein-version "2.0.0" + :jvm-opts ["-client"] + :dependencies [[org.apache.storm/storm-core "2.1.0"] + [org.apache.storm/flux-core "2.1.0"]] + :jar-exclusions [#"log4j\.properties" #"org\.apache\.storm\.(?!flux)" #"trident" #"META-INF" #"meta-inf" #"\.yaml"] + :uberjar-exclusions [#"log4j\.properties" #"org\.apache\.storm\.(?!flux)" #"trident" #"META-INF" #"meta-inf" #"\.yaml"] + ) +(require 'cemerick.pomegranate.aether) +(cemerick.pomegranate.aether/register-wagon-factory! + "http" #(org.apache.maven.wagon.providers.http.HttpWagon.)) diff --git a/wordcount/src/bolts/__init__.py b/wordcount/src/bolts/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/wordcount/src/bolts/__init__.py diff --git a/wordcount/src/bolts/wordcount.py b/wordcount/src/bolts/wordcount.py new file mode 100644 index 0000000..85fbfb7 --- /dev/null +++ b/wordcount/src/bolts/wordcount.py @@ -0,0 +1,26 @@ +import os +from collections import Counter + +from streamparse import Bolt + + +class WordCountBolt(Bolt): + outputs = ["word", "count"] + + def initialize(self, conf, ctx): + self.counter = Counter() + self.pid = os.getpid() + self.total = 0 + + def _increment(self, word, inc_by): + self.counter[word] += inc_by + self.total += inc_by + + def process(self, tup): + word = tup.values[0] + self._increment(word, 10 if word == "dog" else 1) + if self.total % 1000 == 0: + self.logger.info( + f"counted [{self.total:,}] words [pid={self.pid}]" + ) + self.emit([word, self.counter[word]]) diff --git a/wordcount/src/spouts/__init__.py b/wordcount/src/spouts/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/wordcount/src/spouts/__init__.py diff --git a/wordcount/src/spouts/words.py b/wordcount/src/spouts/words.py new file mode 100644 index 0000000..6ba88c1 --- /dev/null +++ b/wordcount/src/spouts/words.py @@ -0,0 +1,14 @@ +from itertools import cycle + +from streamparse import Spout + + +class WordSpout(Spout): + outputs = ["word"] + + def initialize(self, stormconf, context): + self.words = cycle(["dog", "cat", "zebra", "elephant"]) + + def next_tuple(self): + word = next(self.words) + self.emit([word]) diff --git a/wordcount/topologies/wordcount.py b/wordcount/topologies/wordcount.py new file mode 100644 index 0000000..d66e1b3 --- /dev/null +++ b/wordcount/topologies/wordcount.py @@ -0,0 +1,13 @@ +""" +Word count topology +""" + +from streamparse import Grouping, Topology + +from bolts.wordcount import WordCountBolt +from spouts.words import WordSpout + + +class WordCount(Topology): + word_spout = WordSpout.spec() + count_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields("word")}, par=2) diff --git a/wordcount/virtualenvs/wordcount.txt b/wordcount/virtualenvs/wordcount.txt new file mode 100644 index 0000000..9229811 --- /dev/null +++ b/wordcount/virtualenvs/wordcount.txt @@ -0,0 +1 @@ +streamparse # always required for streamparse projects |