author     Cody Hiar <cody@hiar.ca>    2021-10-25 14:34:37 -0600
committer  Cody Hiar <cody@hiar.ca>    2021-10-25 14:34:37 -0600
commit     e3d2ffca585660f0c088ab8323bfe78a86ba75e3 (patch)
tree       1f542c4d1c032cdc85149ccfd28cafc402ddb3d0
parent     08cf73a70e3ce2dbc85e8b389f15090c148ec003 (diff)
Save working version of deploying jars
-rw-r--r--   Makefile                                        |   3
-rw-r--r--   docker-compose.yml                              |  45
-rw-r--r--   docker/Dockerfile                               |   2
-rw-r--r--   docker/storm.yaml                               |   9
-rw-r--r--   wordcount/.gitignore                            |  20
-rw-r--r--   wordcount/CONTRIBUTING.md                       |  11
-rw-r--r--   wordcount/LICENSE                               |  21
-rw-r--r--   wordcount/README.md                             |  88
-rw-r--r--   wordcount/config.json                           |  20
-rw-r--r--   wordcount/fabfile.py                            |  11
-rw-r--r--   wordcount/multilang/resources/countbolt.py      |  26
-rw-r--r--   wordcount/multilang/resources/sentencespout.py  |  28
-rw-r--r--   wordcount/multilang/resources/splitbolt.py      |  21
-rw-r--r--   wordcount/pom.xml                               | 128
-rw-r--r--   wordcount/project.clj                           |  13
-rw-r--r--   wordcount/resources/log4j2.xml                  |  18
-rw-r--r--   wordcount/resources/topology.yaml               |  73
-rw-r--r--   wordcount/src/bolts/__init__.py                 |   0
-rw-r--r--   wordcount/src/bolts/wordcount.py                |  26
-rw-r--r--   wordcount/src/spouts/__init__.py                |   0
-rw-r--r--   wordcount/src/spouts/words.py                   |  14
-rw-r--r--   wordcount/topologies/wordcount.py               |  13
-rw-r--r--   wordcount/virtualenvs/wordcount.txt             |   1
23 files changed, 486 insertions, 105 deletions
diff --git a/Makefile b/Makefile
--- a/Makefile
+++ b/Makefile
@@ -15,5 +15,8 @@ down: ## Stop the container
 enter: ## Enter the running container
 	docker-compose -p docker_storm exec backend /bin/bash
 
+enter_nimbus: ## Enter the running nimbus container
+	docker-compose -p docker_storm exec nimbus /bin/bash
+
 clean: down ## Remove stoped containers
 	docker-compose -p docker_storm rm -f
diff --git a/docker-compose.yml b/docker-compose.yml
index d22f668..ef279fa 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,14 +1,53 @@
 version: '3'
 services:
+  zookeeper:
+    image: zookeeper
+    container_name: zookeeper
+    restart: always
+
+  nimbus:
+    image: storm
+    container_name: nimbus
+    command: storm nimbus
+    depends_on:
+      - zookeeper
+    links:
+      - zookeeper
+    restart: always
+    ports:
+      - 6627:6627
+    volumes:
+      - .:/usr/src/app
+
+  supervisor:
+    image: storm
+    container_name: supervisor
+    command: storm supervisor
+    depends_on:
+      - nimbus
+      - zookeeper
+    links:
+      - nimbus
+      - zookeeper
+    restart: always
+
+  ui:
+    image: storm
+    container_name: storm_ui
+    command: storm ui
+    ports:
+      - 8080:8080
+    depends_on:
+      - nimbus
+    links:
+      - nimbus
+
   backend:
     build:
       context: ./
       dockerfile: ./docker/Dockerfile
-    network_mode: "bridge"
     container_name: docker_storm
     image: thornycrackers/docker_storm
-    ports:
-      - "8000"
     volumes:
       - .:/usr/src/app
     command: /bin/bash
diff --git a/docker/Dockerfile b/docker/Dockerfile
index ac833c3..4465fd1 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -23,5 +23,7 @@ ENV LEIN_ROOT=/usr/src/app
 ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
 RUN ln -s /usr/lib/jvm/java-11-openjdk-amd64/bin/storm /bin/storm
 
+COPY ./docker/storm.yaml /root/.storm/storm.yaml
+
 # The code is stored here
 WORKDIR /usr/src/app
diff --git a/docker/storm.yaml b/docker/storm.yaml
new file mode 100644
index 0000000..283481f
--- /dev/null
+++ b/docker/storm.yaml
@@ -0,0 +1,9 @@
+storm.zookeeper.servers:
+  - "localhost"
+storm.local.dir: "/tmp/storm"
+nimbus.host: "localhost"
+supervisor.slots.ports:
+  - 6700
+  - 6701
+  - 6702
+  - 6703
diff --git a/wordcount/.gitignore b/wordcount/.gitignore
index 37b28cc..959264d 100644
--- a/wordcount/.gitignore
+++ b/wordcount/.gitignore
@@ -1,4 +1,16 @@
-.DS_Store
-_build
-_resources
-logs
+target/
+pom.xml.tag
+pom.xml.releaseBackup
+pom.xml.versionsBackup
+pom.xml.next
+release.properties
+dependency-reduced-pom.xml
+buildNumber.properties
+.mvn/timing.properties
+.idea/
+*.log
+*.log.lck
+.classpath
+.settings/
+*.iml
+.project
\ No newline at end of file
diff --git a/wordcount/CONTRIBUTING.md b/wordcount/CONTRIBUTING.md
new file mode 100644
index 0000000..efcad49
--- /dev/null
+++ b/wordcount/CONTRIBUTING.md
@@ -0,0 +1,11 @@
+# Contributing to Azure samples
+
+Thank you for your interest in contributing to Azure samples!
+
+## Ways to contribute
+
+You can contribute to [Azure samples](https://azure.microsoft.com/documentation/samples/) in a few different ways:
+
+- Submit feedback on [this sample page](https://azure.microsoft.com/documentation/samples/hdinsight-python-storm-wordcount/) whether it was helpful or not.
+- Submit issues through [issue tracker](https://github.com/Azure-Samples/hdinsight-python-storm-wordcount/issues) on GitHub. We are actively monitoring the issues and improving our samples.
+- If you wish to make code changes to samples, or contribute something new, please follow the [GitHub Forks / Pull requests model](https://help.github.com/articles/fork-a-repo/): Fork the sample repo, make the change and propose it back by submitting a pull request.
\ No newline at end of file
diff --git a/wordcount/LICENSE b/wordcount/LICENSE
new file mode 100644
index 0000000..d8d98a8
--- /dev/null
+++ b/wordcount/LICENSE
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2015 Microsoft Corporation
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/wordcount/README.md b/wordcount/README.md
new file mode 100644
index 0000000..3cd7765
--- /dev/null
+++ b/wordcount/README.md
@@ -0,0 +1,88 @@
+---
+services: hdinsight
+platforms: java,python
+author: blackmist
+---
+# hdinsight-python-storm-wordcount
+
+How to use Python components in an Apache Storm topology on HDInsight
+
+This topology uses the Flux framework to define a Storm topology using YAML. The components (spout and bolts) that process the data are written in Python.
+
+This example has been tested with HDInsight 3.6 (Storm 1.1.0).
+
+## Prerequisites
+
+* Python 2.7 or higher
+
+* Java JDK 1.8 or higher
+
+* Maven
+
+* (Optional) A local Storm development environment. This is only needed if you want to run the topology locally. For more information, see [Setting up a development environment](http://storm.apache.org/releases/1.0.1/Setting-up-development-environment.html).
+
+## How it works
+
+* `/resources/topology.yaml` - defines what components are in the topology and how data flows between them.
+
+* `/multilang/resources` - contains the Python components.
+
+* `/pom.xml` - dependencies and how to build the project.
+
+## Build the project
+
+From the root of the project, use the following command:
+
+```bash
+mvn clean compile package
+```
+
+This command creates a `target/WordCount-1.0-SNAPSHOT.jar` file.
+
+## Run the topology locally
+
+To run the topology locally, use the following command:
+
+```bash
+storm jar WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux -l -R /topology.yaml
+```
+
+Once the topology starts, it emits information to the local console similar to the following text:
+
+```
+24302 [Thread-25-sentence-spout-executor[4 4]] INFO o.a.s.s.ShellSpout - ShellLog pid:2436, name:sentence-spout Emiting the cow jumped over the moon
+24302 [Thread-30] INFO o.a.s.t.ShellBolt - ShellLog pid:2438, name:splitter-bolt Emitting the
+24302 [Thread-28] INFO o.a.s.t.ShellBolt - ShellLog pid:2437, name:counter-bolt Emitting years:160
+24302 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {word=the, count=599}
+24303 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {word=seven, count=302}
+24303 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {word=dwarfs, count=143}
+24303 [Thread-25-sentence-spout-executor[4 4]] INFO o.a.s.s.ShellSpout - ShellLog pid:2436, name:sentence-spout Emiting the cow jumped over the moon
+24303 [Thread-30] INFO o.a.s.t.ShellBolt - ShellLog pid:2438, name:splitter-bolt Emitting cow
+^C24303 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {word=four, count=160}
+```
+
+Use Ctrl+c to stop the topology.
+
+## Run the topology on HDInsight
+
+1. Use the following command to copy the `WordCount-1.0-SNAPSHOT.jar` file to your Storm on HDInsight cluster:
+
+ ```bash
+ scp target/WordCount-1.0-SNAPSHOT.jar sshuser@mycluster-ssh.azurehdinsight.net:
+ ```
+
+ Replace `sshuser` with the SSH user for your cluster. Replace `mycluster` with the cluster name.
+
+2. Once the file has been uploaded, connect to the cluster using SSH and use the following command to start the topology on the cluster:
+
+ ```bash
+ storm jar WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux -r -R /topology.yaml
+ ```
+
+3. You can use the Storm UI to view the topology on the cluster. The Storm UI is located at https://mycluster.azurehdinsight.net/stormui. Replace `mycluster` with your cluster name.
+
+Once started, a Storm topology runs until stopped (killed.) To stop the topology, use either the `storm kill TOPOLOGYNAME` command from the command-line (SSH session to a Linux cluster,) or by using the Storm UI, select the topology, and then select the __Kill__ button.
+
+## Project code of conduct
+
+This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.
diff --git a/wordcount/config.json b/wordcount/config.json
deleted file mode 100644
index 778cb51..0000000
--- a/wordcount/config.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
-    "serializer": "json",
-    "topology_specs": "topologies/",
-    "virtualenv_specs": "virtualenvs/",
-    "envs": {
-        "prod": {
-            "user": "",
-            "ssh_password": "",
-            "nimbus": "",
-            "workers": [],
-            "log": {
-                "path": "",
-                "max_bytes": 1000000,
-                "backup_count": 10,
-                "level": "info"
-            },
-            "virtualenv_root": ""
-        }
-    }
-}
diff --git a/wordcount/fabfile.py b/wordcount/fabfile.py
deleted file mode 100644
index 8c89cbc..0000000
--- a/wordcount/fabfile.py
+++ /dev/null
@@ -1,11 +0,0 @@
-def pre_submit(topology_name, env_name, env_config, options):
-    """Override this function to perform custom actions prior to topology
-    submission. No SSH tunnels will be active when this function is called."""
-    pass
-
-
-def post_submit(topo_name, env_name, env_config, options):
-    """Override this function to perform custom actions after topology
-    submission. Note that the SSH tunnel to Nimbus will still be active
-    when this function is called."""
-    pass
diff --git a/wordcount/multilang/resources/countbolt.py b/wordcount/multilang/resources/countbolt.py
new file mode 100644
index 0000000..4aebc19
--- /dev/null
+++ b/wordcount/multilang/resources/countbolt.py
@@ -0,0 +1,26 @@
+import storm
+# Counter is a nice way to count things,
+# but it is a Python 2.7 thing
+from collections import Counter
+
+class CountBolt(storm.BasicBolt):
+    # Initialize this instance
+    def initialize(self, conf, context):
+        self._conf = conf
+        self._context = context
+        # Create a new counter for this instance
+        self._counter = Counter()
+        storm.logInfo("Counter bolt instance starting...")
+
+    def process(self, tup):
+        # Get the word from the inbound tuple
+        word = tup.values[0]
+        # Increment the counter
+        self._counter[word] +=1
+        count = self._counter[word]
+        storm.logInfo("Emitting %s:%s" % (word, count))
+        # Emit the word and count
+        storm.emit([word, count])
+
+# Start the bolt when it's invoked
+CountBolt().run()
diff --git a/wordcount/multilang/resources/sentencespout.py b/wordcount/multilang/resources/sentencespout.py
new file mode 100644
index 0000000..a75f37d
--- /dev/null
+++ b/wordcount/multilang/resources/sentencespout.py
@@ -0,0 +1,28 @@
+import storm
+import random
+# Define some sentences
+SENTENCES = """
+the cow jumped over the moon
+an apple a day keeps the doctor away
+four score and seven years ago
+snow white and the seven dwarfs
+i am at two with nature
+""".strip().split('\n')
+
+class SentenceSpout(storm.Spout):
+    # Not much to do here for such a basic spout
+    def initialize(self, conf, context):
+        self._conf = conf
+        self._context = context
+
+        storm.logInfo("Spout instance starting...")
+
+    # Process the next tuple
+    def nextTuple(self):
+        # Emit a random sentence
+        sentence = random.choice(SENTENCES)
+        storm.logInfo("Emiting %s" % sentence)
+        storm.emit([sentence])
+
+# Start the spout when it's invoked
+SentenceSpout().run()
diff --git a/wordcount/multilang/resources/splitbolt.py b/wordcount/multilang/resources/splitbolt.py
new file mode 100644
index 0000000..b46a901
--- /dev/null
+++ b/wordcount/multilang/resources/splitbolt.py
@@ -0,0 +1,21 @@
+import storm
+
+class SplitBolt(storm.BasicBolt):
+    # There's nothing to initialize here,
+    # since this is just a split and emit
+    # Initialize this instance
+    def initialize(self, conf, context):
+        self._conf = conf
+        self._context = context
+        storm.logInfo("Split bolt instance starting...")
+
+    def process(self, tup):
+        # Split the inbound sentence at spaces
+        words = tup.values[0].split()
+        # Loop over words and emit
+        for word in words:
+            storm.logInfo("Emitting %s" % word)
+            storm.emit([word])
+
+# Start the bolt when it's invoked
+SplitBolt().run()
diff --git a/wordcount/pom.xml b/wordcount/pom.xml
new file mode 100644
index 0000000..941b7a9
--- /dev/null
+++ b/wordcount/pom.xml
@@ -0,0 +1,128 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>com.microsoft.example</groupId>
+  <artifactId>WordCount</artifactId>
+  <packaging>jar</packaging>
+  <version>1.0-SNAPSHOT</version>
+  <name>WordCount</name>
+  <url>http://maven.apache.org</url>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <!--
+    Storm 1.0.1 is for HDInsight 3.6.
+    To find the version information for earlier HDInsight cluster
+    versions, see https://azure.microsoft.com/en-us/documentation/articles/hdinsight-component-versioning/
+    -->
+    <storm.version>2.2.0</storm.version>
+  </properties>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-core</artifactId>
+      <version>${storm.version}</version>
+      <!-- keep storm out of the jar-with-dependencies -->
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>flux-core</artifactId>
+      <version>${storm.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>flux-wrappers</artifactId>
+      <version>${storm.version}</version>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.3</version>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+      <!-- build an uber jar -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.3</version>
+        <configuration>
+          <transformers>
+            <!-- Keep us from getting a can't overwrite file error -->
+            <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer" />
+            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+            <!-- We're using Flux, so refer to it as main -->
+            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+              <mainClass>org.apache.storm.flux.Flux</mainClass>
+            </transformer>
+          </transformers>
+          <!-- Keep us from getting a bad signature error -->
+          <filters>
+            <filter>
+              <artifact>*:*</artifact>
+              <excludes>
+                <exclude>META-INF/*.SF</exclude>
+                <exclude>META-INF/*.DSA</exclude>
+                <exclude>META-INF/*.RSA</exclude>
+              </excludes>
+            </filter>
+          </filters>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- This allows us to use `mvn exec:java` to run the topology in development -->
+      <!-- https://mvnrepository.com/artifact/org.codehaus.mojo/exec-maven-plugin -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.5.0</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <executable>java</executable>
+          <includeProjectDependencies>true</includeProjectDependencies>
+          <includePluginDependencies>false</includePluginDependencies>
+          <classpathScope>compile</classpathScope>
+          <!-- Set mainClass as Flux, since we are using that -->
+          <mainClass>org.apache.storm.flux.Flux</mainClass>
+        </configuration>
+      </plugin>
+    </plugins>
+    <resources>
+      <resource>
+        <directory>${basedir}/resources</directory>
+        <filtering>false</filtering>
+        <includes>
+          <!--
+          Include configuration for Log4j
+          to reduce the clutter in log output
+          -->
+          <include>log4j2.xml</include>
+          <!-- topology definition -->
+          <include>topology.yaml</include>
+        </includes>
+      </resource>
+      <!-- include the Python components -->
+      <resource>
+        <directory>${basedir}/multilang</directory>
+        <filtering>false</filtering>
+      </resource>
+    </resources>
+  </build>
+</project>
diff --git a/wordcount/project.clj b/wordcount/project.clj
deleted file mode 100644
index 4b08880..0000000
--- a/wordcount/project.clj
+++ /dev/null
@@ -1,13 +0,0 @@
-(defproject wordcount "0.0.1-SNAPSHOT"
-  :resource-paths ["_resources"]
-  :target-path "_build"
-  :min-lein-version "2.0.0"
-  :jvm-opts ["-client"]
-  :dependencies [[org.apache.storm/storm-core "2.3.0"]
-                 [org.apache.storm/flux-core "2.3.0"]]
-  :jar-exclusions [#"log4j\.properties" #"org\.apache\.storm\.(?!flux)" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
-  :uberjar-exclusions [#"log4j\.properties" #"org\.apache\.storm\.(?!flux)" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
-  )
-(require 'cemerick.pomegranate.aether)
-(cemerick.pomegranate.aether/register-wagon-factory!
- "http" #(org.apache.maven.wagon.providers.http.HttpWagon.))
diff --git a/wordcount/resources/log4j2.xml b/wordcount/resources/log4j2.xml
new file mode 100644
index 0000000..d2b2981
--- /dev/null
+++ b/wordcount/resources/log4j2.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- See http://logging.apache.org/log4j/2.x/manual/configuration.html
+     for more information on Log4j configuration. -->
+<Configuration>
+  <Appenders>
+    <Console name="STDOUT" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{HH:mm:ss} [%t] %-5level %logger{36} - %msg%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Logger name="com.microsoft.example" level="trace" additivity="false">
+      <AppenderRef ref="STDOUT"/>
+    </Logger>
+    <Root level="error">
+      <Appender-Ref ref="STDOUT"/>
+    </Root>
+  </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/wordcount/resources/topology.yaml b/wordcount/resources/topology.yaml
new file mode 100644
index 0000000..725f3be
--- /dev/null
+++ b/wordcount/resources/topology.yaml
@@ -0,0 +1,73 @@
+# topology definition
+
+# name to be used when submitting. This is what shows up...
+# in the Storm UI/storm command-line tool as the topology name
+# when submitted to Storm
+name: "wordcount"
+
+# Topology configuration
+config:
+  # Hint for the number of workers to create
+  topology.workers: 1
+
+# Spout definitions
+spouts:
+  - id: "sentence-spout"
+    className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
+    constructorArgs:
+      # Command line
+      - ["python", "sentencespout.py"]
+      # Output field(s)
+      - ["sentence"]
+    # parallelism hint
+    parallelism: 1
+
+# Bolt definitions
+bolts:
+  - id: "splitter-bolt"
+    className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
+    constructorArgs:
+      # Command line
+      - ["python", "splitbolt.py"]
+      # Output field(s)
+      - ["word"]
+    parallelism: 1
+
+  - id: "counter-bolt"
+    className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
+    constructorArgs:
+      # Command line
+      - ["python", "countbolt.py"]
+      # Output field(s)
+      - ["word","count"]
+    parallelism: 1
+
+  # Logging
+  - id: "log"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+# Stream definitions
+streams:
+  - name: "Spout --> Splitter" # name isn't used (placeholder for logging, UI, etc.)
+    # The stream emitter
+    from: "sentence-spout"
+    # The stream consumer
+    to: "splitter-bolt"
+    # Grouping type
+    grouping:
+      type: SHUFFLE
+
+  - name: "Splitter -> Counter"
+    from: "splitter-bolt"
+    to: "counter-bolt"
+    grouping:
+      type: FIELDS
+      # field(s) to group on
+      args: ["word"]
+
+  - name: "Counter -> Log"
+    from: "counter-bolt"
+    to: "log"
+    grouping:
+      type: SHUFFLE
\ No newline at end of file
diff --git a/wordcount/src/bolts/__init__.py b/wordcount/src/bolts/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/wordcount/src/bolts/__init__.py
+++ /dev/null
diff --git a/wordcount/src/bolts/wordcount.py b/wordcount/src/bolts/wordcount.py
deleted file mode 100644
index 85fbfb7..0000000
--- a/wordcount/src/bolts/wordcount.py
+++ /dev/null
@@ -1,26 +0,0 @@
-import os
-from collections import Counter
-
-from streamparse import Bolt
-
-
-class WordCountBolt(Bolt):
-    outputs = ["word", "count"]
-
-    def initialize(self, conf, ctx):
-        self.counter = Counter()
-        self.pid = os.getpid()
-        self.total = 0
-
-    def _increment(self, word, inc_by):
-        self.counter[word] += inc_by
-        self.total += inc_by
-
-    def process(self, tup):
-        word = tup.values[0]
-        self._increment(word, 10 if word == "dog" else 1)
-        if self.total % 1000 == 0:
-            self.logger.info(
-                f"counted [{self.total:,}] words [pid={self.pid}]"
-            )
-        self.emit([word, self.counter[word]])
diff --git a/wordcount/src/spouts/__init__.py b/wordcount/src/spouts/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/wordcount/src/spouts/__init__.py
+++ /dev/null
diff --git a/wordcount/src/spouts/words.py b/wordcount/src/spouts/words.py
deleted file mode 100644
index 6ba88c1..0000000
--- a/wordcount/src/spouts/words.py
+++ /dev/null
@@ -1,14 +0,0 @@
-from itertools import cycle
-
-from streamparse import Spout
-
-
-class WordSpout(Spout):
-    outputs = ["word"]
-
-    def initialize(self, stormconf, context):
-        self.words = cycle(["dog", "cat", "zebra", "elephant"])
-
-    def next_tuple(self):
-        word = next(self.words)
-        self.emit([word])
diff --git a/wordcount/topologies/wordcount.py b/wordcount/topologies/wordcount.py
deleted file mode 100644
index d66e1b3..0000000
--- a/wordcount/topologies/wordcount.py
+++ /dev/null
@@ -1,13 +0,0 @@
-"""
-Word count topology
-"""
-
-from streamparse import Grouping, Topology
-
-from bolts.wordcount import WordCountBolt
-from spouts.words import WordSpout
-
-
-class WordCount(Topology):
-    word_spout = WordSpout.spec()
-    count_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields("word")}, par=2)
diff --git a/wordcount/virtualenvs/wordcount.txt b/wordcount/virtualenvs/wordcount.txt
deleted file mode 100644
index 9229811..0000000
--- a/wordcount/virtualenvs/wordcount.txt
+++ /dev/null
@@ -1 +0,0 @@
-streamparse # always required for streamparse projects
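
For context, the pieces this commit adds fit together into a build-and-submit loop roughly like the sketch below. This is only a rough sketch, not part of the patch: it assumes the compose project name and service names from the Makefile and docker-compose.yml above (`docker_storm`, `backend`, `nimbus`), that Maven is available inside the backend container, and that the repository is mounted at `/usr/src/app` as declared in the compose file.

```bash
# Build the Flux uber-jar inside the backend container
# (assumes mvn is on the PATH there; the repo is mounted at /usr/src/app)
docker-compose -p docker_storm exec backend \
    mvn -f wordcount/pom.xml clean compile package

# Submit the topology from the nimbus container; topology.yaml is packaged
# at the root of the jar, so it is referenced as a classpath resource (-R),
# matching the command shown in the wordcount README
docker-compose -p docker_storm exec nimbus \
    storm jar /usr/src/app/wordcount/target/WordCount-1.0-SNAPSHOT.jar \
    org.apache.storm.flux.Flux -r -R /topology.yaml
```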