2016年3月22日星期二

Java中一个线程可以多次start吗?

不可以
通过Thread实例的start(),一个Thread的实例只能产生一个线程。一个Thread的实例一旦调用start()方法,这个实例的started标记就标记为true,事实中不管这个线程后来有没有执行到底,只要调用了一次start()就再也没有机会运行了。
一个线程对象只能调用一次start方法.从new到等待运行是单行道,所以如果你对一个已经启动的线程对象再调用一次start方法的话,会产生:IllegalThreadStateException异常. 可以被重复调用的是run()方法。
Thread类中run()和start()方法的区别如下:
run()方法: 在本线程内调用该Runnable对象的run()方法,可以重复多次调用;
start()方法: 启动一个线程,调用该Runnable对象的run()方法,不能多次启动一个线程;

2016年2月22日星期一

开发安全指南:如何安全地储存用户密码

开发安全指南:如何安全地储存用户密码

开发安全指南:如何安全地储存用户密码

不要存储明文密码,而是存储密码的哈希值。

JAVA

  1. JAVA第一种方案
    在Java程序中进行安全的密码哈希,除了 libsodium还有jBCrypt,能够提供bcrypt密码哈希算法。
1.String hash = BCrypt.hashpw(userProvidedPassword, BCrypt.gensalt());
2.// 验证一个在Java中的bcrypt hash:
3.if (BCrypt.checkpw(userProvidedPassword, hash)) {
4. // Login successful.
5.}
  1. JAVA第二种方案
    有一个java实现的scrypt,但它需要你指定参数,而不会提供一个默认值给你。
1.#Calculating a hash
2.int N = 16384;int r = 8;int p = 1;
3.String hashed = SCryptUtil.scrypt(passwd, N, r, p);
4.# Validating a hash
5.if (SCryptUtil.check(passwd, hashed))
{
6. // Login successful
7.}

Python

  1. Python 第一种方案
    使用bcrypt python 包 (GitHub)
1.import bcryptimport hmac
2.# Calculating a hash
3.password = b"correct horse battery staple"
4.hashed = bcrypt.hashpw(password, bcrypt.gensalt())
5.# Validating a hash (don't use ==)
6.if(hmac.compare_digest(bcrypt.hashpw(password, hashed), hashed)):
7. # Login successful

Python开发人员通常更喜欢passlib (Bitbucket),尽管它的API命名并不正确。(”加密” 而不是 “hash”):

1.from passlib.hash import bcrypt
2.# Calculating a hash
3.hash = bcrypt.encrypt(usersPassword, rounds=12)
4.# Validating a hash
5.if bcrypt.verify(usersPassword, hash):
6. # Login successful
  1. Python 第二种方案
    目前我们发现除了libsodium以外只有django-scrypt package.可以较为健全的实现。其他的我们还在寻找中。

2016年2月2日星期二

分页查询,你真的懂吗?

分页查询,你真的懂吗?
转自: http://www.cnblogs.com/baochuan/p/4625262.html

程序员代码的编写能力主要体现在思维的严谨上。有些看起来很简单的东西,里面包含很多很细的点,你能想到吗?
今天我就简单说一下一个例子,让大家学习到新知识的同时,也养成一种思维的习惯。

问题

有一张收藏表,里面存储的是用户和图书ID。数据量为1亿。现在要求分页获取所有用户ID(不重复),写下你的sql语句。
表结构大致如下:

1.CREATE TABLE 收藏表(
2. `id` bigint(20) unsigned NOT NULL auto_increment COMMENT 'primary key',
3. `uid` bigint(20) unsigned NOT NULL default 0 COMMENT 'uid',<br>   `status` tinyint(3) unsigned NOT NULL default 0 COMMENT 'status',
4. `book_id` bigint(20) unsigned NOT NULL default 0 COMMENT 'book Id',
5. `create_time` int(11) unsigned not null default 0 COMMENT 'create time',
6. PRIMARY KEY (`id`),
7. UNIQUE KEY `uid_book_id` (`uid`, `book_id`),<br>    KEY `uid_status` (`uid`, `status`)
8.)ENGINED=Innodb Auto_increment=1 default charset=gbk COMMENT '用户收藏信息';

