yzprofile's Notebook

All Posts| Note| Books| About
25 Apr 2013

Storm 入门手册

VIA: https://github.com/nathanmarz/storm/wiki/Tutorial

教程

通过这个手册,你可以了解到如何去创建Storm topologies并把它们部署到一个Storm集群。主要用到了Java语言,但是为了说明Storm的多语言支持的特性也使用了Python去实现一些例子。

准备

这个手册中使用的例子来自于storm-starter工程。建议你同步下来此项目,并根据例子来学习。阅读Setting up development environmentCreating a new Storm project两篇文章去搭建你的环境。

Storm集群组件

一个Sotrm集群表面上看起来和Hadoop集群很像。在Hadoop上你运行的时”MapReduce Job”,在Storm上运行的是”topologies”。”Job”和”topologies”之间还是有很大不同的 –最关键的一点是Mapreduce job最终是会完成的,而topology将会持续的执行(直到你终止它)。

Storm集群中有两种不同的节点:Master和Worker。Master节点启动的守护进程叫做”Numbus”,这点和Hadoop的”JobTracker”很相似。Nimbus在集群众发布指令,给其他机器分配作业,并且监控失败。

每一个Worker启动的守护进程叫做”Supervisor”。Supervisor接收Nimbus分发给它的工作去启动或者停止真正的工作进程。每个工作进程执行一个topology的子集,topology是执行在多个worker进程乃至许多机器上的。

storm-cluster

Nimbus和Supervisors之间是通过Zookeeper集群来协作的,Nimbus守护进程和Supervisor守护进程是fail-fast和无状态的;所有的状态都存在Zookeeper和本地磁盘中。这意味着你可以随意kill -9 Nimbus或者Supervisors,并且当它们再次启动的时候就像什么都没有发生过一样。这样的设计保证了Storm集群的稳定性。

Topologies

为了在Storm上做实时计算,你需要创建一个”topologies”。Topology是一个计算图。Topology每个节点包含了处理逻辑,节点之间的连接表明了数据应该怎样在它们之间传输。

启动一个topology还是非常简单的。首先,你需要把你的代码打包成一个jar文件。之后,你可以用下面的命令启动它:

$ storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

这条指令带着参数arg1和arg2运行了backtype.storm.MyTopology这个类。这个类的main函数定义了toplogy并且提交到Nimbus。storm jar负责连接到Nimbus并上传你的jar文件。

因为topology只是Thrift结构的定义,并且Nimbus是一个Thrift服务,所以你可以使用任意的编程语言去创建提交topologies。之前的例子是用基于JVM的语言完成的。可以从Running topologies on a production cluster来了解关于启动和停止topologies的更多信息。

Streams

Storm里最核心的概念就是”Stream”。Stream是一个没有边界的元组序列。Storm为了将一个stream转换为另一个新的stream提供了一些分布式的并且可靠的元语。例如你可以把tweets的信息流传送到话题的流。

Storm提供的流转换的基础元语是”spouts”和”bolt”。为了运行你的应用逻辑,你必须去实现Spouts和bolt的一些接口。

Spout是streams的源头。举个例子,Spout可以从Kestrel读取一组数据并把它们当作stream发送出去。或者spout也能调用Twitter的API去获取tweets并将它们作为stream发送出去。

Bolt去处理输入的所有的stream,并且也能发出新的stream。在复杂的stream传输过程中,像从tweets中计算stream的话题趋势,需要多个阶段,从而存在多个bolt。Bolt可以通过执行,过滤,聚合,关联,读写数据库等操作来做任何事情。

Spouts和bolt被打包成一个”topology”,它是一个高度抽象的概念,你可以把它提交给Storm集群去执行。Toplogy是一个Spouts和Bolt之间传输stream的图。在这个图中的箭头表明了bolt订阅了哪些stream。当一个spout或者一个bolt发送一组数据到stream时,所有订阅这个stream的bolt都可以收到这组数据。

topology

