aboutsummaryrefslogtreecommitdiff
path: root/wordcount/src
diff options
context:
space:
mode:
Diffstat (limited to 'wordcount/src')
-rw-r--r--wordcount/src/bolts/__init__.py0
-rw-r--r--wordcount/src/bolts/wordcount.py26
-rw-r--r--wordcount/src/spouts/__init__.py0
-rw-r--r--wordcount/src/spouts/words.py14
4 files changed, 0 insertions, 40 deletions
diff --git a/wordcount/src/bolts/__init__.py b/wordcount/src/bolts/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/wordcount/src/bolts/__init__.py
+++ /dev/null
diff --git a/wordcount/src/bolts/wordcount.py b/wordcount/src/bolts/wordcount.py
deleted file mode 100644
index 85fbfb7..0000000
--- a/wordcount/src/bolts/wordcount.py
+++ /dev/null
@@ -1,26 +0,0 @@
-import os
-from collections import Counter
-
-from streamparse import Bolt
-
-
-class WordCountBolt(Bolt):
- outputs = ["word", "count"]
-
- def initialize(self, conf, ctx):
- self.counter = Counter()
- self.pid = os.getpid()
- self.total = 0
-
- def _increment(self, word, inc_by):
- self.counter[word] += inc_by
- self.total += inc_by
-
- def process(self, tup):
- word = tup.values[0]
- self._increment(word, 10 if word == "dog" else 1)
- if self.total % 1000 == 0:
- self.logger.info(
- f"counted [{self.total:,}] words [pid={self.pid}]"
- )
- self.emit([word, self.counter[word]])
diff --git a/wordcount/src/spouts/__init__.py b/wordcount/src/spouts/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/wordcount/src/spouts/__init__.py
+++ /dev/null
diff --git a/wordcount/src/spouts/words.py b/wordcount/src/spouts/words.py
deleted file mode 100644
index 6ba88c1..0000000
--- a/wordcount/src/spouts/words.py
+++ /dev/null
@@ -1,14 +0,0 @@
-from itertools import cycle
-
-from streamparse import Spout
-
-
-class WordSpout(Spout):
- outputs = ["word"]
-
- def initialize(self, stormconf, context):
- self.words = cycle(["dog", "cat", "zebra", "elephant"])
-
- def next_tuple(self):
- word = next(self.words)
- self.emit([word])