三种设计

最容易想到的第一种分页语句是(这也是我们最容易想到的语句):

1.select distinct uid from 收藏表 order by uid desc limit 0, 10;
2.select distinct uid from 收藏表 order by uid desc limit 11, 10;

再高级点语句,第二种($last_min_uid表示上一次读到的最后一个uid):

1.select distinct uid from 收藏表 order by uid desc limit 10;
2.select distinct uid from 收藏表 where uid < $last_min_uid order by uid desc limit 10;

最高级的方式:

1.select uid from 收藏表 group by uid order by uid desc limit 10;
2.select uid from 收藏表 group by uid having uid < $last_min_uid order by uid desc limit 10;

分析

以上三种方式都可以实现分页获取到用户ID列表,那么区别是什么?我现在就把每一种跟大家分析下。

  • 第一种在业务场景中,会出现丢数据的情况。——这是比较严重的情况,不予采纳。

具体的业务场景是这样的:当你读取第5页的时候,前四页的用户id列表中,假如有一页的用户ID从库中删除掉,那么你这时读到的第5页(limit 51, 10),就是原来的第6页,你会把1页的用户ID丢失掉。

  • 第二种的第二条语句,通过explain分析,实际并没有命中唯一索引,而只是命中了一般索引,数据查询范围在7百万级别,故explain建议我们使用group by。——这个查询会有严重的性能问题。
1.+----+--------+-------+-------+---------+---------+----------+-------+--------+--------+
2.| id | select_type | table| type | possible_keys | key | key_len | ref | rows | Extra |
3.+----+--------+-------+-------+---------+---------+----------+-------+--------+--------+
4.| 1 | SIMPLE | ubook_room | range | uid_book_id | uid_status | 4 | NULL | 7066423 | Using where; Using index for group-by; Using temporary; Using filesort |
5.+----+--------+-------+-------+---------+---------+----------+-------+--------+--------+
  • 第三种explain分析,数据查询范围在12万级别(跟第二种相差一个数量级),查询性能高。
1.+----+--------+--------+-------+------------+--------+---------+-------+----------+--------+
2.| id| select_type| table | type | possible_keys| key | key_len | ref | rows | Extra |
3.+----+--------+--------+-------+------------+--------+---------+-------+----------+--------+
4.| 1 | SIMPLE | 收藏表 | index| NULL | uid_book_id | 12 | NULL | 121719 | Using index |
5.+----+--------+--------+-------+------------+--------+---------+-------+----------+--------+

2016年1月28日星期四

Hadoop Fair Scheduler

Hadoop Fair Scheduler

Hadoop Fair Scheduler

公平调度器(Fair Scheduler)是一个用于Hadoop的插件式的Map/Reduce调度器,它提供了一种共享大规模集群的方法。

引言

  公平调度是一种赋予作业(job)资源的方法,它的目的是让所有的作业随着时间的推移,都能平均的获取等同的共享资源。当单独一个作业在运行时,它将使用整个集群。当有其它作业被提交上来时,系统会将任务(task)空闲时间片(slot)赋给这些新的作业,以使得每一个作业都大概获取到等量的CPU时间。与Hadoop默认调度器维护一个作业队列不同,这个特性让小作业在合理的时间内完成的同时又不“饿”到消耗较长时间的大作业。它也是一个在多用户间共享集群的简单方法。公平共享可以和作业优先权搭配使用——优先权像权重一样用作为决定每个作业所能获取的整体计算时间的比例。

  公平调度器按资源池(pool)来组织作业,并把资源公平的分到这些资源池里。默认情况下,每一个用户拥有一个独立的资源池,以使每个用户都能获得一份等同的集群资源而不管他们提交了多少作业。按用户的Unix群组或作业配置(jobconf)属性来设置作业的资源池也是可以的。在每一个资源池内,会使用公平共享(fairsharing)的方法在运行作业之间共享容量(capacity)。你也可以给予资源池相应的权重,以不按比例的方式共享集群。

