我的第一个Storm WordCount Topology 学习
江左梅郎2016/12/21         
1.1 Storm WordCount执行流程分析
1.1.1 分布式单词计数的流程
1)首先它需要有数据源,我们在RandomSentenceSpout中定义了一个字符串数组sentences来模拟数据源。
2)这里我们将字符串数组中的每句话作为一个tuple发射。
3)然后,SplitSentenceBlot接收RandomSentenceSpout发射的tuple,它将每句话分割成每个单词,并将每个单词作为tuple发射。
4)接着,WordCountBolt接收SplitSentenceBlot发送的tuple,它将接收到的每一个单词统计计数,并将 作为tuple发射。
5)最后,ReportBolt接收WordCountBolt发送的tuple,将统计的结果存入HashMap中,并打印出结果。
1.1.2 Topology的组成类
ISpout、IComponent、IBolt三个接口定义了一些最基本的方法,BaseRichSpout、BaseRichBolt是接口的实现类,自定义的Spout与Bolt通过继承实现类来完成工作。
1.2 Storm WordCount具体代码分析
1.2.1 在RandomSentenceSpout中定义了一个字符串数组来模拟数据源
字符串数组中的每句话作为一个tuple发射。
RandomSentenceSpout.java 代码如下所示:
package storm.wordcount; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; import java.util.Map; import java.util.Random; /* * 在RandomSentenceSpout中定义了一个字符串数组sentences来模拟数据源。字符串数组中的每句话作为一个tuple发射。 */ public class RandomSentenceSpout extends BaseRichSpout { private static final long serialVersionUID = 1L; SpoutOutputCollector _collector;// 用来向其他Spout发射tuple Random _rand;// 用来随机发射一个语句 // open函数,在ISpout接口中定义,所有的Spout组件在初始化时调用这个方法。在open()中初始化了发射器。 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector;// 初始化 _rand = new Random();// 初始化 } // nextTuple()是所有Spout的核心方法。Storm通过调用这个方法向collector发射tuple。 @Override public void nextTuple() { // 每次发射其中一个字符串,阻塞1000ms Utils.sleep(1000); //定义了待发射的数据源(my love)。Spout从该字符串数组一次取一个字符串生成tuple进行发射。 String[] sentences = new String[] { "An empty street", "An empty house", "A hole inside my heart", "I'm all alone", "The rooms are getting smaller", "I wonder how", "I wonder why", "I wonder where they are", "The days we had", "The songs we sang together", "Oh yeah", "And oh my love", "I'm holding on forever", "Reaching for a love that seems so far", "So i say a little prayer", "And hope my dreams will take me there", "Where the skies are blue to see you once again , my love", "Over seas and coast to coast", "To find a place i love the most", "Where the fields are green to see you once again , my love", "I try to read", "I go to work", "I'm laughing with my friends", "But i can't stop to keep myself from thinking", "Oh no I wonder how", "I wonder why", "I wonder where they are", "The days we had", "The songs we sang together", "Oh yeah And oh my love", "I'm holding on forever" }; // 生成随机的语句 String sentence = sentences[_rand.nextInt(sentences.length)]; // 通过emit方法将构造好的tuple发送出去 _collector.emit(new Values(sentence)); } //declareOutputFields函数标记了该Spout发射的tuple的(字段值)键值,这里任意取值为sentence //游的Bolt可以通过该键值来接收它发出的tuple @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); } }
package storm.wordcount; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /** * SplitSentenceBlot接收RandomSentenceSpout发射的tuple, * 它将每句话分割成每个单词,并将每个单词作为tuple发射。 */ public class SplitSentenceBlot extends BaseBasicBolt { private static final long serialVersionUID = 1L; public void execute(Tuple input, BasicOutputCollector collector) { //通过sentence字段接收从RandomSentenceSpout的发射器发射过来的tuple String sentence = input.getStringByField("sentence"); //将接受过来的句子sentence,解析为单词数组 String[] words = sentence.split("\\s+"); for(String word : words){ //将每个单词构造成tuple并发送给下一个Spout collector.emit(new Values(word)); } } //定义SplitSentenceBolt发送的tuple的字段("键值")为 word @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
1.2.3 WordCountBolt接收SplitSentenceBlot发送的tuple
它将接收到的每一个单词统计计数并将 <单词:出现次数> 作为tuple发射。
WordCountBolt.java 代码如下所示:
package storm.wordcount; import java.util.HashMap; import java.util.Map; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /** * * WordCountBolt接收SplitSentenceBlot发送的tuple,它将接收到的每一个单词统计计数 * 并将 <单词:出现次数> 作为tuple发射 * */ public class WordCountBolt extends BaseBasicBolt { private static final long serialVersionUID = 1L; //统计每个单词出现的次数,放到HashMap中保存起来 private Map counts = new HashMap(); @Override public void execute(Tuple tuple, BasicOutputCollector collector) { //接收从SplitSentenceBlot的发射器发射过来的tuple String word = tuple.getStringByField("word"); Integer count = counts.get(word); //如果HashMap中没有word这个单词 if (count == null) count = 0; count++; //更新该单词在HashMap中的统计次数 counts.put(word, count); System.out.println(word+","+count); //第一个元素的键为 "word",值为该单词(a string),第二个键为 "count",值为单词的计数 collector.emit(new Values(word, count)); } //定义WordCountBolt发送的tuple的字段为 word和count @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } }
1.2.4 ReportBolt接收WordCountBolt发送的tuple
将统计的结果存入HashMap中,并打印出结果。
ReportBolt.java 代码如下所示:
package storm.wordcount; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; /** * * ReportBolt接收WordCountBolt发送的tuple, * 将统计的结果存入HashMap中,并打印出结果。 * */ public class ReportBolt extends BaseRichBolt{ private static final long serialVersionUID = 4921144902730095910L; //定义HashMap存储单词统计个数 private HashMap counts = null; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { // TODO Auto-generated method stub //初始化HashMap this.counts = new HashMap(); } public void execute(Tuple input) { // TODO Auto-generated method stub //获取单词名称 String word = input.getStringByField("word"); //获取单词个数 Integer count = input.getIntegerByField("count"); //使用HashMap存储单词和单词数 this.counts.put(word, count); } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub //不需要发出任何数据流 } //Topology在storm集群中运行时,cleanup方法是不可靠的,并不能保证它一定会执行 public void cleanup(){ System.out.println("------ print counts for my love------"); Listkeys = new ArrayList(); //将HashMap中所有的单词都添加到一个集合里 keys.addAll(counts.keySet()); //对键(单词)进行排序 Collections.sort(keys); //输出排好序的每个单词的出现次数 for(String key : keys) System.out.println(key + " : " + this.counts.get(key)); } }
1.2.5 构建Wordcount Topology
WordCountTopology.java 代码如下所示:
package storm.wordcount; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; /** * * 构建Wordcount Topology * */ public class WordCountTopology { private static final String SENTENCE_SPOUT_ID = "sentence-spout"; private static final String SPLIT_BOLT_ID = "split-bolt"; private static final String COUNT_BOLT_ID = "count-bolt"; private static final String REPORT_BOLT_ID = "report-bolt"; private static final String TOPOLOGY_NAME = "wordcount-topology"; public static void main(String[] args) throws Exception { // 构造一个RandomSentenceSpout对象 RandomSentenceSpout sentenceSpout = new RandomSentenceSpout(); // 构造一个SplitSentenceBlot对象 SplitSentenceBlot splitBolt = new SplitSentenceBlot(); // 构造一个WordCountBolt对象 WordCountBolt countBolt = new WordCountBolt(); // 构造一个ReportBolt对象 ReportBolt reportBolt = new ReportBolt(); TopologyBuilder builder = new TopologyBuilder(); // 设置sentenceSpout,并行度为5 builder.setSpout(SENTENCE_SPOUT_ID, sentenceSpout, 5); // 设置splitBolt,并行度为8 builder.setBolt(SPLIT_BOLT_ID, splitBolt, 8).shuffleGrouping( SENTENCE_SPOUT_ID); // 设置countBolt,并行度为12 builder.setBolt(COUNT_BOLT_ID, countBolt, 12).fieldsGrouping( SPLIT_BOLT_ID, new Fields("word")); // 设置reportBolt builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping( COUNT_BOLT_ID); Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) { // 提交集群运行 conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } else { // 本地测试运行 conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } } }
我们可以构建一个maven 项目,将上述storm Wordcount代码复制到maven项目中。 运行Wordcount需要我们在pom.xml文件中引入storm核心包storm-core。
4.0.0com.dajiangtai.mystormmystorm1.0-SNAPSHOTjartesthttp://maven.apache.orgUTF-8 junitjunit3.8.1testorg.slf4jslf4j-api1.6.4org.slf4jslf4j-log4j121.6.1log4jlog4j1.2.17org.apache.stormstorm-core0.9.6providedorg.apache.maven.pluginsmaven-shade-plugin2.4.1packageshadecom.dajiangtaifalseorg.apache.maven.pluginsmaven-compiler-plugin3.31.71.7
本地测试运行结果如下所示:
no,1 I,1 how,1 Oh,1 wonder,1 To,1 find,1 i,1 love,1 the,1 a,1 place,1 most,1 And,1 oh,1 Oh,2 my,1 yeah,1 love,2 And,2 hope,1 take,1 me,1 my,2 dreams,1 will,1 there,1 The,1 songs,1 sang,1 we,1 together,1 try,1 to,1 I,2 read,1 I,3 why,1 wonder,2 for,1 Reaching,1 a,2 love,3 that,1 seems,1 so,1 far,1 And,3 oh,2 my,3 love,4 Over,1 and,1 coast,1 seas,1 coast,2 to,2 I'm,1 holding,1 on,1 forever,1 I,4 they,1 wonder,3 where,1 are,1 Where,1 the,2 skies,1 are,2 love,5 to,3 see,1 again,1 ,,1 my,4 blue,1 you,1 once,1 with,1 I'm,2 laughing,1 my,5 friends,1 The,2 songs,2 sang,2 we,2 together,2 with,2 I'm,3 laughing,2 my,6 friends,2 green,1 Where,2 the,3 to,4 fields,1 see,2 again,2 you,2 ,,2 my,7 once,2 are,3 love,6 I'm,4 laughing,3 my,8 friends,3 with,3 I,5 wonder,4 how,2 A,1 hole,1 heart,1 inside,1 my,9 together,3 songs,3 sang,3 The,3 we,3 So,1 a,3 prayer,1 i,2 little,1 say,1 I'm,5 all,1 alone,1 And,4 hope,2 my,10 me,2 dreams,2 take,2 will,2 there,2 ------ print counts for my love------ , : 2 A : 1 And : 4 I : 5 I'm : 5 Oh : 2 Over : 1 Reaching : 1 So : 1 The : 3 To : 1 Where : 2 a : 3 again : 2 all : 1 alone : 1 and : 1 are : 3 blue : 1 coast : 2 dreams : 2 far : 1 fields : 1 find : 1 for : 1 forever : 1 friends : 3 green : 1 heart : 1 holding : 1 hole : 1 hope : 2 how : 2 i : 2 inside : 1 laughing : 3 little : 1 love : 6 me : 2 most : 1 my : 10 no : 1 oh : 2 on : 1 once : 2 place : 1 prayer : 1 read : 1 sang : 3 say : 1 seas : 1 see : 2 seems : 1 skies : 1 so : 1 songs : 3 take : 2 that : 1 the : 3 there : 2 they : 1 to : 4 together : 3 try : 1 we : 3 where : 1 why : 1 will : 2 with : 3 wonder : 4 yeah : 1 you : 2
通过对Wordcount执行流程的分析,相信大家对storm 数据处理流程有了进一步理解。
Storm WordCount Topology 学习.pdf 资源大小: 1MB
看完这篇文章的人大多学习了更多课程>>
