05 October 2015

Storm支持的三种语义:

  1. 至少一次
  2. 至多一次
  3. 有且仅有一次

至少一次语义的Topology写法


参考资料:Storm消息的可靠性保障 Storm提供了Acker的机制来保证数据至少被处理一次,是由编程人员决定是否使用这一特性,要使用这一特性需要:

  • 在Spout emit时添加一个MsgID,那么ack和fail方法将会被调用当Tuple被正确地处理了或发生了错误。_collector.emit(new Values("field1", "field2", 3) , msgId);
  • 在Bolt emit时进行锚定。_collector.emit(tuple, new Values(word));

Spout例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class RandomSentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector _collector;
    private Random _rand;
    private ConcurrentHashMap<UUID, Values> _pending;

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        _collector = spoutOutputCollector;
        _rand = new Random();
        _pending = new ConcurrentHashMap<UUID, Values>();
    }

    public void nextTuple() {
        Utils.sleep(1000);
        String[] sentences = new String[] {
                "I write php",
                "I learning java",
                "I want to learn swool and tfs"
        };
        String sentence = sentences[_rand.nextInt(sentences.length)];
        Values v = new Values(sentence);
        UUID msgId = UUID.randomUUID();
        this._pending.put(msgId, v);
        _collector.emit(v, msgId);//发射tuple时,添加msgId

    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("world"));
    }

    public void ack(Object msgId) {
        this._pending.remove(msgId);//当消息被完整性地处理了

    }

    public void fail(Object msgId) {
        this._collector.emit(this._pending.get(msgId), msgId);//当消息处理失败了,重新发射,当然也可以做其他的逻辑处理

    }
}

Bolt例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class SplitSentence extends BaseRichBolt {
    OutputCollector _collector;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        _collector = outputCollector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" "))
            _collector.emit(tuple, new Values(word));//发射tuple时进行锚定

        _collector.ack(tuple);//对处理完的tuple进行确认

    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}

至多一次的语义


如果只需要至多一次的语义,那么只需要在编程的时候不用Storm的Acker机制,可以通过如下方法达到:

  • 把Config.TOPOLOGY_ACKERS设置成0。在这种情况下,Storm会在Spout发射一个Tuple之后马上调用Spout的ack方法。也就是说这个Tuple树不会被跟踪。
  • 在Tuple层面去掉可靠性。 你可以在发射Tuple的时候不指定MessageID来达到不不跟踪这个Tuple的目的。
  • 如果你对于一个Tuple树里面的某一部分到底成不成功不是很关心,那么可以在Bolt发射这些Tuple的时候不锚定它们。这样这些Tuple就不在Tuple树里面,也就不会被跟踪了。

需要注意的点:每个待处理的 tuple 都必须显式地应答(ack)或者失效(fail)。因为 Storm 是使用内存来跟踪每个 tuple 的,所以,如果你不对每个 tuple 进行应答或者失效,那么负责跟踪的任务很快就会发生内存溢出。

有且仅有一次的语义


参考资料:Trident State

Trident中的三种Spout,根据Spout代码情况可判断属于哪个:

事务型Spout的特点:

  • 每个 batch 的 txid 永远不会改变。对于某个特定的 txid,batch 在执行重新处理操作时所处理的 tuple 集和它的第一次处理操作完全相同。
  • 不同 batch 中的 tuple 不会出现重复的情况(某个 tuple 只会出现在一个 batch 中,而不会同时出现在多个 batch 中)。
  • 每个 tuple 都会放入一个 batch 中(处理操作不会遗漏任何的 tuple)。

模糊型Spout的特点:

  • 不同 batch 中的 tuple 不会出现重复的情况(某个 tuple 只会出现在一个 batch 中,而不会同时出现在多个 batch 中)。
  • 每个 tuple 都会放入一个 batch 中(处理操作不会遗漏任何的 tuple)。
  • 每个 tuple 都会通过某个 batch 处理完成。不过,在 tuple 处理失败的时候,tuple 有可能继续在另一个 batch 中完成处理,而不一定是在原先的 batch 中完成处理。

非事务型Spout的特点:

  • 非事务型 spout 有可能提供一种“至多一次”的处理模型,在这种情况下 batch 处理失败后 tuple 并不会重新处理。
  • 也有可能提供一种“至少一次”的处理模型,在这种情况下可能会有多个 batch 分别处理某个 tuple。

假如你的拓扑需要计算单词数,而且你准备将计数结果存入一个 K-V 型存储系统中。这里的 key 就是单词,value 对应于单词数。如果仅仅存储计数结果是无法确定某个batch中的tuple是否已经被处理过的,例如一个batch已经处理到最后一步并成功写入到了存储系统中,正准备Ack时挂了,那么重新发射这个batch后,被完整地处理了,batch中的tuple就被处理了多次,为了解决这个问题需要State的帮助,即存储的时候做一些处理。

Trident中的三种State,Storm已经提供了多中存储系统State的实现,如MemoryMapState、MemcachedState:

事务型State

  • 特点:对于特定的 txid 永远只与同一组 tuple 相关联。
  • 实现方式:将单词、txid与计数值一起存入数据库,每次更新时,比较txid是否相同,如果不相同则更新,否则不更新。

模糊型State

  • 特点:为了配合模糊型Spout达到有且仅有一次的语义而设计。
  • 实现方式:将单词、前一个结果值(preValue)、计数值和txid一起存入数据库,每次更新时,比较txid,若不同,则记录更新为(单词, 存储的计数值作为preValue, 存储的计数值+batch计算出来的值作为计数值, batch的txid);若相同,则记录更新为(单词, preValue, preValue加上batch计算出来的值作为计数值, txid)

非事务型State

  • 特点:不能保证有且仅有一次的语义
  • 实现方式:数据库只记录了单词,计数值

不同的Spout类型与不同的State类型的组合是否支持有且仅有一次的消息处理语义:

上图中的Yes表示组合支持有且仅有一次的语义。