除了提供公平共享方法外,公平调度器允许赋给资源池保证(guaranteed)最小共享资源,这个用在确保特定用户、群组或生产应用程序总能获取到足够的资源时是很有用的。当一个资源池包含作业时,它至少能获取到它的最小共享资源,但是当资源池不完全需要它所拥有的保证共享资源时,额外的部分会在其它资源池间进行切分。

  在常规操作中,当提交了一个新作业时,公平调度器会等待已运行作业中的任务完成以释放时间片给新的作业。但,公平调度器也支持在可配置的超时时间后对运行中的作业进行抢占。如果新的作业在一定时间内还获取不到最小的共享资源,这个作业被允许去终结已运行作业中的任务以获取运行所需要的资源。因此抢占可以用来保证“生产”作业在指定时间内运行的同时也让Hadoop集群能被实验或研究作业使用。另外,作业的资源在可配置的超时时间(一般设置大于最小共享资源超时时间)内拥有不到其公平共享资源(fair share)的一半的时候也允许对任务进行抢占。在选择需要结束的任务时,公平调度器会在所有作业中选择那些最近运行起来的任务,以最小化被浪费的计算。抢占不会导致被抢占的作业失败,因为Hadoop作业能容忍丢失任务,这只是会让它们的运行时间更长。

  最后,公平调度器还可以限制每用户和每资源池的并发运行作业数量。当一个用户必须一次性提交数百个作业时,或当大量作业并发执行时,用来确保中间数据不会塞满集群上的磁盘空间,这是很有用的。设置作业限制会使超出限制的作业被列入调度器的队列中进行等待,直到一些用户/资源池的早期作业运行完毕。系统会根据作业优先权和提交时间的排列来运行每个用户/资源池中的作业。

理念

公平调度器的设计主要基于以下几个考虑:

  • 为Hadoop集群里的“小作业”,即所需执行时间较少的作业,提供一个“公平”的资源竞争机制,使它们即使在需要和一些规模较大的作业共享资源的时候也能够较快地得到执行。与之相对,在Hadoop默认的先进先出调度机制下则有可能出现大作业占据资源而使小作业长时间等待的局面。
  • 确保某些特定的用户或者生产环境(Production)应用程序任何时间都可以得到足够的资源,这样即使它们和其它的测试或实验程序共享集群资源,也能够随时运行。
  • 易于管理和配置。
  • 支持运行时配置,不需要重启集群。

主要机能

资源池(Pool)

资源池是实现公平调度的一个重要的机能。公平调度器会把作业划分到不同的资源池里,原则上,资源池之间的资源实现共享,每个资源池内部的作业也实现资源共享。更具体的来说,资源池具有以下特性:

  • 资源公平地划分到每个资源池。
  • 每个资源池内,可以使用公平共享的方法来调度作业,也可以按照先进先出的方法来调度。不同的资源池可以配置不同的调度方式。
  • 默认情况下,每个用户都分到相同量的集群资源,同一用户提交的作业共享资源。但也可以通过设置jobconf属性来指定作业进入哪个资源池。
  • 没有被标示的作业将会进入一个默认的资源池
  • 可以对资源池设置权重。例如,权重为2的资源池可以获得2倍于权重为1的资源池所享有的资源。

最小共享量(MinimumShare)

通常情况下,资源池会分配到相同份额的map/reduceslot。不过,通过给指定的资源池设置最小(map/reduceslot)共享量,即使在该资源池理论上应分配到的公平共享量少于最小共享量的时候,也可以确保这个资源池里的作业始终获得足够的资源来运行。最小共享量有以下特性: – 资源池的公平共享量永远不会小于最小共享量。

  • 如果有资源池的当前共享量小于最小共享量,它会优先得到下一个可用的slot。
  • 可以对资源池设置一个抢占超时(通常建议设置一个较高的数值,比如10分钟)。当超过一定时间资源池仍未能获得足够的slot以满足最小共享量的时候,允许调度器停止(kill)其它作业的任务以释放额外的共享量。
  • 当资源池里没有活动的作业的时候,最小共享量不被保留,将会被分给其它的资源池。

