From e3d2ffca585660f0c088ab8323bfe78a86ba75e3 Mon Sep 17 00:00:00 2001 From: Cody Hiar Date: Mon, 25 Oct 2021 14:34:37 -0600 Subject: Save working version of deploying jars --- Makefile | 3 + docker-compose.yml | 45 ++++++++- docker/Dockerfile | 2 + docker/storm.yaml | 9 ++ wordcount/.gitignore | 20 +++- wordcount/CONTRIBUTING.md | 11 +++ wordcount/LICENSE | 21 ++++ wordcount/README.md | 88 +++++++++++++++++ wordcount/config.json | 20 ---- wordcount/fabfile.py | 11 --- wordcount/multilang/resources/countbolt.py | 26 +++++ wordcount/multilang/resources/sentencespout.py | 28 ++++++ wordcount/multilang/resources/splitbolt.py | 21 ++++ wordcount/pom.xml | 128 +++++++++++++++++++++++++ wordcount/project.clj | 13 --- wordcount/resources/log4j2.xml | 18 ++++ wordcount/resources/topology.yaml | 73 ++++++++++++++ wordcount/src/bolts/__init__.py | 0 wordcount/src/bolts/wordcount.py | 26 ----- wordcount/src/spouts/__init__.py | 0 wordcount/src/spouts/words.py | 14 --- wordcount/topologies/wordcount.py | 13 --- wordcount/virtualenvs/wordcount.txt | 1 - 23 files changed, 486 insertions(+), 105 deletions(-) create mode 100644 docker/storm.yaml create mode 100644 wordcount/CONTRIBUTING.md create mode 100644 wordcount/LICENSE create mode 100644 wordcount/README.md delete mode 100644 wordcount/config.json delete mode 100644 wordcount/fabfile.py create mode 100644 wordcount/multilang/resources/countbolt.py create mode 100644 wordcount/multilang/resources/sentencespout.py create mode 100644 wordcount/multilang/resources/splitbolt.py create mode 100644 wordcount/pom.xml delete mode 100644 wordcount/project.clj create mode 100644 wordcount/resources/log4j2.xml create mode 100644 wordcount/resources/topology.yaml delete mode 100644 wordcount/src/bolts/__init__.py delete mode 100644 wordcount/src/bolts/wordcount.py delete mode 100644 wordcount/src/spouts/__init__.py delete mode 100644 wordcount/src/spouts/words.py delete mode 100644 wordcount/topologies/wordcount.py delete mode 100644 wordcount/virtualenvs/wordcount.txt diff --git a/Makefile b/Makefile index ea21190..09dff25 100644 --- a/Makefile +++ b/Makefile @@ -15,5 +15,8 @@ down: ## Stop the container enter: ## Enter the running container docker-compose -p docker_storm exec backend /bin/bash +enter_nimbus: ## Enter the running nimbus container + docker-compose -p docker_storm exec nimbus /bin/bash + clean: down ## Remove stoped containers docker-compose -p docker_storm rm -f diff --git a/docker-compose.yml b/docker-compose.yml index d22f668..ef279fa 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,14 +1,53 @@ version: '3' services: + zookeeper: + image: zookeeper + container_name: zookeeper + restart: always + + nimbus: + image: storm + container_name: nimbus + command: storm nimbus + depends_on: + - zookeeper + links: + - zookeeper + restart: always + ports: + - 6627:6627 + volumes: + - .:/usr/src/app + + supervisor: + image: storm + container_name: supervisor + command: storm supervisor + depends_on: + - nimbus + - zookeeper + links: + - nimbus + - zookeeper + restart: always + + ui: + image: storm + container_name: storm_ui + command: storm ui + ports: + - 8080:8080 + depends_on: + - nimbus + links: + - nimbus + backend: build: context: ./ dockerfile: ./docker/Dockerfile - network_mode: "bridge" container_name: docker_storm image: thornycrackers/docker_storm - ports: - - "8000" volumes: - .:/usr/src/app command: /bin/bash diff --git a/docker/Dockerfile b/docker/Dockerfile index ac833c3..4465fd1 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -23,5 +23,7 @@ ENV LEIN_ROOT=/usr/src/app ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64 RUN ln -s /usr/lib/jvm/java-11-openjdk-amd64/bin/storm /bin/storm +COPY ./docker/storm.yaml /root/.storm/storm.yaml + # The code is stored here WORKDIR /usr/src/app diff --git a/docker/storm.yaml b/docker/storm.yaml new file mode 100644 index 0000000..283481f --- /dev/null +++ b/docker/storm.yaml @@ -0,0 +1,9 @@ +storm.zookeeper.servers: + - "localhost" +storm.local.dir: "/tmp/storm" +nimbus.host: "localhost" +supervisor.slots.ports: + - 6700 + - 6701 + - 6702 + - 6703 diff --git a/wordcount/.gitignore b/wordcount/.gitignore index 37b28cc..959264d 100644 --- a/wordcount/.gitignore +++ b/wordcount/.gitignore @@ -1,4 +1,16 @@ -.DS_Store -_build -_resources -logs +target/ +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties +.idea/ +*.log +*.log.lck +.classpath +.settings/ +*.iml +.project \ No newline at end of file diff --git a/wordcount/CONTRIBUTING.md b/wordcount/CONTRIBUTING.md new file mode 100644 index 0000000..efcad49 --- /dev/null +++ b/wordcount/CONTRIBUTING.md @@ -0,0 +1,11 @@ +# Contributing to Azure samples + +Thank you for your interest in contributing to Azure samples! + +## Ways to contribute + +You can contribute to [Azure samples](https://azure.microsoft.com/documentation/samples/) in a few different ways: + +- Submit feedback on [this sample page](https://azure.microsoft.com/documentation/samples/hdinsight-python-storm-wordcount/) whether it was helpful or not. +- Submit issues through [issue tracker](https://github.com/Azure-Samples/hdinsight-python-storm-wordcount/issues) on GitHub. We are actively monitoring the issues and improving our samples. +- If you wish to make code changes to samples, or contribute something new, please follow the [GitHub Forks / Pull requests model](https://help.github.com/articles/fork-a-repo/): Fork the sample repo, make the change and propose it back by submitting a pull request. \ No newline at end of file diff --git a/wordcount/LICENSE b/wordcount/LICENSE new file mode 100644 index 0000000..d8d98a8 --- /dev/null +++ b/wordcount/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2015 Microsoft Corporation + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/wordcount/README.md b/wordcount/README.md new file mode 100644 index 0000000..3cd7765 --- /dev/null +++ b/wordcount/README.md @@ -0,0 +1,88 @@ +--- +services: hdinsight +platforms: java,python +author: blackmist +--- +# hdinsight-python-storm-wordcount + +How to use Python components in an Apache Storm topology on HDInsight + +This topology uses the Flux framework to define a Storm topology using YAML. The components (spout and bolts) that process the data are written in Python. + +This example has been tested with HDInsight 3.6 (Storm 1.1.0). + +## Prerequisites + +* Python 2.7 or higher + +* Java JDK 1.8 or higher + +* Maven + +* (Optional) A local Storm development environment. This is only needed if you want to run the topology locally. For more information, see [Setting up a development environment](http://storm.apache.org/releases/1.0.1/Setting-up-development-environment.html). + +## How it works + +* `/resources/topology.yaml` - defines what components are in the topology and how data flows between them. + +* `/multilang/resources` - contains the Python components. + +* `/pom.xml` - dependencies and how to build the project. + +## Build the project + +From the root of the project, use the following command: + +```bash +mvn clean compile package +``` + +This command creates a `target/WordCount-1.0-SNAPSHOT.jar` file. + +## Run the topology locally + +To run the topology locally, use the following command: + +```bash +storm jar WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux -l -R /topology.yaml +``` + +Once the topology starts, it emits information to the local console similar to the following text: + +``` +24302 [Thread-25-sentence-spout-executor[4 4]] INFO o.a.s.s.ShellSpout - ShellLog pid:2436, name:sentence-spout Emiting the cow jumped over the moon +24302 [Thread-30] INFO o.a.s.t.ShellBolt - ShellLog pid:2438, name:splitter-bolt Emitting the +24302 [Thread-28] INFO o.a.s.t.ShellBolt - ShellLog pid:2437, name:counter-bolt Emitting years:160 +24302 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {word=the, count=599} +24303 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {word=seven, count=302} +24303 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {word=dwarfs, count=143} +24303 [Thread-25-sentence-spout-executor[4 4]] INFO o.a.s.s.ShellSpout - ShellLog pid:2436, name:sentence-spout Emiting the cow jumped over the moon +24303 [Thread-30] INFO o.a.s.t.ShellBolt - ShellLog pid:2438, name:splitter-bolt Emitting cow +^C24303 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {word=four, count=160} +``` + +Use Ctrl+c to stop the topology. + +## Run the topology on HDInsight + +1. Use the following command to copy the `WordCount-1.0-SNAPSHOT.jar` file to your Storm on HDInsight cluster: + + ```bash + scp target\WordCount-1.0-SNAPSHOT.jar sshuser@mycluster-ssh.azurehdinsight.net + ``` + + Replace `sshuser` with the SSH user for your cluster. Replace `mycluster` with the cluster name. + +2. Once the file has been uploaded, connect to the cluster using SSH and use the following command to start the topology on the cluster: + + ```bash + storm jar WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux -r -R /topology.yaml + ``` + +3. You can use the Storm UI to view the topology on the cluster. The Storm UI is located at https://mycluster.azurehdinsight.net/stormui. Replace `mycluster` with your cluster name. + +Once started, a Storm topology runs until stopped (killed.) To stop the topology, use either the `storm kill TOPOLOGYNAME` command from the command-line (SSH session to a Linux cluster,) or by using the Storm UI, select the topology, and then select the __Kill__ button. + +## Project code of conduct + +This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments. diff --git a/wordcount/config.json b/wordcount/config.json deleted file mode 100644 index 778cb51..0000000 --- a/wordcount/config.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "serializer": "json", - "topology_specs": "topologies/", - "virtualenv_specs": "virtualenvs/", - "envs": { - "prod": { - "user": "", - "ssh_password": "", - "nimbus": "", - "workers": [], - "log": { - "path": "", - "max_bytes": 1000000, - "backup_count": 10, - "level": "info" - }, - "virtualenv_root": "" - } - } -} diff --git a/wordcount/fabfile.py b/wordcount/fabfile.py deleted file mode 100644 index 8c89cbc..0000000 --- a/wordcount/fabfile.py +++ /dev/null @@ -1,11 +0,0 @@ -def pre_submit(topology_name, env_name, env_config, options): - """Override this function to perform custom actions prior to topology - submission. No SSH tunnels will be active when this function is called.""" - pass - - -def post_submit(topo_name, env_name, env_config, options): - """Override this function to perform custom actions after topology - submission. Note that the SSH tunnel to Nimbus will still be active - when this function is called.""" - pass diff --git a/wordcount/multilang/resources/countbolt.py b/wordcount/multilang/resources/countbolt.py new file mode 100644 index 0000000..4aebc19 --- /dev/null +++ b/wordcount/multilang/resources/countbolt.py @@ -0,0 +1,26 @@ +import storm +# Counter is a nice way to count things, +# but it is a Python 2.7 thing +from collections import Counter + +class CountBolt(storm.BasicBolt): + # Initialize this instance + def initialize(self, conf, context): + self._conf = conf + self._context = context + # Create a new counter for this instance + self._counter = Counter() + storm.logInfo("Counter bolt instance starting...") + + def process(self, tup): + # Get the word from the inbound tuple + word = tup.values[0] + # Increment the counter + self._counter[word] +=1 + count = self._counter[word] + storm.logInfo("Emitting %s:%s" % (word, count)) + # Emit the word and count + storm.emit([word, count]) + +# Start the bolt when it's invoked +CountBolt().run() diff --git a/wordcount/multilang/resources/sentencespout.py b/wordcount/multilang/resources/sentencespout.py new file mode 100644 index 0000000..a75f37d --- /dev/null +++ b/wordcount/multilang/resources/sentencespout.py @@ -0,0 +1,28 @@ +import storm +import random +# Define some sentences +SENTENCES = """ +the cow jumped over the moon +an apple a day keeps the doctor away +four score and seven years ago +snow white and the seven dwarfs +i am at two with nature +""".strip().split('\n') + +class SentenceSpout(storm.Spout): + # Not much to do here for such a basic spout + def initialize(self, conf, context): + self._conf = conf + self._context = context + + storm.logInfo("Spout instance starting...") + + # Process the next tuple + def nextTuple(self): + # Emit a random sentence + sentence = random.choice(SENTENCES) + storm.logInfo("Emiting %s" % sentence) + storm.emit([sentence]) + +# Start the spout when it's invoked +SentenceSpout().run() diff --git a/wordcount/multilang/resources/splitbolt.py b/wordcount/multilang/resources/splitbolt.py new file mode 100644 index 0000000..b46a901 --- /dev/null +++ b/wordcount/multilang/resources/splitbolt.py @@ -0,0 +1,21 @@ +import storm + +class SplitBolt(storm.BasicBolt): + # There's nothing to initialize here, + # since this is just a split and emit + # Initialize this instance + def initialize(self, conf, context): + self._conf = conf + self._context = context + storm.logInfo("Split bolt instance starting...") + + def process(self, tup): + # Split the inbound sentence at spaces + words = tup.values[0].split() + # Loop over words and emit + for word in words: + storm.logInfo("Emitting %s" % word) + storm.emit([word]) + +# Start the bolt when it's invoked +SplitBolt().run() diff --git a/wordcount/pom.xml b/wordcount/pom.xml new file mode 100644 index 0000000..941b7a9 --- /dev/null +++ b/wordcount/pom.xml @@ -0,0 +1,128 @@ + + 4.0.0 + com.microsoft.example + WordCount + jar + 1.0-SNAPSHOT + WordCount + http://maven.apache.org + + UTF-8 + + 2.2.0 + + + + org.apache.storm + storm-core + ${storm.version} + + provided + + + org.apache.storm + flux-core + ${storm.version} + + + org.apache.storm + flux-wrappers + ${storm.version} + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.3 + + 1.8 + 1.8 + + + + + org.apache.maven.plugins + maven-shade-plugin + 2.3 + + + + + + + + org.apache.storm.flux.Flux + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + package + + shade + + + + + + + + org.codehaus.mojo + exec-maven-plugin + 1.5.0 + + + + exec + + + + + java + true + false + compile + + org.apache.storm.flux.Flux + + + + + + ${basedir}/resources + false + + + log4j2.xml + + topology.yaml + + + + + ${basedir}/multilang + false + + + + diff --git a/wordcount/project.clj b/wordcount/project.clj deleted file mode 100644 index 4b08880..0000000 --- a/wordcount/project.clj +++ /dev/null @@ -1,13 +0,0 @@ -(defproject wordcount "0.0.1-SNAPSHOT" - :resource-paths ["_resources"] - :target-path "_build" - :min-lein-version "2.0.0" - :jvm-opts ["-client"] - :dependencies [[org.apache.storm/storm-core "2.3.0"] - [org.apache.storm/flux-core "2.3.0"]] - :jar-exclusions [#"log4j\.properties" #"org\.apache\.storm\.(?!flux)" #"trident" #"META-INF" #"meta-inf" #"\.yaml"] - :uberjar-exclusions [#"log4j\.properties" #"org\.apache\.storm\.(?!flux)" #"trident" #"META-INF" #"meta-inf" #"\.yaml"] - ) -(require 'cemerick.pomegranate.aether) -(cemerick.pomegranate.aether/register-wagon-factory! - "http" #(org.apache.maven.wagon.providers.http.HttpWagon.)) diff --git a/wordcount/resources/log4j2.xml b/wordcount/resources/log4j2.xml new file mode 100644 index 0000000..d2b2981 --- /dev/null +++ b/wordcount/resources/log4j2.xml @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/wordcount/resources/topology.yaml b/wordcount/resources/topology.yaml new file mode 100644 index 0000000..725f3be --- /dev/null +++ b/wordcount/resources/topology.yaml @@ -0,0 +1,73 @@ +# topology definition + +# name to be used when submitting. This is what shows up... +# in the Storm UI/storm command-line tool as the topology name +# when submitted to Storm +name: "wordcount" + +# Topology configuration +config: + # Hint for the number of workers to create + topology.workers: 1 + +# Spout definitions +spouts: + - id: "sentence-spout" + className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout" + constructorArgs: + # Command line + - ["python", "sentencespout.py"] + # Output field(s) + - ["sentence"] + # parallelism hint + parallelism: 1 + +# Bolt definitions +bolts: + - id: "splitter-bolt" + className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt" + constructorArgs: + # Command line + - ["python", "splitbolt.py"] + # Output field(s) + - ["word"] + parallelism: 1 + + - id: "counter-bolt" + className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt" + constructorArgs: + # Command line + - ["python", "countbolt.py"] + # Output field(s) + - ["word","count"] + parallelism: 1 + + # Logging + - id: "log" + className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" + parallelism: 1 + +# Stream definitions +streams: + - name: "Spout --> Splitter" # name isn't used (placeholder for logging, UI, etc.) + # The stream emitter + from: "sentence-spout" + # The stream consumer + to: "splitter-bolt" + # Grouping type + grouping: + type: SHUFFLE + + - name: "Splitter -> Counter" + from: "splitter-bolt" + to: "counter-bolt" + grouping: + type: FIELDS + # field(s) to group on + args: ["word"] + + - name: "Counter -> Log" + from: "counter-bolt" + to: "log" + grouping: + type: SHUFFLE \ No newline at end of file diff --git a/wordcount/src/bolts/__init__.py b/wordcount/src/bolts/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/wordcount/src/bolts/wordcount.py b/wordcount/src/bolts/wordcount.py deleted file mode 100644 index 85fbfb7..0000000 --- a/wordcount/src/bolts/wordcount.py +++ /dev/null @@ -1,26 +0,0 @@ -import os -from collections import Counter - -from streamparse import Bolt - - -class WordCountBolt(Bolt): - outputs = ["word", "count"] - - def initialize(self, conf, ctx): - self.counter = Counter() - self.pid = os.getpid() - self.total = 0 - - def _increment(self, word, inc_by): - self.counter[word] += inc_by - self.total += inc_by - - def process(self, tup): - word = tup.values[0] - self._increment(word, 10 if word == "dog" else 1) - if self.total % 1000 == 0: - self.logger.info( - f"counted [{self.total:,}] words [pid={self.pid}]" - ) - self.emit([word, self.counter[word]]) diff --git a/wordcount/src/spouts/__init__.py b/wordcount/src/spouts/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/wordcount/src/spouts/words.py b/wordcount/src/spouts/words.py deleted file mode 100644 index 6ba88c1..0000000 --- a/wordcount/src/spouts/words.py +++ /dev/null @@ -1,14 +0,0 @@ -from itertools import cycle - -from streamparse import Spout - - -class WordSpout(Spout): - outputs = ["word"] - - def initialize(self, stormconf, context): - self.words = cycle(["dog", "cat", "zebra", "elephant"]) - - def next_tuple(self): - word = next(self.words) - self.emit([word]) diff --git a/wordcount/topologies/wordcount.py b/wordcount/topologies/wordcount.py deleted file mode 100644 index d66e1b3..0000000 --- a/wordcount/topologies/wordcount.py +++ /dev/null @@ -1,13 +0,0 @@ -""" -Word count topology -""" - -from streamparse import Grouping, Topology - -from bolts.wordcount import WordCountBolt -from spouts.words import WordSpout - - -class WordCount(Topology): - word_spout = WordSpout.spec() - count_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields("word")}, par=2) diff --git a/wordcount/virtualenvs/wordcount.txt b/wordcount/virtualenvs/wordcount.txt deleted file mode 100644 index 9229811..0000000 --- a/wordcount/virtualenvs/wordcount.txt +++ /dev/null @@ -1 +0,0 @@ -streamparse # always required for streamparse projects -- cgit v1.2.3