Hadoop从入门到精通33:MapReduce核心原理之Shuffle过程分析

在安装Hadoop集群的时候,我们在yarn-site.xml文件中配置了MapReduce的运行方式为yarn.nodemanager.aux-services=mapreduce_shuffle。本节就来详细介绍一下MapReduce的shuffle过程。

shuffle,即混洗、洗牌的意思,是指MapReduce程序在执行过程中,数据在各个Mapper(Combiner、Sorter、Partitioner)、Reducer等进程之间互相交换的过程。

关于上图Shuffle过程的几点说明:

说明:map节点执行map task任务生成map的输出结果。

shuffle的工作内容:

从运算效率的出发点,map输出结果优先存储在map节点的内存中。每个map task都有一个内存缓冲区,存储着map的输出结果,当达到内存缓冲区的阀值(80%)时,需要将缓冲区中的数据以一个临时文件的方式存到磁盘,当整个map task结束后再对磁盘中这个map task所产生的所有临时文件做合并,生成最终的输出文件。最后,等待reduce task来拉取数据。当然,如果map task的结果不大,能够完全存储到内存缓冲区,且未达到内存缓冲区的阀值,那么就不会有写临时文件到磁盘的操作,也不会有后面的合并。

详细过程如下:

(1)map task任务执行,输入数据的来源是:HDFS的block。当然在mapreduce概念中,map task读取的是split分片。split与block的对应关系:一对一(默认)。

此处有必要说明一下block与split:

block(物理划分):文件上传到HDFS,就要划分数据成块,这里的划分属于物理的划分,块的大小可配置(默认:第一代为64M,第二代为128M)可通过 dfs.block.size配置。为保证数据的安 全,block采用冗余机制:默认为3份,可通过dfs.replication配置。注意:当更改块大小的配置后,新上传的文件的块大小为新配置的值,以前上传的文件的块大小为以前的配置值。

split(逻辑划分):Hadoop中split划分属于逻辑上的划分,目的只是为了让map task更好地获取数据。split是通过hadoop中的InputFormat接口中的getSplit()方法得到的。那么,split的大小具体怎么得到呢?

首先介绍几个数据量:

totalSize:整个mapreduce job所有输入的总大小。注意:基本单位是block个数,而不是Bytes个数。

numSplits:来自job.getNumMapTasks(),即在job启动时用户利用 org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)设置的值,从方法的名称上看,是用于设置map的个数。但是,最终map的个数也就是split的个数并不一定取用户设置的这个值,用户设置的map个数值只是给最终的map个数一个提示,只是一个影响因素,而不是决定因素。

goalSize:totalSize/numSplits,即期望的split的大小,也就是每个mapper处理多少的数据。但是仅仅是期望

minSize:split的最小值,该值可由两个途径设置:

最终取goalSize和minSize中的最大值!

最终:split大小的计算原则:finalSplitSize=max(minSize,min(goalSize,blockSize))

那么,map的个数=totalSize/finalSplitSize

注意: 新版的API中InputSplit划分算法不再考虑用户设定的Map Task个数,而是用mapred.max.split.size(记为maxSize)代替

即:InputSplit大小的计算公式为:splitSize=max{minSize,min{maxSize,blockSize}}

接下来就简答说说怎么根据业务需求,调整map的个数。

当我们用hadoop处理大批量的大数据时,一种最常见的情况就是job启动的mapper数量太多而超出系统限制,导致hadoop抛出异常终止执行。

解决方案:减少mapper的数量!具体如下:

a.输入文件数量巨大,但不是小文件

这种情况可通过增大每个mapper的inputsize,即增大minSize或者增大blockSize来减少所需的mapper的数量。增大blocksize通常不可行,因为HDFS被hadoop namenode -format之后,blocksize就已经确定了(由格式化时dfs.block.size决定),如果要更改blocksize,需要重新格式化HDFS,这样当然会丢失已有的数据。所以通常情况下只能增大minSize,即增大mapred.min.split.size的值。

b.输入文件数量巨大,且都是小文件

所谓小文件,就是单个文件的size小于blockSize。这种情况通过增大mapred.min.split.size不可行,需要使用FileInputFormat衍生的CombineFileInputFormat将多个input path合并成一个InputSplit送给mapper处理,从而减少mapper的数量。增加mapper的数量,可以通过减少每个mapper的输入做到,即减小blockSize或者减少mapred.min.split.size的值。

(2)map执行后,得到key/value键值对。接下来的问题就是,这些键值对应该交给哪个reduce做?注意:reduce的个数是允许用户在提交job时,通过设置方法设置的!

MapReduce提供partitioner接口解决上述问题。默认操作是:对key hash后再以reduce task数量取模,返回值决定着该键值对应该由哪个reduce处理。这种默认的取模方式只是为了平均reduce的处理能力,防止数据倾斜,保证负载均衡。如果用户自己对Partition有需求,可以自行定制并设置到job上。