作业优先级

作业优先级用于帮助进行资源池内的作业调度。

  • 在先进先出的资源池内,作业的顺序首先按优先级,然后按提交的时间来确定。
  • 在公平共享的资源池内,作业优先级作为权重来控制作业应获得的共享量。普通优先级对应权重为1.0,每高一个优先级权重就乘以2。

调度算法

调度算法的基本规则是:当有slot空闲下来的时候,把它分配给最需要的资源池,这样可以确保所有的资源池都得到相同数量的slot。但是,如果一个资源池的需求低于它的公平共享量,那么多余的slot会被平均分给其它的资源池。另外,在此基础上还有一些补充规则,比如:

  • 如前所述,资源池的权重将会影响到它能分配到的资源。
  • 获得资源低于最小共享量的资源池可以优先得到空闲的slot。资源低于最小共享量的优先于资源高于最小共享量。对于同为低于最小共享量的情况,比较两者的缺额百分比(距离最小共享量的缺额/最小共享量),缺额百分比更大的优先。

Binary Search Tree

BST

BST

完全二叉树(Complete Binary Tree)

若设二叉树的深度为h,除第 h 层外,其它各层 (1~h-1) 的结点数都达到最大个数,第 h 层所有的结点都连续集中在最左边,这就是完全二叉树。
完全二叉树是由满二叉树而引出来的。对于深度为K的,有N个结点的二叉树,当且仅当其每一个结点都与深度为K的满二叉树中编号从1至n的结点一一对应时称之为完全二叉树。
若一棵二叉树至多只有最下面的两层上的结点的度数可以小于2,并且最下层上的结点都集中在该层最左边的若干位置上,则此二叉树成为完全二叉树

Binary Search Tree

二叉查找树(Binary Search Tree),也称有序二叉树(ordered binary tree),排序二叉树(sorted binary tree),是指一棵空树或者具有下列性质的二叉树:

  1. 若任意节点的左子树不空,则左子树上所有结点的值均小于它的根结点的值;
  2. 任意节点的右子树不空,则右子树上所有结点的值均大于它的根结点的值;
  3. 任意节点的左、右子树也分别为二叉查找树。
  4. 没有键值相等的节点(no duplicate nodes)。

二叉查找树相比于其他数据结构的优势在于查找、插入的时间复杂度较低。为O(log n)。二叉查找树是基础性数据结构,用于构建更为抽象的数据结构,如集合、multiset、关联数组等。

二叉查找树的查找过程和次优二叉树类似,通常采取二叉链表作为二叉查找树的存储结构。中序遍历二叉查找树可得到一个关键字的有序序列,一个无序序列可以通过构造一棵二叉查找树变成一个有序序列,构造树的过程即为对无序序列进行查找的过程。每次插入的新的结点都是二叉查找树上新的叶子结点,在进行插入操作时,不必移动其它结点,只需改动某个结点的指针,由空变为非空即可。搜索,插入,删除的复杂度等于树高,期望O(log n),最坏O(n)(数列有序,树退化成线性表).

虽然二叉查找树的最坏效率是O(n),但它支持动态查询,且有很多改进版的二叉查找树可以使树高为O(logn),如SBT,AVL,红黑树等.故不失为一种好的动态查找方法.

