Topologies¶
Clojure Quick Reference Guide¶
Topologies in streamparse are defined using Clojure. Here is a quick overview so you don’t get lost.
- Function definitions
  (defn fn-name [options] expressions) defines a function called fn-name that takes options as an argument and evaluates each of the expressions, treating the last evaluated expression as the return value of the function.
- Keyword arguments
  In Clojure, keyword arguments are specified using paired-up positional arguments. Thus :p 2 is the p keyword set to value 2.
- List
  [val1 val2 ... valN] defines a list of N values.
- Map
  {"key-1" val1 "key-2" val2 ... "key-N" valN} is a mapping of key-value pairs.
- Comments
  Anything after ;; is a line comment.
For Python programmers, Clojure can be a little tricky in that whitespace is not significant, and a comma (,) is treated as whitespace. This means [val1 val2] and [val1, val2] are identical lists. Function definitions can similarly take up multiple lines.
(defn fn-name [options]
expression1
expression2
;; ...
expressionN
;; the value of expressionN is the returned value
)
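For readers more at home in Python, the paired-up keyword convention described above can be mimicked in a few lines. This is only an analogy: pairs_to_options is a hypothetical helper written for this illustration, not part of streamparse or Clojure.

```python
def pairs_to_options(*args):
    """Turn paired-up positional arguments (":p", 2, ...) into a dict."""
    if len(args) % 2 != 0:
        raise ValueError("keyword arguments must come in key/value pairs")
    options = {}
    # Walk the arguments two at a time: even positions are keys, odd are values.
    for key, value in zip(args[0::2], args[1::2]):
        if not (isinstance(key, str) and key.startswith(":")):
            raise ValueError(f"expected a keyword like ':p', got {key!r}")
        options[key[1:]] = value
    return options


print(pairs_to_options(":p", 2))  # {'p': 2}
```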
Topology Files¶
A topology file describes your topology in terms of a Directed Acyclic Graph (DAG) of Storm components, namely bolts and spouts. It uses the Clojure DSL for this, along with some utility functions streamparse provides.
Topology files are located in the topologies directory in your streamparse project folder.
There can be any number of topology files for your project in this directory.
- topologies/my-topology.clj
- topologies/my-other-topology.clj
- topologies/my-third-topology.clj
And so on.
A sample my-topology.clj would start off by importing the streamparse Clojure DSL functions.
(ns my-topology
(:use [streamparse.specs])
(:gen-class))
Notice that my-topology matches the name of the file. The next line is the import of the streamparse utility functions.
You could optionally avoid all of the streamparse-provided helper functions and import your own functions or the Clojure DSL for Storm directly.
(ns my-topology
(:use [backtype.storm.clojure])
(:gen-class))
In the next part of the file, we set up a topology definition, also named my-topology (matching the ns line and filename). This definition is actually a Clojure function that takes the topology options as a single map argument. This function returns a list of 2 maps: a spout map and a bolt map. These two maps define the DAG that is your topology.
(defn my-topology [options]
[
;; spout configuration
{"my-python-spout" (python-spout-spec
;; topology options passed in
options
;; python class to run
"spouts.myspout.MySpout"
;; output specification, what named fields will this spout emit?
["data"]
;; configuration parameters, can specify multiple or none at all
)
}
;; bolt configuration
{"my-python-bolt" (python-bolt-spec
;; topology options passed in
options
;; inputs, where does this bolt receive its tuples from?
{"my-python-spout" :shuffle}
;; python class to run
"bolts.mybolt.MyBolt"
;; output specification, what named fields will this bolt emit?
["data" "date"]
;; configuration parameters, can specify multiple or none at all
:p 2
)
}
]
)
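To make the "two maps define the DAG" idea concrete, here is a minimal Python sketch that mirrors the spout map and bolt map as dictionaries and checks that every bolt input names a defined component. The dictionary layout and the undefined_inputs helper are assumptions made for illustration; streamparse does not store or validate topologies this way.

```python
# Python stand-in for the [spout-map bolt-map] pair returned by the
# Clojure topology function (hypothetical layout, for illustration only).
spouts = {
    "my-python-spout": {"class": "spouts.myspout.MySpout", "fields": ["data"]},
}
bolts = {
    "my-python-bolt": {
        "inputs": {"my-python-spout": "shuffle"},
        "class": "bolts.mybolt.MyBolt",
        "fields": ["data", "date"],
        "p": 2,
    },
}


def undefined_inputs(spouts, bolts):
    """Return (bolt, input) pairs whose input is not a defined component."""
    known = set(spouts) | set(bolts)
    return [
        (bolt_name, source)
        for bolt_name, spec in bolts.items()
        for source in spec["inputs"]
        if source not in known
    ]


print(undefined_inputs(spouts, bolts))  # an empty list means every edge resolves
```

Each key in a bolt's inputs map is an edge of the graph, which is why a misspelled component name shows up here as a dangling edge.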
Shell Spouts and Bolts¶
The Clojure DSL provides the shell-bolt-spec and shell-spout-spec functions to handle spouts and bolts written in non-JVM languages.
The shell-spout-spec takes at least 2 arguments:
- The command line program to run (as a list of arguments)
- A list of the named fields the spout will output
- Any optional keyword arguments
"my-shell-spout" (shell-spout-spec
;; Command to run
["python" "spout.py"]
;; output specification, what named fields will this spout emit?
["data"]
;; configuration parameters, can specify multiple or none at all
:p 2
)
The shell-bolt-spec takes at least 3 arguments:
- A map of the input spouts or bolts and their groupings
- The command line program to run (as a list of arguments)
- A list of the named fields the bolt will output
- Any optional keyword arguments
"my-shell-bolt" (shell-bolt-spec
;; input components and their groupings
{"my-shell-spout" :shuffle}
;; Command to run
["bash" "mybolt.sh"]
;; output specification, what named fields will this bolt emit?
["data"]
;; configuration parameters, can specify multiple or none at all
:p 2
)
Python Spouts and Bolts¶
The example topology above and the sparse quickstart wordcount project both use the python-spout-spec and python-bolt-spec functions provided by the streamparse.specs import statement.
(python-spout-spec ...) and (python-bolt-spec ...) are just convenience functions provided by streamparse for creating topology components. They are simply wrappers around (shell-spout-spec ...) and (shell-bolt-spec ...).
The python-spout-spec takes at least 3 arguments:
- options - the topology options map passed in
- The full path to the class to run. spouts.myspout.MySpout is actually the MySpout class in src/spouts/myspout.py
- A list of the named fields the spout will output
- Any optional keyword arguments, such as parallelism :p 2
The python-bolt-spec takes at least 4 arguments:
- options - the topology options map passed in
- A map of the input spouts or bolts and their groupings (See below)
- The full path to the class to run. bolts.mybolt.MyBolt is actually the MyBolt class in src/bolts/mybolt.py
- A list of the named fields the bolt will output
- Any optional keyword arguments, such as parallelism :p 2
Parallelism is further discussed in Parallelism and Workers.
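The mapping from a dotted class path to a source file, as described in the argument lists above, can be sketched as follows. class_path_to_source is a hypothetical helper for illustration; how streamparse actually locates and loads the class is not shown here.

```python
from pathlib import PurePosixPath


def class_path_to_source(dotted_path, src_dir="src"):
    """Split 'pkg.module.ClassName' into (source file, class name)."""
    module_path, _, class_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"expected 'module.ClassName', got {dotted_path!r}")
    # Each dotted module segment becomes a directory level under src_dir.
    source = PurePosixPath(src_dir, *module_path.split(".")).with_suffix(".py")
    return str(source), class_name


print(class_path_to_source("spouts.myspout.MySpout"))
# ('src/spouts/myspout.py', 'MySpout')
```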
Groupings¶
Storm offers comprehensive options for stream groupings, but you will most commonly use a shuffle or fields grouping:
- Shuffle grouping: Tuples are randomly distributed across the bolt’s tasks in a way such that each bolt is guaranteed to get an equal number of tuples.
- Fields grouping: The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the “user-id” field, tuples with the same “user-id” will always go to the same task, but tuples with different “user-id”’s may go to different tasks.
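The key property of a fields grouping (equal field values always reach the same task) can be illustrated with a small simulation. This is a sketch under stated assumptions: fields_task is a hypothetical function, and crc32 is only a stand-in for whatever hashing Storm uses internally.

```python
import zlib


def fields_task(tup, field_names, group_by, num_tasks):
    """Pick a task index from a stable hash of the grouped field values."""
    key = "|".join(str(tup[field_names.index(name)]) for name in group_by)
    # A stable hash means equal keys always map to the same task index.
    return zlib.crc32(key.encode("utf-8")) % num_tasks


fields = ["user-id", "url"]
tuples = [("alice", "/a"), ("bob", "/b"), ("alice", "/c")]
tasks = [fields_task(t, fields, ["user-id"], num_tasks=4) for t in tuples]
print(tasks[0] == tasks[2])  # True: both "alice" tuples land on the same task
```

Tuples for "bob" may or may not share a task with "alice"; the grouping only promises that a given "user-id" never splits across tasks.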
Streams¶
Topologies support multiple streams when routing tuples between components. The emit() method takes an optional stream argument to specify the stream ID. For example:
self.emit([term, timestamp, lookup_result], stream='index')
self.emit([term, timestamp, lookup_result], stream='topic')
The topology definition can include these stream IDs to route between components, and a component can specify more than one stream. Example with the Clojure DSL:
"lookup-bolt" (python-bolt-spec
options
{"search-bolt" :shuffle}
"birding.bolt.TwitterLookupBolt"
{"index" ["url" "timestamp" "search_result"]
"topic" ["url" "timestamp" "search_result"]}
:p 2
)
"elasticsearch-index-bolt" (python-bolt-spec
options
{["lookup-bolt" "index"] ["url" "timestamp" "search_result"]}
"birding.bolt.ElasticsearchIndexBolt"
[]
:p 1
)
"result-topic-bolt" (python-bolt-spec
options
{["lookup-bolt" "index"] ["url" "timestamp" "search_result"]
["lookup-bolt" "topic"] ["url" "timestamp" "search_result"]}
"birding.bolt.ResultTopicBolt"
[]
:p 1
)
Storm sets a default stream ID of "default", as described in its doc on Streams:
Every stream is given an id when declared. Since single-stream spouts and bolts are so common, ... the stream is given the default id of “default”.
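The effect of the optional stream argument, including the fallback to "default", can be shown with a small stand-in class. RecordingBolt is hypothetical and does not subclass any streamparse class; it only records what a component would emit, keyed by stream ID.

```python
from collections import defaultdict


class RecordingBolt:
    """Stand-in bolt that records emits per stream instead of sending them."""

    DEFAULT_STREAM = "default"

    def __init__(self):
        self.emitted = defaultdict(list)

    def emit(self, tup, stream=None):
        # With no stream argument the tuple goes to the "default" stream.
        self.emitted[stream or self.DEFAULT_STREAM].append(tup)


bolt = RecordingBolt()
bolt.emit(["heron", 1700000000, "result"], stream="index")
bolt.emit(["heron", 1700000000, "result"], stream="topic")
bolt.emit(["heron", 1700000000, "result"])
print(sorted(bolt.emitted))  # ['default', 'index', 'topic']
```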
Running Topologies¶
What Streamparse Does¶
When you run a topology either locally or by submitting to a cluster, streamparse will:
- Compile your .clj topology file
- Execute the Clojure code by invoking your topology function, passing it the options map
- Get the DAG defined by the topology and pass it into the Storm Java interop classes like StormSubmitter and LocalCluster
- Run/submit your topology
If you invoked streamparse with sparse run, your code is executed directly from the src/ directory.
If you submitted to a cluster with sparse submit, streamparse uses lein to compile the src directory into a jar file, which is run on the cluster. Lein uses the project.clj file located in the root of your project. This file is a standard lein project file and can be customized according to your needs.
Dealing With Errors¶
When detecting an error, bolt code can call its fail() method in order to have Storm call the respective spout’s fail() method. Known error/failure cases result in explicit callbacks to the spout using this approach.
Exceptions which propagate without being caught will cause the component to crash. On sparse run, the entire topology will stop execution. On a running cluster (i.e. sparse submit), Storm will auto-restart the crashed component and the spout will receive a fail() call.
If the spout’s fail handling logic is to hold back the tuple and not re-emit it, then things will keep going. If it re-emits it, then it may crash that component again. Whether the topology is tolerant of the failure depends on how you implement failure handling in your spout.
Common approaches are to:
- Append errant tuples to some sort of error log or queue for manual inspection later, while letting processing continue otherwise.
- Attempt 1 or 2 retries before considering the tuple a failure, if the error was likely a transient problem.
- Ignore the failed tuple, if appropriate to the application.
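The first two approaches can be combined in the bookkeeping a spout keeps for its ack() and fail() handlers. The sketch below is plain Python under stated assumptions: RetryingSpoutState and all of its attribute names are hypothetical, and it deliberately does not subclass streamparse's spout class, so wiring it into a real spout is left to the reader.

```python
class RetryingSpoutState:
    """Bookkeeping a spout could delegate to from its ack()/fail() handlers."""

    def __init__(self, max_retries=2):
        self.max_retries = max_retries
        self.pending = {}      # tup_id -> tuple awaiting an ack
        self.attempts = {}     # tup_id -> retries used so far
        self.retry_queue = []  # (tup_id, tuple) pairs to re-emit later
        self.error_log = []    # tuples given up on, kept for manual inspection

    def track(self, tup_id, tup):
        """Remember a tuple that was just emitted with this ID."""
        self.pending[tup_id] = tup
        self.attempts.setdefault(tup_id, 0)

    def ack(self, tup_id):
        self.pending.pop(tup_id, None)
        self.attempts.pop(tup_id, None)

    def fail(self, tup_id):
        tup = self.pending.pop(tup_id, None)
        if tup is None:
            return  # unknown or already handled
        if self.attempts[tup_id] < self.max_retries:
            self.attempts[tup_id] += 1
            self.retry_queue.append((tup_id, tup))
        else:
            # Retries exhausted: log it and let processing continue.
            self.attempts.pop(tup_id, None)
            self.error_log.append((tup_id, tup))


state = RetryingSpoutState(max_retries=2)
for _ in range(3):  # the initial attempt plus two retries, all failing
    state.track("t1", ["bad-record"])
    state.fail("t1")
    state.retry_queue.clear()  # pretend the spout re-emitted the tuple
print(state.error_log)  # [('t1', ['bad-record'])]
```

Capping the retries is what keeps a tuple that reliably crashes a component from being re-emitted forever.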
Parallelism and Workers¶
In general, use the :p “parallelism hint” parameter per spout and bolt in your configuration to control the number of Python processes per component.
Reference: Understanding the Parallelism of a Storm Topology
Storm parallelism entities:
- A worker process is a JVM, i.e. a Java process.
- An executor is a thread that is spawned by a worker process.
- A task performs the actual data processing. (To simplify, you can think of it as a Python callable.)
Spout and bolt specs take a :p keyword to provide a parallelism hint to Storm for the number of executors (threads) to use for the given spout/bolt; for example, :p 2 is a hint to use two executors. Because streamparse implements spouts and bolts as independent Python processes, setting :p N results in N Python processes for the given spout/bolt.
Many streamparse applications will need only to set this parallelism hint to control the number of resulting Python processes when tuning streamparse configuration. For the underlying topology workers, streamparse sets a default of 2 workers, which are independent JVM processes for Storm. This allows a topology to continue running when one worker process dies; the other is around until the dead process restarts.
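As a quick sanity check when tuning, the number of Python processes a topology will start is the sum of its parallelism hints. The sketch below assumes a hint of 1 for the spout, since the sample topology does not set :p on it; python_process_count is a hypothetical helper, not a streamparse function.

```python
def python_process_count(parallelism_hints):
    """Sum the :p hints; each executor corresponds to one Python process."""
    return sum(parallelism_hints.values())


# Component name -> :p value (the spout's 1 is an assumed value).
hints = {"my-python-spout": 1, "my-python-bolt": 2}
print(python_process_count(hints))  # 3
```

The worker (JVM) count is configured separately and does not change this total.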
Both sparse run and sparse submit accept a -p N command-line flag to set the number of topology workers to N. For convenience, this flag also sets the number of Storm’s underlying messaging reliability acker bolts to the same N value. In the event that you need it (and you understand Storm ackers), use the -a and -w command-line flags instead of -p to control the number of acker bolts and the number of workers, respectively. The sparse command does not support Storm’s rebalancing features; use sparse submit -f -p N to kill the running topology and redeploy it with N workers.
Note that Storm’s underlying thread implementation, LMAX Disruptor, is designed with high-performance inter-thread messaging as a goal. Rule out Python-level issues when tuning your topology:
- bottlenecks where the number of spout and bolt processes are out of balance
- serialization/deserialization overhead of more data emitted than you need
- slow routines/callables in your code