接下来,需要将key/value以及Partition结果都写入到缓冲区,缓冲区的作用:批量收集map结果,减少磁盘IO的影响。当然,写入之前,这些数据都会被序列化成字节数组。而整个内存缓冲区就是一个字节数组。这个内存缓冲区是有大小限制的,默认100MB。当map task的输出结果很多时,就可能撑爆内存。需将缓冲区的数据临时写入磁盘,然后重新利用这块缓冲区。

从内存往磁盘写数据被称为Spill(溢写),由单独线程完成,不影响往缓冲区写map结果的线程。溢写比例:spill.percent(默认0.8)。

当缓冲区的数据达到阀值,溢写线程启动,锁定这80MB的内存,执行溢写过程。剩下的20MB继续写入map task的输出结果。互不干涉!

当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是mapreduce模型的默认行为,也是对序列化的字节做的排序。排序规则:字典排序!

map task的输出结果写入内存后,当溢写线程未启动时,对输出结果并没有做任何的合并。从官方图可以看出,合并是体现在溢写的临时磁盘文件上的,且这种合并是对不同的reduce端的数值做的合并。所以溢写过程一个很重要的细节在于,如果有很多个key/value对需要发送到某个reduce端,那么需要将这些键值对拼接到一块,减少与partition相关的索引记录。如果client设置Combiner,其会将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。注意:这里的合并并不能保证map结果中所有的相同的key值的键值对的value都合并了,它合并的范围只是这80MB,它能保证的是在每个单独的溢写文件中所有键值对的key值均不相同!

溢写生成的临时文件的个数随着map输出结果的数据量变大而增多,当整个map task完成,内存中的数据也全部溢写到磁盘的一个溢写文件。也就是说,不论任何情况下,溢写过程生成的溢写文件至少有一个!但是最终的文件只能有一个,需要将这些溢写文件归并到一起,称为merge。merge是将所有的溢写文件归并到一个文件,结合上面所描述的combiner的作用范围,归并得到的文件内键值对有可能拥有相同的key,这个过程如果client设置过Combiner,也会合并相同的key值的键值对,如果没有,merge得到的就是键值集合,如{“aaa”, [5, 8, 2, …]}。注意:combiner的合理设置可以提高效率,但是如果使用不当会影响效率!

至此,map端的所有工作都已经结束!

当mapreduce任务提交后,reduce task就不断通过RPC从JobTracker那里获取map task是否完成的信息,如果获知某台TaskTracker上的map task执行完成,Shuffle的后半段过程就开始启动。其实呢,reduce task在执行之前的工作就是:不断地拉取当前job里每个map task的最终结果,并对不同地方拉取过来的数据不断地做merge,也最终形成一个文件作为reduce task的输入文件。

1.Copy过程,简单地拉取数据。Reduce进程启动一些数据copy线程(Fether),通过HTTP方式请求map task所在的TaskTracker获取map task的输出文件。因为map task早已结束,这些文件就归TaskTracker管理在本地磁盘。

2.Merge过程。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy过来的数值。Copy过来的数据会先放入内存缓冲区中,这里缓冲区的大小要比map端的更为灵活,它是基于JVM的heap size设置,因为shuffler阶段reducer不运行,所以应该把绝大部分的内存都给shuffle用。

merge的三种形式:内存到内存、内存到磁盘、磁盘到磁盘。默认情况下,第一种形式不启用。当内存中的数据量达到一定的阀值,就启动内存到磁盘的merge。与map端类似,这也是溢写过程,当然如果这里设置了Combiner,也是会启动的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的那个文件。

