diff options
author | Cody Hiar <cody@hiar.ca> | 2021-10-10 11:13:47 -0600 |
---|---|---|
committer | Cody Hiar <cody@hiar.ca> | 2021-10-10 11:13:47 -0600 |
commit | 5367edb8b37a6dac76ac7cd608ac3c005b7b225e (patch) | |
tree | ff2a381fba100dc1c7961e625f372f63e2998dcd /wordcount/src/bolts |
Work in progress
Diffstat (limited to 'wordcount/src/bolts')
-rw-r--r-- | wordcount/src/bolts/__init__.py | 0 | ||||
-rw-r--r-- | wordcount/src/bolts/wordcount.py | 26 |
2 files changed, 26 insertions, 0 deletions
diff --git a/wordcount/src/bolts/__init__.py b/wordcount/src/bolts/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wordcount/src/bolts/__init__.py
diff --git a/wordcount/src/bolts/wordcount.py b/wordcount/src/bolts/wordcount.py
new file mode 100644
index 0000000..85fbfb7
--- /dev/null
+++ b/wordcount/src/bolts/wordcount.py
@@ -0,0 +1,26 @@
+import os
+from collections import Counter
+
+from streamparse import Bolt
+
+
+class WordCountBolt(Bolt):
+    outputs = ["word", "count"]
+
+    def initialize(self, conf, ctx):
+        self.counter = Counter()
+        self.pid = os.getpid()
+        self.total = 0
+
+    def _increment(self, word, inc_by):
+        self.counter[word] += inc_by
+        self.total += inc_by
+
+    def process(self, tup):
+        word = tup.values[0]
+        self._increment(word, 10 if word == "dog" else 1)
+        if self.total % 1000 == 0:
+            self.logger.info(
+                f"counted [{self.total:,}] words [pid={self.pid}]"
+            )
+        self.emit([word, self.counter[word]])