aboutsummaryrefslogtreecommitdiff
path: root/wordcount/multilang/resources/countbolt.py
blob: 4aebc1981cdb010b31746cc9a3588a81473f8601 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import storm
# collections.Counter is a convenient way to count things,
# but it requires Python 2.7 or later
from collections import Counter

class CountBolt(storm.BasicBolt):
    """Keep a running tally of every word seen and emit (word, count).

    Each bolt instance creates its own Counter in initialize(), so the
    counts reflect only the tuples that this instance processes.
    """

    def initialize(self, conf, context):
        """Remember the config/context and start an empty tally."""
        # Kept for reference; process() does not currently read them.
        self._conf = conf
        self._context = context
        # Fresh, instance-local tally: every word starts at zero.
        self._counter = Counter()
        storm.logInfo("Counter bolt instance starting...")

    def process(self, tup):
        """Bump the tally for the word in *tup* and emit [word, new_total]."""
        # The word is taken from the first field of the inbound tuple
        # (presumably the only field — confirm against the upstream spout/bolt).
        word = tup.values[0]
        # Counter reads a missing key as 0, so a first sighting yields 1.
        new_total = self._counter[word] + 1
        self._counter[word] = new_total
        storm.logInfo("Emitting %s:%s" % (word, new_total))
        storm.emit([word, new_total])

# Start the bolt only when this file is executed directly as a script
# (presumably how Storm's multilang shell launches it — confirm). Guarding
# the call means merely importing this module (e.g. from a test) does not
# start the bolt.
if __name__ == "__main__":
    CountBolt().run()