topology中节点之间的连接表明了数据传输的流向。举个例子,如果Spout A和Bolt B相连,Spout A又和Bolt C相连,Bolt B也和Bolt C相连,每当Spout A发送数据时,它都将发送给Bolt B和BoltC,当Bolt B发送出的数据也都会发送给Bolt C。

Storm中的每个topology节点的执行都是并行的。在你的topology中,你可以指定你想要每个节点执行的并行数,Storm将会在集群内启动这么多的线程去执行。

Topology的执行不会退出,除非你手动kill掉它。Storm将会自动分配失败的任务。另外,即使在机器宕机消息被丢失的情况下,storm也会保证不会丢失数据。

数据模型

Storm用元组作为其数据模型。一个元组是一组命名的数值列表,一个元组可以表示任意的类型。Storm支持所有的原生类型,字符串,字节,数组都可以作为元组的值。如果你想使用其他的类型,你只需要去实现这个类型的序列化的方法。

Topology中的每个节点必须声明它的输出字段。举个例子,Bolt声明它发出的两个元组分别是”double”和”triple”。

public class DoubleAndTripleBolt extends BaseRichBolt {
    private OutputCollectorBase _collector;
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        int val = input.getInteger(0);        
        _collector.emit(input, new Values(val*2, val*3));
        _collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("double", "triple"));
    }    

}

declareOutputFields函数声明了输出字段[“double”, “triple”]。剩下的函数将会在下面的章节里介绍。

一个简单的Topology

让我们看一眼一个简单的topology去探索一下它的定义和它的代码结构。我们先看下它的定义ExclamationTopology。这个例子来自storm-starter:

TopologyBuilder builder = new TopologyBuilder();        
builder.setSpout("words", new TestWordSpout(), 10);        
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
        .shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
        .shuffleGrouping("exclaim1");

这个Topology包含了一个spout和两个bolt。Spout发送出一个单词,每个bolt在输入的字符串后面加上”!!!”。这些节点连成一条线:spout发给第一个bolt,第一个bolt再发给第二个。如果spout发出的元组是[“bob”]和p[“john”],那么第二个bolt将会输出单词[“bob!!!!!!”]和[“john!!!!!!”]。

代码里定义节点使用方法setSpoutsetBolt。这些方法用一个用户指定的id,一个处理逻辑的函数,还有一个你想启动的并发数作为参数。在这个例子里,spout给定的id是”words”,两个bolt给定的id分别是”exclaim1”和”exclaim2”。

这个对象的处理逻辑实现了IRichSpoutIRichBolt的接口。

最后一个参数是指定你想启动多少并行的实例,这个参数是可选的。它实际上是指定在集群中需要启动多少线程。如果你忽略它,Storm将会默认分配一个线程。

setBolt返回一个InputDeclarer对象,用来定义Bolt的输出。这里bolt”exclami1”声明它需要通过”shuffle grouping的方式去获取”spout”words”的所有输出,”shuffle grouping”意味着元组是以随机分发到各个bolt的任务上的。各个组件之间可以由很多不同的方式来分发这些数据,后面我们将详细介绍这点。

如果你期望bolt “exclaim2”去读所有发自spout “words”和bolt “exclaim1”的数据,你可以这么定义”exclaim2”:

builder.setBolt("exclaim2", new ExclamationBolt(), 5)
            .shuffleGrouping("words")
            .shuffleGrouping("exclaim1");

这里你就可以看到可以声明多一个源作为一个Bolt的输入。

让我们深入了解下这个topology的spout和bolt的实现。Spout负责发送新的消息到topology。在这个topology中,TestWordSpout每100ms发送list[“nathan”, “mike”, “jackson”, “golda”, “bertels”]内随机的一个单词作为元组。TestWordSpoutnextTuple()实现就像下面这样:

public void nextTuple() {
    Utils.sleep(100);
    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
    final Random rand = new Random();
    final String word = words[rand.nextInt(words.length)];
    _collector.emit(new Values(word));
}

你可以看到这个实现是非常的直接了当的。

