Mapreduce默認(rèn)的partitioner是HashPartitioner。除了這個(gè)mapreduce還提供了3種partitioner。如下圖所示:
是partitioner的基類,如果需要定制partitioner也需要繼承該類。2. HashPartitioner是mapreduce的默認(rèn)partitioner。計(jì)算方法是
which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到當(dāng)前的目的reducer。
3. BinaryPatitioner繼承于Partitioner< BinaryComparable ,V>,是Partitioner的偏特化子類。該類提供leftOffset和rightOffset,在計(jì)算which reducer時(shí)僅對鍵值K的[rightOffset,leftOffset]這個(gè)區(qū)間取hash。
Which reducer=(hash & Integer.MAX_VALUE) % numReduceTasks
4. KeyFieldBasedPartitioner也是基于hash的個(gè)partitioner。和BinaryPatitioner不同,它提供了多個(gè)區(qū)間用于計(jì)算hash。當(dāng)區(qū)間數(shù)為0時(shí)KeyFieldBasedPartitioner退化成HashPartitioner。
5. TotalOrderPartitioner這個(gè)類可以實(shí)現(xiàn)輸出的全排序。不同于以上3個(gè)partitioner,這個(gè)類并不是基于hash的。在下一節(jié)里詳細(xì)的介紹totalorderpartitioner。
TotalOrderPartitioner
每一個(gè)reducer的輸出在默認(rèn)的情況下都是有順序的,但是reducer之間在輸入是無序的情況下也是無序的。如果要實(shí)現(xiàn)輸出是全排序的那就會用到TotalOrderPartitioner。
要使用TotalOrderPartitioner,得給TotalOrderPartitioner提供一個(gè)partition file。這個(gè)文件要求Key (這些key就是所謂的劃分)的數(shù)量和當(dāng)前reducer的數(shù)量-1相同并且是從小到大排列。對于為什么要用到這樣一個(gè)文件,以及這個(gè)文件的具體細(xì)節(jié)待會還會提到。
TotalOrderPartitioner對不同Key的數(shù)據(jù)類型提供了兩種方案:
1) 對于非BinaryComparable(參考附錄A)類型的Key,TotalOrderPartitioner采用二分發(fā)查找當(dāng)前的K所在的index。
例如reducer的數(shù)量為5,partition file 提供的4個(gè)劃分為【2,4,6,8】。如果當(dāng)前的一個(gè)key value pair 是<4,”good”>利用二分法查找到index=1,index+1=2那么這個(gè)key value pair將會發(fā)送到第二個(gè)reducer。如果一個(gè)key value pair為<4.5, “good”>那么二分法查找將返回-3,同樣對-3加1然后取反就是這個(gè)key value pair 將要去的reducer。
對于一些數(shù)值型的數(shù)據(jù)來說,利用二分法查找復(fù)雜度是o(log (reducer count)),速度比較快。
2) 對于BinaryComparable類型的Key(也可以直接理解為字符串)。字符串按照字典順序也是可以進(jìn)行排序的。這樣的話也可以給定一些劃分,讓不同的字符串key分配到不同的reducer里。這里的處理和數(shù)值類型的比較相近。
例如reducer的數(shù)量為5,partition file 提供了4個(gè)劃分為【“abc”, “bce”, “eaa”, ”fhc”】那么“ab”這個(gè)字符串將會被分配到第一個(gè)reducer里,因?yàn)樗∮诘谝粋€(gè)劃分“abc”。
但是不同于數(shù)值型的數(shù)據(jù),字符串的查找和比較不能按照數(shù)值型數(shù)據(jù)的比較方法。mapreducer采用的Tire tree的字符串查找方法。查找的時(shí)間復(fù)雜度o(m),m為樹的深度,空間復(fù)雜度o(255^m-1)。是一個(gè)典型的空間換時(shí)間的案例。
Tire Tree
Tire tree的構(gòu)建
假設(shè)樹的最大深度為3,劃分為【aaad ,aaaf, aaaeh,abbx 】

