欢迎转载,转载请注明出处,徽沪一郎。
TridentTopology是storm提供的高层使用接口,常见的一些SQL中的操作在tridenttopology提供的api中都有类似的影射。关于TridentTopology的使用及运行原理,当前进行详细分析的文章不多。
从TridentTopology到vanilla topology(普通的topology)由三个层次组成:
- 面向最终用户的概念stream, operation
- 利用planner将tridenttopology转换成vanilla topology
- 执行vanilla topology
本文尝试TridentTopology是如何先一步步转换成普通的storm Topology(即vanila topology), 转换后的topology的执行中有哪些区别?
概述
从TridentTopology到基本的Topology有三层,下图给出一个全局的视图。
创建TridentTopology
下面的代码摘自StormStarter中的TridentWordCount.java
TridentTopology topology = new TridentTopology(); topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")).parallelismHint(16); return topology.build();
上述代码的newStream一行,分两大部分,一是使用newStream来创建一个stream对象,然后针对该Stream进行各种操作,each/shuffle/persistentAggregate等就是各种operation.
用户在使用TridentTopology的时候,只需要熟悉Stream和TridentTopology中的API函数即可。
转换TridentTopology为Vanilla Topology
上一节创建了Stream,但是如何将其与原有的Spout及Bolt联系起来呢?问题的关键就在TridentTopology::build函数和TridentTopologyBuilder::buildTopology
TridentTopology::build
newStream及其后的函数调用创建了一个含有三大类节点的List,利用该List创建了一个有向非循环图(DAG)。这三类节点分别是operation, partition, spout,在build函数将节点分类分别加入到boltNodes或spoutNodes,注意此处的spout或bolt不能等同于普通的spout和bolt.
TridentTopologyBuilder::buildTopology
利用在build函数中创建的boltNodes,spoutNodes及生成的graph来创建vanilla topology所需要的bolt及spout.
在buildTopology中会看到类似的代码片段。
builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout)) .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID) .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);
builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch)));
for(String b: c.committerBatches) { specs.get(b).commitStream = new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID); } BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIdsForBolts, specs), c.parallelism);
最终生成的普通Topology,与普通Topology中的Spout相对应的是MasterBatchCoordinator,而在创建TridentTopology使用的spout则成了Bolt,使用于Stream上的各种Operation也存在于多个普通Bolt中。
TridentTopology的执行
TridentTopology被转换为普通的Topology(vanilla Topology)之后提交到nimbus,它的具体执行过程有什么不同呢?
主要有几点:
- MasterBatchCoordinator通过Batch_stream_id来发送通知给TridentSpoutExecutor
- TridentSpoutExecutor收到通知发送成批的tuple给下一跳的Bolt
- 下一跳的Bolt收到tuple之后,使用TridentBoltExecutor来进行处理
- TridentBoltExecutor调用SubtopologyBolt::execute
- InitialReceiver::execute被调用
- TridentProcessor::execute被调用
MasterBatchCoordinator收到ack之后,会发送success消息给Spout
MasterBatchCoordinator在commit的时候,会发送commit消息给Spout,让Spout将缓存的消息删除
相关推荐
Apache Spark源码走读之5 -- DStream处理的容错性分析
Apache Spark源码走读之3 -- Task运行期之函数调用关系分析
Apache Spark源码走读之2 -- Job的提交与运行.pdf
Apache Spark源码走读之4 -- DStream实时流数据处理
Storm源码走读笔记 写的非常详细的代码走查笔记,对于想阅读源代码提高编程能力的同学非常有用哦。
在项目开发过程中,因代码质量不过关,进行了代码走读,事后把意见留存,组织成word文档。在以后遇到相同的问题时可以快速解决。
nova-compute源码分析
Apache Spark源码走读之如何进行代码跟读
详细的Hadoop源码剖析电子版。书中引用了丰富的架构图片和流程来解析结合hadoop框架原理,推荐深度运维和二次开发者阅读
IDEA走读Java源码坏境搭建 新建一个普通java项目(如:java8-source) 创建package(tech.sqlclub.java_source)存放java源码 java源码在$JAVA_HOME/src.zip 解压就行,mac用户JAVA_HOME查看如下图: 通过Debug,撸...
走读MINA2.0源码的笔记,并有流程图做说明
由于项目需要,最近深入细致的了解了ceph的读写流程,并且跟项目组做了一个代码串讲。附上串讲用的ppt。 个人认为,理解了ceph的io流水线模型,是理解整个io读写流程的关键。
1. 初始化探测速率表 2. 初始化探测的相关参数 3. Re:【python】获取高德地图省市区县列 2. 【JavaEE】Springmvc+Spring整
介绍完速率表,剩下的,就按照和minstrel同样的思路来分析,先来看注册rate_control_ops的结构体:static struct rate_con
代码走读记录表模板代码走读记录表模板代码走读记录表模板
集合源码分析 JAVA: 基本语法 static 修饰变量 方法 静态块(初始化块 构造函数 ) 静态内部类() 静态导包 final() transient() foreach循环原理() volatile底层实现() equals和hashcode(, ) string,stringbuffer和...
DPDKL2fwd代码走读报告(代码流程分析).pdf
ffmpeg播放m3u8网络视频文件的流程,详细描述了整个播放过程的代码流程。
该资源用于搭建最基本的SSM框架。下载后直接导入工程,并可执行resource中的init.sql插入测试数据即可验证。