二叉查找树的查找算法

  • 在二叉查找树b中查找x的过程为:

    1. 若b是空树,则搜索失败,否则:
    2. 若x等于b的根节点的数据域之值,则查找成功;否则:
    3. 若x小于b的根节点的数据域之值,则搜索左子树;否则:
    4. 查找右子树。
  • 在二叉查找树插入节点的算法
    向一个二叉查找树b中插入一个节点s的算法,过程为:

    1. 若b是空树,则将s所指结点作为根节点插入,否则:
    2. 若s->data等于b的根节点的数据域之值,则返回,否则:
    3. 若s->data小于b的根节点的数据域之值,则把s所指节点插入到左子树中,否则:
    4. 把s所指节点插入到右子树中。
  • 在二叉查找树删除结点的算法
    在二叉查找树删去一个结点,分三种情况讨论:

    1. 若*p结点为叶子结点,即PL(左子树)和PR(右子树)均为空树。由于删去叶子结点不破坏整棵树的结构,则只需修改其双亲结点的指针即可。
    2. 若*p结点只有左子树PL或右子树PR,此时只要令PL或PR直接成为其双亲结点*f的左子树(当*p是左子树)或右子树(当*p是右子树)即可,作此修改也不破坏二叉查找树的特性。
    3. 若*p结点的左子树和右子树均不空。在删去*p之后,为保持其它元素之间的相对位置不变,可按中序遍历保持有序进行调整,可以有两种做法:其一是令*p的左子树为*f的左/右(依*p是*f的左子树还是右子树而定)子树,*s为*p左子树的最右下的结点,而*p的右子树为*s的右子树;其二是令*p的直接前驱(in-order predecessor)或直接后继(in-order successor)替代*p,然后再从二叉查找树中删去它的直接前驱(或直接后继)。
      enter image description here

.n个节点可以构成的不同形状的二叉树的数目有
递推式:

s[0]=1;
s[1]=1;

s[i]=sum(s[j]*s[i-j-1]) 0<=j<=i-1;

这是一个catalan数列

MapReduce:详解Shuffle过程

MapReduce:详解Shuffle过程

MapReduce:详解Shuffle过程

转自:http://langyu.iteye.com/blog/992916

Shuffle过程是MapReduce的核心,也被称为奇迹发生的地方。要想理解MapReduce, Shuffle是必须要了解的。
Shuffle的正常意思是洗牌或弄乱,比如Java API里的Collections.shuffle(List)方法,它会随机地打乱参数list里的元素顺序。如果你不知道MapReduce里Shuffle是什么,那么请看这张图:
enter image description here
Shuffle的大致范围就成-怎样把map task的输出结果有效地传送到reduce端。也可以这样理解, Shuffle描述着数据从map task输出到reduce task输入的这段过程。

在Hadoop这样的集群环境中,大部分map task与reduce task的执行是在不同的节点上。当然很多情况下Reduce执行时需要跨节点去拉取其它节点上的map task结果。如果集群正在运行的job有很多,那么task的正常执行对集群内部的网络资源消耗会很严重。这种网络消耗是正常的,我们不能限制,能做的就是最大化地减少不必要的消耗。还有在节点内,相比于内存,磁盘IO对job完成时间的影响也是可观的。从最基本的要求来说,我们对Shuffle过程的期望可以有:

  • 完整地从map task端拉取数据到reduce 端。
  • 在跨节点拉取数据时,尽可能地减少对带宽的不必要消耗。
  • 减少磁盘IO对task执行的影响。

如果是自己来设计这段Shuffle过程,那么你的设计目标是什么。我想能优化的地方主要在于减少拉取数据的量及尽量使用内存而不是磁盘。
从上图看出,Shuffle过程横跨map与reduce两端,所以下面我也会分两部分来展开。

先看看map端的情况,如下图:
enter image description here
上图可能是某个map task的运行情况。拿它与官方图的左半边比较,会发现很多不一致。官方图没有清楚地说明partition, sort与combiner到底作用在哪个阶段。我画了这张图,希望让大家清晰地了解从map数据输入到map端所有数据准备好的全过程。

整个流程我分了四步。简单些可以这样说,每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据

