# cgit navigation: about | summary | refs | log | tree | commit | diff
# path: root/wordcount/src/bolts/wordcount.py
# blob: 85fbfb7580889de846c9eaa5b0bdc748dbd3f2b7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import os
from collections import Counter

from streamparse import Bolt


class WordCountBolt(Bolt):
    """Bolt that keeps a running tally per word and emits ``[word, count]``.

    For every incoming tuple the first value is treated as the word, its
    tally is bumped, and the word together with its updated tally is emitted.
    A progress line is logged each time the overall total passes another
    multiple of ``_LOG_EVERY``.

    ``self.logger`` and ``self.emit`` are not defined in this class; they are
    presumably supplied by the ``Bolt`` base class -- confirm against the
    streamparse documentation.
    """

    outputs = ["word", "count"]

    # Log a progress line each time the running total crosses another
    # multiple of this many counted words.
    _LOG_EVERY = 1000

    def initialize(self, conf, ctx):
        """Reset the per-instance state.

        Args:
            conf: Unused here.
            ctx: Unused here.
        """
        self.counter = Counter()  # word -> accumulated count
        self.pid = os.getpid()  # only used to tag log lines
        self.total = 0  # sum of every increment applied so far

    def _increment(self, word, inc_by):
        """Add ``inc_by`` to both the tally for ``word`` and the grand total."""
        self.counter[word] += inc_by
        self.total += inc_by

    def process(self, tup):
        """Count the word carried by ``tup`` and emit its updated tally.

        Args:
            tup: Incoming tuple; ``tup.values[0]`` is read as the word.
        """
        word = tup.values[0]
        total_before = self.total
        # The word "dog" is deliberately weighted 10x; every other word
        # counts once.
        self._increment(word, 10 if word == "dog" else 1)
        # Because an increment can be larger than 1, the total may jump over
        # an exact multiple of _LOG_EVERY (e.g. 995 -> 1005).  Comparing the
        # bucket index before and after detects the crossing, whereas an
        # exact ``% _LOG_EVERY == 0`` check would silently miss it.
        if self.total // self._LOG_EVERY != total_before // self._LOG_EVERY:
            self.logger.info(
                f"counted [{self.total:,}] words [pid={self.pid}]"
            )
        self.emit([word, self.counter[word]])