diff options
Diffstat (limited to 'wordcount/src')
-rw-r--r-- | wordcount/src/bolts/__init__.py | 0 | ||||
-rw-r--r-- | wordcount/src/bolts/wordcount.py | 26 | ||||
-rw-r--r-- | wordcount/src/spouts/__init__.py | 0 | ||||
-rw-r--r-- | wordcount/src/spouts/words.py | 14 |
4 files changed, 0 insertions, 40 deletions
diff --git a/wordcount/src/bolts/__init__.py b/wordcount/src/bolts/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/wordcount/src/bolts/__init__.py +++ /dev/null diff --git a/wordcount/src/bolts/wordcount.py b/wordcount/src/bolts/wordcount.py deleted file mode 100644 index 85fbfb7..0000000 --- a/wordcount/src/bolts/wordcount.py +++ /dev/null @@ -1,26 +0,0 @@ -import os -from collections import Counter - -from streamparse import Bolt - - -class WordCountBolt(Bolt): - outputs = ["word", "count"] - - def initialize(self, conf, ctx): - self.counter = Counter() - self.pid = os.getpid() - self.total = 0 - - def _increment(self, word, inc_by): - self.counter[word] += inc_by - self.total += inc_by - - def process(self, tup): - word = tup.values[0] - self._increment(word, 10 if word == "dog" else 1) - if self.total % 1000 == 0: - self.logger.info( - f"counted [{self.total:,}] words [pid={self.pid}]" - ) - self.emit([word, self.counter[word]]) diff --git a/wordcount/src/spouts/__init__.py b/wordcount/src/spouts/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/wordcount/src/spouts/__init__.py +++ /dev/null diff --git a/wordcount/src/spouts/words.py b/wordcount/src/spouts/words.py deleted file mode 100644 index 6ba88c1..0000000 --- a/wordcount/src/spouts/words.py +++ /dev/null @@ -1,14 +0,0 @@ -from itertools import cycle - -from streamparse import Spout - - -class WordSpout(Spout): - outputs = ["word"] - - def initialize(self, stormconf, context): - self.words = cycle(["dog", "cat", "zebra", "elephant"]) - - def next_tuple(self): - word = next(self.words) - self.emit([word]) |