这里的每一步都可能包含着多个步骤与细节,下面对细节来一一说明:

  1. 在map task执行时,它的输入数据来源于HDFS的block,当然在MapReduce概念中,map task只读取split。Split与block的对应关系可能是多对一,默认是一对一。在WordCount例子里,假设map的输入数据都是像“aaa”这样的字符串。

  2. 在经过mapper的运行后,我们得知mapper的输出是这样一个key/value对: key是“aaa”, value是数值1。因为当前map端只做加1的操作,在reduce task里才去合并结果集。前面我们知道这个job有3个reduce task,到底当前的“aaa”应该交由哪个reduce去做呢,是需要现在决定的。

    MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。

    在我们的例子中,“aaa”经过Partitioner后返回0,也就是这对值应当交由第一个reducer来处理。接下来,需要将数据写入内存缓冲区中,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。整个内存缓冲区就是一个字节数组。

  3. 内存缓冲区是有大小限制的,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写,字面意思很直观。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。
    当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。
    因为map task的输出是需要发送到不同的reduce端去,而内存缓冲区没有对将发送到相同reduce端的数据做合并,那么这种合并应该是体现是磁盘文件中的。从官方图上也可以看到写到磁盘中的溢写文件是对不同的reduce端的数值做过合并。所以溢写过程一个很重要的细节在于,如果有很多个key/value对需要发送到某个reduce端去,那么需要将这些key/value值拼接到一块,减少与partition相关的索引记录。

在针对每个reduce端而合并数据时,有些数据可能像这样:“aaa”/1, “aaa”/1。对于WordCount例子,就是简单地统计单词出现的次数,如果在同一个map task的结果中有很多个像“aaa”一样出现多次的key,我们就应该把它们的值合并到一块,这个过程叫reduce也叫combine。但MapReduce的术语中,reduce只指reduce端执行从多个map task取数据做计算的过程。除reduce外,非正式地合并数据只能算做combine了。其实大家知道的,MapReduce中将Combiner等同于Reducer。

如果client设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。

  1. 每次溢写会在磁盘上生成一个溢写文件,如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个溢写文件存在。当map task真正完成时,内存缓冲区中的数据也全部溢写到磁盘中形成一个溢写文件。最终磁盘中会至少有一个这样的溢写文件存在(如果map的输出结果很少,当map执行完成时,只会产生一个溢写文件),因为最终的文件只有一个,所以需要将这些溢写文件归并到一起,这个过程就叫做Merge。Merge是怎样的?如前面的例子,“aaa”从某个map task读取过来时值是5,从另外一个map 读取时值是8,因为它们有相同的key,所以得merge成group。什么是group。对于“aaa”就是像这样的:{“aaa”, [5, 8, 2, …]},数组中的值就是从不同溢写文件中读取出来的,然后再把这些值加起来。请注意,因为merge是将多个溢写文件合并到一个文件,所以可能也有相同的key存在,在这个过程中如果client设置过Combiner,也会使用Combiner来合并相同的key。

至此,map端的所有工作都已结束,最终生成的这个文件也存放在TaskTracker够得着的某个本地目录内。每个reduce task不断地通过RPC从JobTracker那里获取map task是否完成的信息,如果reduce task得到通知,获知某台TaskTracker上的map task执行完成,Shuffle的后半段过程开始启动。

简单地说,reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge,也最终形成一个文件作为reduce task的输入文件。见下图:
enter image description here
如map 端的细节图,Shuffle在reduce端的过程也能用图上标明的三点来概括。当前reduce copy数据的前提是它要从JobTracker获得有哪些map task已执行结束,这段过程不表,有兴趣的朋友可以关注下。Reducer真正运行之前,所有的时间都是在拉取数据,做merge,且不断重复地在做。如前面的方式一样,下面我也分段地描述reduce 端的Shuffle细节:

  1. Copy过程,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求map task所在的TaskTracker获取map task的输出文件。因为map task早已结束,这些文件就归TaskTracker管理在本地磁盘中。

  2. Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。这里需要强调的是,merge有三种形式:1)内存到内存 2)内存到磁盘 3)磁盘到磁盘。默认情况下第一种形式不启用,让人比较困惑,是吧。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的那个文件。

  3. Reducer的输入文件。不断地merge后,最后会生成一个“最终文件”。为什么加引号?因为这个文件可能存在于磁盘上,也可能存在于内存中。对我们来说,当然希望它存放于内存中,直接作为Reducer的输入,但默认情况下,这个文件是存放于磁盘中的。至于怎样才能让这个文件出现在内存中,之后的性能优化篇我再说。当Reducer的输入文件已定,整个Shuffle才最终结束。然后就是Reducer执行,把结果放到HDFS上。