diff options
Diffstat (limited to 'wordcount')
-rw-r--r-- | wordcount/.gitignore | 4 | ||||
-rw-r--r-- | wordcount/config.json | 20 | ||||
-rw-r--r-- | wordcount/fabfile.py | 11 | ||||
-rw-r--r-- | wordcount/project.clj | 13 | ||||
-rw-r--r-- | wordcount/src/bolts/__init__.py | 0 | ||||
-rw-r--r-- | wordcount/src/bolts/wordcount.py | 26 | ||||
-rw-r--r-- | wordcount/src/spouts/__init__.py | 0 | ||||
-rw-r--r-- | wordcount/src/spouts/words.py | 14 | ||||
-rw-r--r-- | wordcount/topologies/wordcount.py | 13 | ||||
-rw-r--r-- | wordcount/virtualenvs/wordcount.txt | 1 |
10 files changed, 102 insertions, 0 deletions
diff --git a/wordcount/.gitignore b/wordcount/.gitignore new file mode 100644 index 0000000..37b28cc --- /dev/null +++ b/wordcount/.gitignore @@ -0,0 +1,4 @@ +.DS_Store +_build +_resources +logs diff --git a/wordcount/config.json b/wordcount/config.json new file mode 100644 index 0000000..778cb51 --- /dev/null +++ b/wordcount/config.json @@ -0,0 +1,20 @@ +{ + "serializer": "json", + "topology_specs": "topologies/", + "virtualenv_specs": "virtualenvs/", + "envs": { + "prod": { + "user": "", + "ssh_password": "", + "nimbus": "", + "workers": [], + "log": { + "path": "", + "max_bytes": 1000000, + "backup_count": 10, + "level": "info" + }, + "virtualenv_root": "" + } + } +} diff --git a/wordcount/fabfile.py b/wordcount/fabfile.py new file mode 100644 index 0000000..8c89cbc --- /dev/null +++ b/wordcount/fabfile.py @@ -0,0 +1,11 @@ +def pre_submit(topology_name, env_name, env_config, options): + """Override this function to perform custom actions prior to topology + submission. No SSH tunnels will be active when this function is called.""" + pass + + +def post_submit(topo_name, env_name, env_config, options): + """Override this function to perform custom actions after topology + submission. Note that the SSH tunnel to Nimbus will still be active + when this function is called.""" + pass diff --git a/wordcount/project.clj b/wordcount/project.clj new file mode 100644 index 0000000..4ca974a --- /dev/null +++ b/wordcount/project.clj @@ -0,0 +1,13 @@ +(defproject wordcount "0.0.1-SNAPSHOT" + :resource-paths ["_resources"] + :target-path "_build" + :min-lein-version "2.0.0" + :jvm-opts ["-client"] + :dependencies [[org.apache.storm/storm-core "2.1.0"] + [org.apache.storm/flux-core "2.1.0"]] + :jar-exclusions [#"log4j\.properties" #"org\.apache\.storm\.(?!flux)" #"trident" #"META-INF" #"meta-inf" #"\.yaml"] + :uberjar-exclusions [#"log4j\.properties" #"org\.apache\.storm\.(?!flux)" #"trident" #"META-INF" #"meta-inf" #"\.yaml"] + ) +(require 'cemerick.pomegranate.aether) +(cemerick.pomegranate.aether/register-wagon-factory! + "http" #(org.apache.maven.wagon.providers.http.HttpWagon.)) diff --git a/wordcount/src/bolts/__init__.py b/wordcount/src/bolts/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/wordcount/src/bolts/__init__.py diff --git a/wordcount/src/bolts/wordcount.py b/wordcount/src/bolts/wordcount.py new file mode 100644 index 0000000..85fbfb7 --- /dev/null +++ b/wordcount/src/bolts/wordcount.py @@ -0,0 +1,26 @@ +import os +from collections import Counter + +from streamparse import Bolt + + +class WordCountBolt(Bolt): + outputs = ["word", "count"] + + def initialize(self, conf, ctx): + self.counter = Counter() + self.pid = os.getpid() + self.total = 0 + + def _increment(self, word, inc_by): + self.counter[word] += inc_by + self.total += inc_by + + def process(self, tup): + word = tup.values[0] + self._increment(word, 10 if word == "dog" else 1) + if self.total % 1000 == 0: + self.logger.info( + f"counted [{self.total:,}] words [pid={self.pid}]" + ) + self.emit([word, self.counter[word]]) diff --git a/wordcount/src/spouts/__init__.py b/wordcount/src/spouts/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/wordcount/src/spouts/__init__.py diff --git a/wordcount/src/spouts/words.py b/wordcount/src/spouts/words.py new file mode 100644 index 0000000..6ba88c1 --- /dev/null +++ b/wordcount/src/spouts/words.py @@ -0,0 +1,14 @@ +from itertools import cycle + +from streamparse import Spout + + +class WordSpout(Spout): + outputs = ["word"] + + def initialize(self, stormconf, context): + self.words = cycle(["dog", "cat", "zebra", "elephant"]) + + def next_tuple(self): + word = next(self.words) + self.emit([word]) diff --git a/wordcount/topologies/wordcount.py b/wordcount/topologies/wordcount.py new file mode 100644 index 0000000..d66e1b3 --- /dev/null +++ b/wordcount/topologies/wordcount.py @@ -0,0 +1,13 @@ +""" +Word count topology +""" + +from streamparse import Grouping, Topology + +from bolts.wordcount import WordCountBolt +from spouts.words import WordSpout + + +class WordCount(Topology): + word_spout = WordSpout.spec() + count_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields("word")}, par=2) diff --git a/wordcount/virtualenvs/wordcount.txt b/wordcount/virtualenvs/wordcount.txt new file mode 100644 index 0000000..9229811 --- /dev/null +++ b/wordcount/virtualenvs/wordcount.txt @@ -0,0 +1 @@ +streamparse # always required for streamparse projects |