tairtree結(jié)構(gòu)
Mapreduce里的Tire tree主要有兩種節(jié)點(diǎn)組成:
1) Innertirenode
Innertirenode在mapreduce中是包含了255個(gè)字符的一個(gè)比較長的串。上圖中的例子只包含了26個(gè)英文字母。
2) 葉子節(jié)點(diǎn){unslipttirenode, singesplittirenode, leaftirenode}
Unslipttirenode 是不包含劃分的葉子節(jié)點(diǎn)。
Singlesplittirenode 是只包含了一個(gè)劃分點(diǎn)的葉子節(jié)點(diǎn)。
Leafnode是包含了多個(gè)劃分點(diǎn)的葉子節(jié)點(diǎn)。(這種情況比較少見,達(dá)到樹的最大深度才出現(xiàn)這種情況。在實(shí)際操作過程中比較少見)Tire tree的搜索過程
接上面的例子:
1)假如當(dāng)前 key value pair 這時(shí)會找到圖中的leafnode,在leafnode內(nèi)部使用二分法繼續(xù)查找找到返回 aad在 劃分?jǐn)?shù)組中的索引。找不到會返回一個(gè)和它最接近的劃分的索引。
2)假如找到singlenode,如果和singlenode的劃分相同或小返回他的索引,比singlenode的劃分大則返回索引+1。
3)假如找到nosplitnode則返回前面的索引。如將會返回abbx的在劃分?jǐn)?shù)組中的索引。
TotalOrderPartitioner的疑問
上面介紹了partitioner有兩個(gè)要求,一個(gè)是速度另外一個(gè)是均衡負(fù)載。使用tire tree提高了搜素的速度,但是我們怎么才能找到這樣的partition file 呢?讓所有的劃分剛好就能實(shí)現(xiàn)均衡負(fù)載。
InputSampler
輸入采樣類,可以對輸入目錄下的數(shù)據(jù)進(jìn)行采樣。提供了3種采樣方法。

采樣類結(jié)構(gòu)圖
采樣方式對比表:
類名稱 | 采樣方式 | 構(gòu)造方法 | 效率 | 特點(diǎn) |
SplitSampler<K,V> | 對前n個(gè)記錄進(jìn)行采樣 | 采樣總數(shù),劃分?jǐn)?shù) | 最高 | |
RandomSampler<K,V> | 遍歷所有數(shù)據(jù),隨機(jī)采樣 | 采樣頻率,采樣總數(shù),劃分?jǐn)?shù) | 最低 | |
IntervalSampler<K,V> | 固定間隔采樣 | 采樣頻率,劃分?jǐn)?shù) | 中 | 對有序的數(shù)據(jù)十分適用 |
writePartitionFile這個(gè)方法很關(guān)鍵,這個(gè)方法就是根據(jù)采樣類提供的樣本,首先進(jìn)行排序,然后選定(隨機(jī)的方法)和reducer數(shù)目-1的樣本寫入到partition file。這樣經(jīng)過采樣的數(shù)據(jù)生成的劃分,在每個(gè)劃分區(qū)間里的key value pair 就近似相同了,這樣就能完成均衡負(fù)載的作用。
TotalOrderPartitioner實(shí)例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | public class SortByTemperatureUsingTotalOrderPartitioner extends Configured
implements Tool
{
@Override
public int run(String[] args) throws Exception
{
JobConf conf = JobBuilder.parseInputAndOutput( this , getConf(), args);
if (conf == null ) {
return - 1 ;
}
conf.setInputFormat(SequenceFileInputFormat. class );
conf.setOutputKeyClass(IntWritable. class );
conf.setOutputFormat(SequenceFileOutputFormat. class );
SequenceFileOutputFormat.setCompressOutput(conf, true );
SequenceFileOutputFormat
.setOutputCompressorClass(conf, GzipCodec. class );
SequenceFileOutputFormat.setOutputCompressionType(conf,
CompressionType.BLOCK);
conf.setPartitionerClass(TotalOrderPartitioner. class );
InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>(
0.1 , 10000 , 10 );
Path input = FileInputFormat.getInputPaths(conf)[ 0 ];
input = input.makeQualified(input.getFileSystem(conf));
Path partitionFile = new Path(input, "_partitions" );
TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
InputSampler.writePartitionFile(conf, sampler);
// Add to DistributedCache
URI partitionUri = new URI(partitionFile.toString() + "#_partitions" );
DistributedCache.addCacheFile(partitionUri, conf);
DistributedCache.createSymlink(conf);
JobClient.runJob(conf);
return 0 ;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(
new SortByTemperatureUsingTotalOrderPartitioner(), args);
System.exit(exitCode);
}
}
|
示例程序引用于:http://www.cnblogs.com/funnydavid/archive/2010/11/24/1886974.html
附錄A
Text 為BinaryComparable,WriteableComparable類型。
BooleanWritable、ByteWritable、DoubleWritable、MD5hash、IntWritable、FloatWritable、LongWritable、NullWriable等都為WriteableComparable。具體參考下圖:

附錄