3.reducer的输入文件。不断地merge后,最后会生成一个“最终文件”。这个最终文件可能在磁盘中也可能在内存中。当然我们希望它在内存中,直接作为reducer的输入,但默认情况下,这个文件是存放于磁盘中的。当reducer的输入文件已定,整个shuffle才最终结束。然后就是reducer执行,把结果存放到HDFS上。



  • java鏁欑▼鍝釜濂
    绛旓細杩欏勾澶,缃戜笂鐨凧ava鏁欑▼涓鍫嗕竴鍫嗙殑,鐪嬬殑鎴戜滑寰堝Java瀹濆疂浠槸鐪艰姳缂贡,涓嶇煡璇ュ浣曟槸濂,鎴戝綋骞翠篃鏄粠杩欎釜杩囩▼璧拌繃鏉ョ殑,姣忓ぉ鐪嬪緢澶氳佸笀鐨凧ava鏁欑▼,鍚殑鑷繁鏅曞ご杞悜鐨,褰撶劧鎴戜笉鏄浠栦滑璁查敊浜,浠栦滑璁茬殑鐭ヨ瘑灞傞潰鐨勪笢瑗挎槸瀵圭殑,浣嗘槸寰堝鍋欽ava鏁欑▼鐨勮佸笀涓嶅杽浜庢繁鍏ユ祬鍑,閫氫織鏄撴噦鐨勬潵涓烘垜浠瑙g煡璇,浠涔堟槸濂芥暀绋?濂芥暀...
  • 鑷浜哄伐鏅鸿兘闇瑕佸閭d簺涓撲笟鐭ヨ瘑
    绛旓細绂绘暎鏁板锛屾暟鍊煎垎鏋愩傞渶瑕佺畻娉曠殑绉疮锛氫汉宸ョ缁忕綉缁滐紝鏀寔鍚戦噺鏈猴紝閬椾紶绠楁硶绛夌瓑绠楁硶;褰撶劧杩樻湁鍚勪釜棰嗗煙闇瑕佺殑绠楁硶锛屾瘮濡傝璁╂満鍣ㄤ汉鑷繁鍦ㄤ綅缃幆澧冨鑸拰寤哄浘灏遍渶瑕佺爺绌禨LAM;鎬讳箣绠楁硶寰堝闇瑕佹椂闂寸殑绉疮銆傞渶瑕佹帉鎻¤嚦灏戜竴闂ㄧ紪绋嬭瑷锛氭瘯绔熺畻娉曠殑瀹炵幇杩樻槸瑕佺紪绋嬬殑;濡傛灉娣卞叆鍒扮‖浠剁殑璇濓紝涓浜涚數绫诲熀纭璇惧繀涓嶅彲灏戙
  • 搴旇鎬庢牱瀛︿範JAVA?椤哄簭鏄粈涔?
    绛旓細- AugularJS - RectJS - LayUI - EasyUI ...绗洓閮ㄥ垎锛欽avaWEB Servlet JSP AJAX(鏄疛avaScript鐨勪竴閮ㄥ垎璇硶,涓撻棬鍋氶〉闈㈠眬閮ㄥ埛鏂)绗簲閮ㄥ垎锛欽avaWEB椤圭洰 鍋氫竴涓狟/S缁撴瀯鐨勯」鐩紝灏哤EB鍓嶇鍜孞avaWEB鍐呭鍋氫竴涓暣鍚堢粌涔犮傚叾瀹炲埌杩欓噷涓烘锛屾墍鏈夌殑绯荤粺閮藉彲浠ュ仛浜嗐備絾鏄敤鐨勬妧鏈緢Low銆傛病鏈夌敤妗嗘灦銆
  • 姹俰tsource璁哄潧浼氬憳璐﹀彿鎴栬呮眰Spark浠庡叆闂ㄥ埌绮鹃鏁欑▼
    绛旓細ApacheSpark鏄笓涓哄ぇ瑙勬ā鏁版嵁澶勭悊鑰岃璁$殑蹇熼氱敤鐨勮绠楀紩鎿庛係park鏄竴绉嶄笌Hadoop鐩镐技鐨勫紑婧愰泦缇よ绠楃幆澧冿紝浣嗘槸涓よ呬箣闂磋繕瀛樺湪涓浜涗笉鍚屼箣澶勶紝杩欎簺鏈夌敤鐨勪笉鍚屼箣澶勪娇Spark鍦ㄦ煇浜涘伐浣滆礋杞芥柟闈㈣〃鐜板緱鏇村姞浼樿秺锛屾崲鍙ヨ瘽璇达紝Spark鍚敤浜嗗唴瀛樺垎甯冩暟鎹
  • 鎬庢牱瀛︿範澶ф暟鎹?
    绛旓細Sqoop锛氳繖涓槸鐢ㄤ簬鎶奙ysql閲岀殑鏁版嵁瀵煎叆鍒Hadoop閲岀殑銆傚綋鐒朵綘涔熷彲浠ヤ笉鐢ㄨ繖涓紝鐩存帴鎶奙ysql鏁版嵁琛ㄥ鍑烘垚鏂囦欢鍐嶆斁鍒癏DFS涓婁篃鏄竴鏍风殑锛屽綋鐒剁敓浜х幆澧冧腑浣跨敤瑕佹敞鎰廙ysql鐨勫帇鍔涖侶ive锛氳繖涓笢瑗垮浜庝細SQL璇硶鐨勬潵璇村氨鏄鍣紝瀹冭兘璁╀綘澶勭悊澶ф暟鎹彉鐨勫緢绠鍗曪紝涓嶄細鍐嶈垂鍔茬殑缂栧啓MapReduce绋嬪簭銆傛湁鐨勪汉璇碢ig閭?瀹冨拰...
  • 扩展阅读:mac蜜桃奶茶314 ... 永久免费windows10云主机 ... www.sony.com.cn ... java入门网站 ... 国产377vc精华真能祛斑吗 ... padi潜水官网 ... javascript 在线 ... javascript入门 ... 永久不收费免费的聊天软件 ...

    本站交流只代表网友个人观点,与本站立场无关
    欢迎反馈与建议,请联系电邮
    2024© 车视网