blob: 4aebc1981cdb010b31746cc9a3588a81473f8601 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import storm
# Counter is a convenient way to count things,
# but it requires Python 2.7 or later
from collections import Counter
class CountBolt(storm.BasicBolt):
    """Bolt that keeps a running tally of the words it receives.

    The first value of each inbound tuple is taken as the word; the bolt
    emits a ``[word, count]`` pair holding that word's updated total.
    """

    def initialize(self, conf, context):
        """Remember the supplied conf/context and start with an empty tally."""
        self._conf, self._context = conf, context
        # One Counter per bolt instance holds the totals seen so far
        self._counter = Counter()
        storm.logInfo("Counter bolt instance starting...")

    def process(self, tup):
        """Bump the tally for the tuple's word, log it, and emit the new total."""
        # The word is the first value of the inbound tuple
        key = tup.values[0]
        tally = self._counter
        # A Counter reports 0 for unseen keys, so this also covers new words
        total = tally[key] + 1
        tally[key] = total
        message = "Emitting %s:%s" % (key, total)
        storm.logInfo(message)
        # Send the word and its running count downstream
        storm.emit([key, total])
# Start the bolt when this file is run as a script; the guard keeps a plain
# import of this module from starting the bolt's run loop.
if __name__ == "__main__":
    CountBolt().run()
|