ExclamationBolt在输入字符串后添加”!!!”。让我们看下它的所有实现:

public static class ExclamationBolt implements IRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
    
    public Map getComponentConfiguration() {
        return null;
    }
}

prepare方法给bolt提供了OutputCollector,它用来从bolt中发送元组。Bolt可以在任意时刻发送元组 – 在prepare, execute, 或者cleanup方法中都可以,甚至可以在另外的线程中异步发送。这个prepare的实现仅仅简单的保存了OutputCollector的实例,为了待会儿在execute方法中使用。

execute方法从bolt的输入中收到了一个元组。ExclamationBolt从元组中获取第一个字段,并且发送一个追加了”!!!”的字符串的新元组,如果你实现了一个bolt订阅了多个输入源,你可以使用Tuple#getSourceComponent方法来知道它是来自于哪个组件。

execute中还有几个东西需要介绍,输入的元组作为emit的第一参数被传进来,还有在最后一行会送一个ack。这是storm来保证不丢数据实现的API的一部分,后面我们将详细介绍。

cleanup方法在Bolt被关闭的时候调用,它清理我们打开的所有资源。这里没有保证这个方法一定会在集群中被调用:举个例子,如果机器上的任务正在爆发性的增长,那么就不能调用这个方法。cleanup方法更倾向于你使用本地模式时执行,你可以执行和结束掉很多topology来避免资源的泄漏。

declareOutputFields方法声明了ExclamationBolt会发送一个携带一个字段名为”word”的元组。

getComponentConfiguration方法允许你配置组件执行的各个方面。这是一个高端的话题,在Configuration你可以了解到更多。

cleanupgetComponentConfiguration两个方法通常时不需要在bolt中实现的。你可以通过使用基类默认的实现更加简洁的定义bolt。ExclamationBolt可以简单的继承于BaseRichBolt,就像这样:

public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

在本地模式执行ExclamationTopology

让我们看看怎么在本地模式运行ExclamationTopology,观察下它时如何工作的。

Storm由两个工作模式:本地模式和分布式。在本地模式中,Storm的执行完全是在一个进程内,用多个线程来模拟多个节点。本地模式对于topology的测试和开发来说是非常有用的。当你照着storm-starter中的例子运行时,它就启动的是本地模式,你可以看到每个组件发出的消息。你可以从Local mode了解到更多的知识。

在分布式模式下,Storm作为一个集群来工作。当你提交一个topology到master时,你也提交了所有topology需要的代码。Master将会分发你的代码并分配worker去执行你的topology。如果worker宕机了,Master将会重新分配它们到其他地方。你可以从Running topologies on a production cluster这里了解到更多关于topology在集群环境下执行的知识。

ExclamationTopology在本地模式执行的代码:

Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();

首先,通过创建一个LocalCluster对象,定义了一个进程内集群。提交topology到这个虚拟的集群和提交到分布式集群的方法是一样的。它通过submitTopology提交了一个topology到LocaoCluster,它有三个参数,一个是执行的topology的名字,还有它的配置,以及topology的实例。 这个名字是用来让你kill掉topology时使用的,topology永远不会停止执行,直到你杀死它。

这个配置时用来调整topology的各个方面的。下面两个配置是经常被用到的:

  1. TOPOLOGY_WORKERS (使用setNumWorkers来设置) 指定集群分配多少进程来执行topology。每个组件都将会以多线程模式执行。给组件分配线程的数目可以通过setBoltsetSpout方法来配置。每个worker进程包含了多个线程,具体点来说,你可以在配置中设置300个线程,并指定使用50个worker进程。那么你的每个worker进程将会执行6个线程,它们可能会属于不同的组件。你可以调整这些数目来提高性能。

  2. TOPOLOGY_DEBUG (使用setDebug来设置) 当被设置喂true时,意味着storm将会记录所有发送的消息。这个对本地模式调试时非常有用,但是你不应该在集群模式中使用它。

