aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCody Hiar <cody@hiar.ca>2021-10-25 14:34:37 -0600
committerCody Hiar <cody@hiar.ca>2021-10-25 14:34:37 -0600
commite3d2ffca585660f0c088ab8323bfe78a86ba75e3 (patch)
tree1f542c4d1c032cdc85149ccfd28cafc402ddb3d0
parent08cf73a70e3ce2dbc85e8b389f15090c148ec003 (diff)
Save working version of deploying jars
-rw-r--r--Makefile3
-rw-r--r--docker-compose.yml45
-rw-r--r--docker/Dockerfile2
-rw-r--r--docker/storm.yaml9
-rw-r--r--wordcount/.gitignore20
-rw-r--r--wordcount/CONTRIBUTING.md11
-rw-r--r--wordcount/LICENSE21
-rw-r--r--wordcount/README.md88
-rw-r--r--wordcount/config.json20
-rw-r--r--wordcount/fabfile.py11
-rw-r--r--wordcount/multilang/resources/countbolt.py26
-rw-r--r--wordcount/multilang/resources/sentencespout.py28
-rw-r--r--wordcount/multilang/resources/splitbolt.py21
-rw-r--r--wordcount/pom.xml128
-rw-r--r--wordcount/project.clj13
-rw-r--r--wordcount/resources/log4j2.xml18
-rw-r--r--wordcount/resources/topology.yaml73
-rw-r--r--wordcount/src/bolts/__init__.py0
-rw-r--r--wordcount/src/bolts/wordcount.py26
-rw-r--r--wordcount/src/spouts/__init__.py0
-rw-r--r--wordcount/src/spouts/words.py14
-rw-r--r--wordcount/topologies/wordcount.py13
-rw-r--r--wordcount/virtualenvs/wordcount.txt1
23 files changed, 486 insertions, 105 deletions
diff --git a/Makefile b/Makefile
index ea21190..09dff25 100644
--- a/Makefile
+++ b/Makefile
@@ -15,5 +15,8 @@ down: ## Stop the container
enter: ## Enter the running container
docker-compose -p docker_storm exec backend /bin/bash
+enter_nimbus: ## Enter the running nimbus container
+ docker-compose -p docker_storm exec nimbus /bin/bash
+
clean: down ## Remove stoped containers
docker-compose -p docker_storm rm -f
diff --git a/docker-compose.yml b/docker-compose.yml
index d22f668..ef279fa 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,14 +1,53 @@
version: '3'
services:
+ zookeeper:
+ image: zookeeper
+ container_name: zookeeper
+ restart: always
+
+ nimbus:
+ image: storm
+ container_name: nimbus
+ command: storm nimbus
+ depends_on:
+ - zookeeper
+ links:
+ - zookeeper
+ restart: always
+ ports:
+ - 6627:6627
+ volumes:
+ - .:/usr/src/app
+
+ supervisor:
+ image: storm
+ container_name: supervisor
+ command: storm supervisor
+ depends_on:
+ - nimbus
+ - zookeeper
+ links:
+ - nimbus
+ - zookeeper
+ restart: always
+
+ ui:
+ image: storm
+ container_name: storm_ui
+ command: storm ui
+ ports:
+ - 8080:8080
+ depends_on:
+ - nimbus
+ links:
+ - nimbus
+
backend:
build:
context: ./
dockerfile: ./docker/Dockerfile
- network_mode: "bridge"
container_name: docker_storm
image: thornycrackers/docker_storm
- ports:
- - "8000"
volumes:
- .:/usr/src/app
command: /bin/bash
diff --git a/docker/Dockerfile b/docker/Dockerfile
index ac833c3..4465fd1 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -23,5 +23,7 @@ ENV LEIN_ROOT=/usr/src/app
ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
RUN ln -s /usr/lib/jvm/java-11-openjdk-amd64/bin/storm /bin/storm
+COPY ./docker/storm.yaml /root/.storm/storm.yaml
+
# The code is stored here
WORKDIR /usr/src/app
diff --git a/docker/storm.yaml b/docker/storm.yaml
new file mode 100644
index 0000000..283481f
--- /dev/null
+++ b/docker/storm.yaml
@@ -0,0 +1,9 @@
+storm.zookeeper.servers:
+ - "localhost"
+storm.local.dir: "/tmp/storm"
+nimbus.host: "localhost"
+supervisor.slots.ports:
+ - 6700
+ - 6701
+ - 6702
+ - 6703
diff --git a/wordcount/.gitignore b/wordcount/.gitignore
index 37b28cc..959264d 100644
--- a/wordcount/.gitignore
+++ b/wordcount/.gitignore
@@ -1,4 +1,16 @@
-.DS_Store
-_build
-_resources
-logs
+target/
+pom.xml.tag
+pom.xml.releaseBackup
+pom.xml.versionsBackup
+pom.xml.next
+release.properties
+dependency-reduced-pom.xml
+buildNumber.properties
+.mvn/timing.properties
+.idea/
+*.log
+*.log.lck
+.classpath
+.settings/
+*.iml
+.project \ No newline at end of file
diff --git a/wordcount/CONTRIBUTING.md b/wordcount/CONTRIBUTING.md
new file mode 100644
index 0000000..efcad49
--- /dev/null
+++ b/wordcount/CONTRIBUTING.md
@@ -0,0 +1,11 @@
+# Contributing to Azure samples
+
+Thank you for your interest in contributing to Azure samples!
+
+## Ways to contribute
+
+You can contribute to [Azure samples](https://azure.microsoft.com/documentation/samples/) in a few different ways:
+
+- Submit feedback on [this sample page](https://azure.microsoft.com/documentation/samples/hdinsight-python-storm-wordcount/) whether it was helpful or not.
+- Submit issues through the [issue tracker](https://github.com/Azure-Samples/hdinsight-python-storm-wordcount/issues) on GitHub. We actively monitor the issues and improve our samples.
+- If you wish to make code changes to samples, or contribute something new, please follow the [GitHub Forks / Pull requests model](https://help.github.com/articles/fork-a-repo/): Fork the sample repo, make the change and propose it back by submitting a pull request. \ No newline at end of file
diff --git a/wordcount/LICENSE b/wordcount/LICENSE
new file mode 100644
index 0000000..d8d98a8
--- /dev/null
+++ b/wordcount/LICENSE
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2015 Microsoft Corporation
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE. \ No newline at end of file
diff --git a/wordcount/README.md b/wordcount/README.md
new file mode 100644
index 0000000..3cd7765
--- /dev/null
+++ b/wordcount/README.md
@@ -0,0 +1,88 @@
+---
+services: hdinsight
+platforms: java,python
+author: blackmist
+---
+# hdinsight-python-storm-wordcount
+
+How to use Python components in an Apache Storm topology on HDInsight
+
+This topology uses the Flux framework to define a Storm topology using YAML. The components (spout and bolts) that process the data are written in Python.
+
+This example has been tested with HDInsight 3.6 (Storm 1.1.0).
+
+## Prerequisites
+
+* Python 2.7 or higher
+
+* Java JDK 1.8 or higher
+
+* Maven
+
+* (Optional) A local Storm development environment. This is only needed if you want to run the topology locally. For more information, see [Setting up a development environment](http://storm.apache.org/releases/1.0.1/Setting-up-development-environment.html).
+
+## How it works
+
+* `/resources/topology.yaml` - defines what components are in the topology and how data flows between them.
+
+* `/multilang/resources` - contains the Python components.
+
+* `/pom.xml` - dependencies and how to build the project.
+
+## Build the project
+
+From the root of the project, use the following command:
+
+```bash
+mvn clean compile package
+```
+
+This command creates a `target/WordCount-1.0-SNAPSHOT.jar` file.
+
+## Run the topology locally
+
+To run the topology locally, use the following command:
+
+```bash
+storm jar target/WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux -l -R /topology.yaml
+```
+
+Once the topology starts, it emits information to the local console similar to the following text:
+
+```
+24302 [Thread-25-sentence-spout-executor[4 4]] INFO o.a.s.s.ShellSpout - ShellLog pid:2436, name:sentence-spout Emiting the cow jumped over the moon
+24302 [Thread-30] INFO o.a.s.t.ShellBolt - ShellLog pid:2438, name:splitter-bolt Emitting the
+24302 [Thread-28] INFO o.a.s.t.ShellBolt - ShellLog pid:2437, name:counter-bolt Emitting years:160
+24302 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {word=the, count=599}
+24303 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {word=seven, count=302}
+24303 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {word=dwarfs, count=143}
+24303 [Thread-25-sentence-spout-executor[4 4]] INFO o.a.s.s.ShellSpout - ShellLog pid:2436, name:sentence-spout Emiting the cow jumped over the moon
+24303 [Thread-30] INFO o.a.s.t.ShellBolt - ShellLog pid:2438, name:splitter-bolt Emitting cow
+^C24303 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {word=four, count=160}
+```
+
+Use Ctrl+c to stop the topology.
+
+## Run the topology on HDInsight
+
+1. Use the following command to copy the `WordCount-1.0-SNAPSHOT.jar` file to your Storm on HDInsight cluster:
+
+ ```bash
+ scp target/WordCount-1.0-SNAPSHOT.jar sshuser@mycluster-ssh.azurehdinsight.net:
+ ```
+
+ Replace `sshuser` with the SSH user for your cluster. Replace `mycluster` with the cluster name.
+
+2. Once the file has been uploaded, connect to the cluster using SSH and use the following command to start the topology on the cluster:
+
+ ```bash
+ storm jar WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux -r -R /topology.yaml
+ ```
+
+3. You can use the Storm UI to view the topology on the cluster. The Storm UI is located at https://mycluster.azurehdinsight.net/stormui. Replace `mycluster` with your cluster name.
+
+Once started, a Storm topology runs until it is stopped (killed). To stop the topology, either use the `storm kill TOPOLOGYNAME` command from the command line (in an SSH session to a Linux cluster), or open the Storm UI, select the topology, and then select the __Kill__ button.
+
+## Project code of conduct
+
+This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.
diff --git a/wordcount/config.json b/wordcount/config.json
deleted file mode 100644
index 778cb51..0000000
--- a/wordcount/config.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
- "serializer": "json",
- "topology_specs": "topologies/",
- "virtualenv_specs": "virtualenvs/",
- "envs": {
- "prod": {
- "user": "",
- "ssh_password": "",
- "nimbus": "",
- "workers": [],
- "log": {
- "path": "",
- "max_bytes": 1000000,
- "backup_count": 10,
- "level": "info"
- },
- "virtualenv_root": ""
- }
- }
-}
diff --git a/wordcount/fabfile.py b/wordcount/fabfile.py
deleted file mode 100644
index 8c89cbc..0000000
--- a/wordcount/fabfile.py
+++ /dev/null
@@ -1,11 +0,0 @@
-def pre_submit(topology_name, env_name, env_config, options):
- """Override this function to perform custom actions prior to topology
- submission. No SSH tunnels will be active when this function is called."""
- pass
-
-
-def post_submit(topo_name, env_name, env_config, options):
- """Override this function to perform custom actions after topology
- submission. Note that the SSH tunnel to Nimbus will still be active
- when this function is called."""
- pass
diff --git a/wordcount/multilang/resources/countbolt.py b/wordcount/multilang/resources/countbolt.py
new file mode 100644
index 0000000..4aebc19
--- /dev/null
+++ b/wordcount/multilang/resources/countbolt.py
@@ -0,0 +1,26 @@
+import storm
+# Counter is a convenient way to count things,
+# but it requires Python 2.7 or later
+from collections import Counter
+
+class CountBolt(storm.BasicBolt):
+ # Initialize this instance
+ def initialize(self, conf, context):
+ self._conf = conf
+ self._context = context
+ # Create a new counter for this instance
+ self._counter = Counter()
+ storm.logInfo("Counter bolt instance starting...")
+
+ def process(self, tup):
+ # Get the word from the inbound tuple
+ word = tup.values[0]
+ # Increment the counter
+ self._counter[word] +=1
+ count = self._counter[word]
+ storm.logInfo("Emitting %s:%s" % (word, count))
+ # Emit the word and count
+ storm.emit([word, count])
+
+# Start the bolt when it's invoked
+CountBolt().run()
diff --git a/wordcount/multilang/resources/sentencespout.py b/wordcount/multilang/resources/sentencespout.py
new file mode 100644
index 0000000..a75f37d
--- /dev/null
+++ b/wordcount/multilang/resources/sentencespout.py
@@ -0,0 +1,28 @@
+import storm
+import random
+# Define some sentences
+SENTENCES = """
+the cow jumped over the moon
+an apple a day keeps the doctor away
+four score and seven years ago
+snow white and the seven dwarfs
+i am at two with nature
+""".strip().split('\n')
+
+class SentenceSpout(storm.Spout):
+ # Not much to do here for such a basic spout
+ def initialize(self, conf, context):
+ self._conf = conf
+ self._context = context
+
+ storm.logInfo("Spout instance starting...")
+
+ # Process the next tuple
+ def nextTuple(self):
+ # Emit a random sentence
+ sentence = random.choice(SENTENCES)
+ storm.logInfo("Emiting %s" % sentence)
+ storm.emit([sentence])
+
+# Start the spout when it's invoked
+SentenceSpout().run()
diff --git a/wordcount/multilang/resources/splitbolt.py b/wordcount/multilang/resources/splitbolt.py
new file mode 100644
index 0000000..b46a901
--- /dev/null
+++ b/wordcount/multilang/resources/splitbolt.py
@@ -0,0 +1,21 @@
+import storm
+
+class SplitBolt(storm.BasicBolt):
+ # This bolt keeps no state of its own (it only splits and emits),
+ # so initialization just records the conf/context and logs startup
+ # Initialize this instance
+ def initialize(self, conf, context):
+ self._conf = conf
+ self._context = context
+ storm.logInfo("Split bolt instance starting...")
+
+ def process(self, tup):
+ # Split the inbound sentence at spaces
+ words = tup.values[0].split()
+ # Loop over words and emit
+ for word in words:
+ storm.logInfo("Emitting %s" % word)
+ storm.emit([word])
+
+# Start the bolt when it's invoked
+SplitBolt().run()
diff --git a/wordcount/pom.xml b/wordcount/pom.xml
new file mode 100644
index 0000000..941b7a9
--- /dev/null
+++ b/wordcount/pom.xml
@@ -0,0 +1,128 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>com.microsoft.example</groupId>
+ <artifactId>WordCount</artifactId>
+ <packaging>jar</packaging>
+ <version>1.0-SNAPSHOT</version>
+ <name>WordCount</name>
+ <url>http://maven.apache.org</url>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <!--
+ The original sample targeted Storm 1.x on HDInsight 3.6; this build pins 2.2.0 below.
+ To find the version information for earlier HDInsight cluster
+ versions, see https://azure.microsoft.com/en-us/documentation/articles/hdinsight-component-versioning/
+ -->
+ <storm.version>2.2.0</storm.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${storm.version}</version>
+ <!-- keep storm out of the jar-with-dependencies -->
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>flux-core</artifactId>
+ <version>${storm.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>flux-wrappers</artifactId>
+ <version>${storm.version}</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.3</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ <!-- build an uber jar -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.3</version>
+ <configuration>
+ <transformers>
+ <!-- Keep us from getting a can't overwrite file error -->
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer" />
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <!-- We're using Flux, so refer to it as main -->
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.storm.flux.Flux</mainClass>
+ </transformer>
+ </transformers>
+ <!-- Keep us from getting a bad signature error -->
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- This allows us to use `mvn exec:java` to run the topology in development -->
+ <!-- https://mvnrepository.com/artifact/org.codehaus.mojo/exec-maven-plugin -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.5.0</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <executable>java</executable>
+ <includeProjectDependencies>true</includeProjectDependencies>
+ <includePluginDependencies>false</includePluginDependencies>
+ <classpathScope>compile</classpathScope>
+ <!-- Set mainClass as Flux, since we are using that -->
+ <mainClass>org.apache.storm.flux.Flux</mainClass>
+ </configuration>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>${basedir}/resources</directory>
+ <filtering>false</filtering>
+ <includes>
+ <!--
+ Include configuration for Log4j
+ to reduce the clutter in log output
+ -->
+ <include>log4j2.xml</include>
+ <!-- topology definition -->
+ <include>topology.yaml</include>
+ </includes>
+ </resource>
+ <!-- include the Python components -->
+ <resource>
+ <directory>${basedir}/multilang</directory>
+ <filtering>false</filtering>
+ </resource>
+ </resources>
+ </build>
+</project>
diff --git a/wordcount/project.clj b/wordcount/project.clj
deleted file mode 100644
index 4b08880..0000000
--- a/wordcount/project.clj
+++ /dev/null
@@ -1,13 +0,0 @@
-(defproject wordcount "0.0.1-SNAPSHOT"
- :resource-paths ["_resources"]
- :target-path "_build"
- :min-lein-version "2.0.0"
- :jvm-opts ["-client"]
- :dependencies [[org.apache.storm/storm-core "2.3.0"]
- [org.apache.storm/flux-core "2.3.0"]]
- :jar-exclusions [#"log4j\.properties" #"org\.apache\.storm\.(?!flux)" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
- :uberjar-exclusions [#"log4j\.properties" #"org\.apache\.storm\.(?!flux)" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
- )
-(require 'cemerick.pomegranate.aether)
-(cemerick.pomegranate.aether/register-wagon-factory!
- "http" #(org.apache.maven.wagon.providers.http.HttpWagon.))
diff --git a/wordcount/resources/log4j2.xml b/wordcount/resources/log4j2.xml
new file mode 100644
index 0000000..d2b2981
--- /dev/null
+++ b/wordcount/resources/log4j2.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- See http://logging.apache.org/log4j/2.x/manual/configuration.html
+ for more information on Log4j configuration. -->
+<Configuration>
+ <Appenders>
+ <Console name="STDOUT" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss} [%t] %-5level %logger{36} - %msg%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Logger name="com.microsoft.example" level="trace" additivity="false">
+ <AppenderRef ref="STDOUT"/>
+ </Logger>
+ <Root level="error">
+ <Appender-Ref ref="STDOUT"/>
+ </Root>
+ </Loggers>
+</Configuration> \ No newline at end of file
diff --git a/wordcount/resources/topology.yaml b/wordcount/resources/topology.yaml
new file mode 100644
index 0000000..725f3be
--- /dev/null
+++ b/wordcount/resources/topology.yaml
@@ -0,0 +1,73 @@
+# topology definition
+
+# Name to be used when submitting. This is what shows up
+# in the Storm UI and the storm command-line tool as the topology name
+# once the topology has been submitted to Storm.
+name: "wordcount"
+
+# Topology configuration
+config:
+ # Hint for the number of workers to create
+ topology.workers: 1
+
+# Spout definitions
+spouts:
+ - id: "sentence-spout"
+ className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
+ constructorArgs:
+ # Command line
+ - ["python", "sentencespout.py"]
+ # Output field(s)
+ - ["sentence"]
+ # parallelism hint
+ parallelism: 1
+
+# Bolt definitions
+bolts:
+ - id: "splitter-bolt"
+ className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
+ constructorArgs:
+ # Command line
+ - ["python", "splitbolt.py"]
+ # Output field(s)
+ - ["word"]
+ parallelism: 1
+
+ - id: "counter-bolt"
+ className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
+ constructorArgs:
+ # Command line
+ - ["python", "countbolt.py"]
+ # Output field(s)
+ - ["word","count"]
+ parallelism: 1
+
+ # Logging
+ - id: "log"
+ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+ parallelism: 1
+
+# Stream definitions
+streams:
+ - name: "Spout --> Splitter" # name isn't used (placeholder for logging, UI, etc.)
+ # The stream emitter
+ from: "sentence-spout"
+ # The stream consumer
+ to: "splitter-bolt"
+ # Grouping type
+ grouping:
+ type: SHUFFLE
+
+ - name: "Splitter -> Counter"
+ from: "splitter-bolt"
+ to: "counter-bolt"
+ grouping:
+ type: FIELDS
+ # field(s) to group on
+ args: ["word"]
+
+ - name: "Counter -> Log"
+ from: "counter-bolt"
+ to: "log"
+ grouping:
+ type: SHUFFLE \ No newline at end of file
diff --git a/wordcount/src/bolts/__init__.py b/wordcount/src/bolts/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/wordcount/src/bolts/__init__.py
+++ /dev/null
diff --git a/wordcount/src/bolts/wordcount.py b/wordcount/src/bolts/wordcount.py
deleted file mode 100644
index 85fbfb7..0000000
--- a/wordcount/src/bolts/wordcount.py
+++ /dev/null
@@ -1,26 +0,0 @@
-import os
-from collections import Counter
-
-from streamparse import Bolt
-
-
-class WordCountBolt(Bolt):
- outputs = ["word", "count"]
-
- def initialize(self, conf, ctx):
- self.counter = Counter()
- self.pid = os.getpid()
- self.total = 0
-
- def _increment(self, word, inc_by):
- self.counter[word] += inc_by
- self.total += inc_by
-
- def process(self, tup):
- word = tup.values[0]
- self._increment(word, 10 if word == "dog" else 1)
- if self.total % 1000 == 0:
- self.logger.info(
- f"counted [{self.total:,}] words [pid={self.pid}]"
- )
- self.emit([word, self.counter[word]])
diff --git a/wordcount/src/spouts/__init__.py b/wordcount/src/spouts/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/wordcount/src/spouts/__init__.py
+++ /dev/null
diff --git a/wordcount/src/spouts/words.py b/wordcount/src/spouts/words.py
deleted file mode 100644
index 6ba88c1..0000000
--- a/wordcount/src/spouts/words.py
+++ /dev/null
@@ -1,14 +0,0 @@
-from itertools import cycle
-
-from streamparse import Spout
-
-
-class WordSpout(Spout):
- outputs = ["word"]
-
- def initialize(self, stormconf, context):
- self.words = cycle(["dog", "cat", "zebra", "elephant"])
-
- def next_tuple(self):
- word = next(self.words)
- self.emit([word])
diff --git a/wordcount/topologies/wordcount.py b/wordcount/topologies/wordcount.py
deleted file mode 100644
index d66e1b3..0000000
--- a/wordcount/topologies/wordcount.py
+++ /dev/null
@@ -1,13 +0,0 @@
-"""
-Word count topology
-"""
-
-from streamparse import Grouping, Topology
-
-from bolts.wordcount import WordCountBolt
-from spouts.words import WordSpout
-
-
-class WordCount(Topology):
- word_spout = WordSpout.spec()
- count_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields("word")}, par=2)
diff --git a/wordcount/virtualenvs/wordcount.txt b/wordcount/virtualenvs/wordcount.txt
deleted file mode 100644
index 9229811..0000000
--- a/wordcount/virtualenvs/wordcount.txt
+++ /dev/null
@@ -1 +0,0 @@
-streamparse # always required for streamparse projects