aboutsummaryrefslogtreecommitdiff
path: root/wordcount/src
diff options
context:
space:
mode:
Diffstat (limited to 'wordcount/src')
-rw-r--r--wordcount/src/bolts/__init__.py0
-rw-r--r--wordcount/src/bolts/wordcount.py26
-rw-r--r--wordcount/src/spouts/__init__.py0
-rw-r--r--wordcount/src/spouts/words.py14
4 files changed, 40 insertions, 0 deletions
diff --git a/wordcount/src/bolts/__init__.py b/wordcount/src/bolts/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wordcount/src/bolts/__init__.py
diff --git a/wordcount/src/bolts/wordcount.py b/wordcount/src/bolts/wordcount.py
new file mode 100644
index 0000000..85fbfb7
--- /dev/null
+++ b/wordcount/src/bolts/wordcount.py
@@ -0,0 +1,26 @@
+import os
+from collections import Counter
+
+from streamparse import Bolt
+
+
+class WordCountBolt(Bolt):
+ outputs = ["word", "count"]
+
+ def initialize(self, conf, ctx):
+ self.counter = Counter()
+ self.pid = os.getpid()
+ self.total = 0
+
+ def _increment(self, word, inc_by):
+ self.counter[word] += inc_by
+ self.total += inc_by
+
+ def process(self, tup):
+ word = tup.values[0]
+ self._increment(word, 10 if word == "dog" else 1)
+ if self.total % 1000 == 0:
+ self.logger.info(
+ f"counted [{self.total:,}] words [pid={self.pid}]"
+ )
+ self.emit([word, self.counter[word]])
diff --git a/wordcount/src/spouts/__init__.py b/wordcount/src/spouts/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wordcount/src/spouts/__init__.py
diff --git a/wordcount/src/spouts/words.py b/wordcount/src/spouts/words.py
new file mode 100644
index 0000000..6ba88c1
--- /dev/null
+++ b/wordcount/src/spouts/words.py
@@ -0,0 +1,14 @@
+from itertools import cycle
+
+from streamparse import Spout
+
+
+class WordSpout(Spout):
+ outputs = ["word"]
+
+ def initialize(self, stormconf, context):
+ self.words = cycle(["dog", "cat", "zebra", "elephant"])
+
+ def next_tuple(self):
+ word = next(self.words)
+ self.emit([word])