你还可以对topology设置更多的配置,不同配置的详细信息可以从the Javadoc for Config了解到。

你可以从这里Creating a new Storm project学习更多的关于开发环境部署以便可以让你在eclipse中执行本地模式。

Stream分组

stream的分组指名了两个组件间的数据流通是以什么样的方式进行的。需要注意的一点是,spout和bolt是并行执行在整个集群上的多个任务。从任务层面我们看它们之间的关系,就像下面这样:

topology-tasks

有一个Bolt A向Bolt B发送一个元组的任务,它应该将元组发给哪个任务?

stream分组描述了任务之间元组发送的方式。在我们深入了解不同stream分组前,让我们先看一下storm-starter中的另外一个topology。WordCountTopology从spout中读取一句话,输出stream到WordCountBolt,来统计句子中单词总共出现的次数:

TopologyBuilder builder = new TopologyBuilder();
        
builder.setSpout("sentences", new RandomSentenceSpout(), 5);        
builder.setBolt("split", new SplitSentence(), 8)
        .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 12)
        .fieldsGrouping("split", new Fields("word"));

SplitSentence把它收到的巨资拆分成单词发送出去,WordCount在内存中构造了一个单词和个数的对照表来统计每个单词出现的次数。每当WordCount收到一个单词,它都会去更新它的状态并发送出一个新的单词计数。

这里在stream分钟上有一点点的不同。

最简单的分组类型被叫做”shuffle 分组”,它随机的分发元组到各个任务上。它被用在WordCountTopologyRandomSentenceSpout发送给SplitSentence bolt的过程里。它可以有效的把元组平均分发给SplitSentence任务。

一个有趣的分组类型是”fields分组”。在SplitSentence bolt和WordCount bolt间使用的就是Fields分组。它让相同的字段总是发送到相同的任务上去。否则不同的任务将会看到相同的单词,它们会由于没有收到完整的信息而发送出一个错误的结果。Fields分组使用字段的子集分组stream。相同字段值将会发送给相同的任务。WordCount使用Fields分组订阅了SplitSentence的输出,并且使用了”word”字段。相同的单词将会总是发送到相同的任务上去出去,从而是bolt得到一个正确的输出。

Fields分组是实现Stream join和Stream aggregations的基础。Fields分组的机制是给予mod hash的。

你还可以在Concepts这里找到其他stream分组的介绍。

使用其他语言定义Bolt

Bolt可以使用任意语言来定义,使用其他语言写的Bolt将作为子进程执行,Storm通过标准输入输出使用JSON格式的消息和它们进行通讯。通信协议只需要100行左右的适配器库,并且storm已经提供了ruby,python和fancy三种语言的库。

这个是WordCountTopologySplitSentence的定义:

public static class SplitSentence extends ShellBolt implements IRichBolt {
    public SplitSentence() {
        super("python", "splitsentence.py");
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

SplitSentence重写了ShellBolt方法,还声明了它是用python执行splitsentence.py的。下面是splitsentence.py的实现:

import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
          storm.emit([word])

SplitSentenceBolt().run()

你可以从Using non-JVM languages with Storm了解到更多关于使用其他语言实现spout和bolt的相关信息。

可靠的消息处理

文章的开头我们跳过了关于发送元组的部分内容。你可以从Guaranteeing message processing这里了解到Storm是怎么去保证每条消息都会被处理的,以及作为用户应该怎样利用Storm的高可靠性。

Topology事务

Storm保证了每个消息至少会在topology内传输一次。我们可能会问:”如果在Storm上做计数,你怎么去保证它不会重复统计呢?”。Storm有事务性的topology,它可以保证准确的一次消息通知。从这儿可以了解更多。

分布式RPC

这片手册讲解了Storm基础的流式处理。基于Storm还可以做更多的事情。RPC就是其中一个最有意思的应用。点击这儿可以了解更多关于RPC的知识。

结束

手册给出了一个关于开发,测试,部署storm topology的概览。剩下的文档将会带你去更深入的了解Storm使用中的各个方面。