aboutsummaryrefslogtreecommitdiff
path: root/wordcount
diff options
context:
space:
mode:
Diffstat (limited to 'wordcount')
-rw-r--r--wordcount/.gitignore4
-rw-r--r--wordcount/config.json20
-rw-r--r--wordcount/fabfile.py11
-rw-r--r--wordcount/project.clj13
-rw-r--r--wordcount/src/bolts/__init__.py0
-rw-r--r--wordcount/src/bolts/wordcount.py26
-rw-r--r--wordcount/src/spouts/__init__.py0
-rw-r--r--wordcount/src/spouts/words.py14
-rw-r--r--wordcount/topologies/wordcount.py13
-rw-r--r--wordcount/virtualenvs/wordcount.txt1
10 files changed, 102 insertions, 0 deletions
diff --git a/wordcount/.gitignore b/wordcount/.gitignore
new file mode 100644
index 0000000..37b28cc
--- /dev/null
+++ b/wordcount/.gitignore
@@ -0,0 +1,4 @@
+.DS_Store
+_build
+_resources
+logs
diff --git a/wordcount/config.json b/wordcount/config.json
new file mode 100644
index 0000000..778cb51
--- /dev/null
+++ b/wordcount/config.json
@@ -0,0 +1,20 @@
+{
+ "serializer": "json",
+ "topology_specs": "topologies/",
+ "virtualenv_specs": "virtualenvs/",
+ "envs": {
+ "prod": {
+ "user": "",
+ "ssh_password": "",
+ "nimbus": "",
+ "workers": [],
+ "log": {
+ "path": "",
+ "max_bytes": 1000000,
+ "backup_count": 10,
+ "level": "info"
+ },
+ "virtualenv_root": ""
+ }
+ }
+}
diff --git a/wordcount/fabfile.py b/wordcount/fabfile.py
new file mode 100644
index 0000000..8c89cbc
--- /dev/null
+++ b/wordcount/fabfile.py
@@ -0,0 +1,11 @@
+def pre_submit(topology_name, env_name, env_config, options):
+ """Override this function to perform custom actions prior to topology
+ submission. No SSH tunnels will be active when this function is called."""
+ pass
+
+
+def post_submit(topo_name, env_name, env_config, options):
+ """Override this function to perform custom actions after topology
+ submission. Note that the SSH tunnel to Nimbus will still be active
+ when this function is called."""
+ pass
diff --git a/wordcount/project.clj b/wordcount/project.clj
new file mode 100644
index 0000000..4ca974a
--- /dev/null
+++ b/wordcount/project.clj
@@ -0,0 +1,13 @@
+(defproject wordcount "0.0.1-SNAPSHOT"
+ :resource-paths ["_resources"]
+ :target-path "_build"
+ :min-lein-version "2.0.0"
+ :jvm-opts ["-client"]
+ :dependencies [[org.apache.storm/storm-core "2.1.0"]
+ [org.apache.storm/flux-core "2.1.0"]]
+ :jar-exclusions [#"log4j\.properties" #"org\.apache\.storm\.(?!flux)" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
+ :uberjar-exclusions [#"log4j\.properties" #"org\.apache\.storm\.(?!flux)" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
+ )
+(require 'cemerick.pomegranate.aether)
+(cemerick.pomegranate.aether/register-wagon-factory!
+ "http" #(org.apache.maven.wagon.providers.http.HttpWagon.))
diff --git a/wordcount/src/bolts/__init__.py b/wordcount/src/bolts/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wordcount/src/bolts/__init__.py
diff --git a/wordcount/src/bolts/wordcount.py b/wordcount/src/bolts/wordcount.py
new file mode 100644
index 0000000..85fbfb7
--- /dev/null
+++ b/wordcount/src/bolts/wordcount.py
@@ -0,0 +1,26 @@
+import os
+from collections import Counter
+
+from streamparse import Bolt
+
+
+class WordCountBolt(Bolt):
+ outputs = ["word", "count"]
+
+ def initialize(self, conf, ctx):
+ self.counter = Counter()
+ self.pid = os.getpid()
+ self.total = 0
+
+ def _increment(self, word, inc_by):
+ self.counter[word] += inc_by
+ self.total += inc_by
+
+ def process(self, tup):
+ word = tup.values[0]
+ self._increment(word, 10 if word == "dog" else 1)
+ if self.total % 1000 == 0:
+ self.logger.info(
+ f"counted [{self.total:,}] words [pid={self.pid}]"
+ )
+ self.emit([word, self.counter[word]])
diff --git a/wordcount/src/spouts/__init__.py b/wordcount/src/spouts/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wordcount/src/spouts/__init__.py
diff --git a/wordcount/src/spouts/words.py b/wordcount/src/spouts/words.py
new file mode 100644
index 0000000..6ba88c1
--- /dev/null
+++ b/wordcount/src/spouts/words.py
@@ -0,0 +1,14 @@
+from itertools import cycle
+
+from streamparse import Spout
+
+
+class WordSpout(Spout):
+ outputs = ["word"]
+
+ def initialize(self, stormconf, context):
+ self.words = cycle(["dog", "cat", "zebra", "elephant"])
+
+ def next_tuple(self):
+ word = next(self.words)
+ self.emit([word])
diff --git a/wordcount/topologies/wordcount.py b/wordcount/topologies/wordcount.py
new file mode 100644
index 0000000..d66e1b3
--- /dev/null
+++ b/wordcount/topologies/wordcount.py
@@ -0,0 +1,13 @@
+"""
+Word count topology
+"""
+
+from streamparse import Grouping, Topology
+
+from bolts.wordcount import WordCountBolt
+from spouts.words import WordSpout
+
+
+class WordCount(Topology):
+ word_spout = WordSpout.spec()
+ count_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields("word")}, par=2)
diff --git a/wordcount/virtualenvs/wordcount.txt b/wordcount/virtualenvs/wordcount.txt
new file mode 100644
index 0000000..9229811
--- /dev/null
+++ b/wordcount/virtualenvs/wordcount.txt
@@ -0,0 +1 @@
+streamparse # always required for streamparse projects