目的
這篇教程從用戶的角度出發(fā),全面地介紹了Hadoop Map/Reduce框架的各個(gè)方面。
先決條件
請(qǐng)先確認(rèn)Hadoop被正確安裝、配置和正常運(yùn)行中。更多信息見:
Hadoop快速入門對(duì)初次使用者。
Hadoop集群搭建對(duì)大規(guī)模分布式集群。
概述
Hadoop Map/Reduce是一個(gè)使用簡(jiǎn)易的軟件框架,基于它寫出來的應(yīng)用程序能夠運(yùn)行在由上千個(gè)商用機(jī)器組成的大型集群上,并以一種可靠容錯(cuò)的方式并行處理上T級(jí)別的數(shù)據(jù)集。
一個(gè)Map/Reduce 作業(yè)(job) 通常會(huì)把輸入的數(shù)據(jù)集切分為若干獨(dú)立的數(shù)據(jù)塊,由 map任務(wù)(task)以完全并行的方式處理它們。框架會(huì)對(duì)map的輸出先進(jìn)行排序, 然后把結(jié)果輸入給reduce任務(wù)。通常作業(yè)的輸入和輸出都會(huì)被存儲(chǔ)在文件系統(tǒng)中。 整個(gè)框架負(fù)責(zé)任務(wù)的調(diào)度和監(jiān)控,以及重新執(zhí)行已經(jīng)失敗的任務(wù)。
通常,Map/Reduce框架和分布式文件系統(tǒng)是運(yùn)行在一組相同的節(jié)點(diǎn)上的,也就是說,計(jì)算節(jié)點(diǎn)和存儲(chǔ)節(jié)點(diǎn)通常在一起。這種配置允許框架在那些已經(jīng)存好數(shù)據(jù)的節(jié)點(diǎn)上高效地調(diào)度任務(wù),這可以使整個(gè)集群的網(wǎng)絡(luò)帶寬被非常高效地利用。
Map/Reduce框架由一個(gè)單獨(dú)的master JobTracker 和每個(gè)集群節(jié)點(diǎn)一個(gè)slave TaskTracker共同組成。master負(fù)責(zé)調(diào)度構(gòu)成一個(gè)作業(yè)的所有任務(wù),這些任務(wù)分布在不同的slave上,master監(jiān)控它們的執(zhí)行,重新執(zhí)行已經(jīng)失敗的任務(wù)。而slave僅負(fù)責(zé)執(zhí)行由master指派的任務(wù)。
應(yīng)用程序至少應(yīng)該指明輸入/輸出的位置(路徑),并通過實(shí)現(xiàn)合適的接口或抽象類提供map和reduce函數(shù)。再加上其他作業(yè)的參數(shù),就構(gòu)成了作業(yè)配置(job configuration)。然后,Hadoop的 job client提交作業(yè)(jar包/可執(zhí)行程序等)和配置信息給JobTracker,后者負(fù)責(zé)分發(fā)這些軟件和配置信息給slave、調(diào)度任務(wù)并監(jiān)控它們的執(zhí)行,同時(shí)提供狀態(tài)和診斷信息給job-client。
雖然Hadoop框架是用JavaTM實(shí)現(xiàn)的,但Map/Reduce應(yīng)用程序則不一定要用 Java來寫 。
Hadoop Streaming是一種運(yùn)行作業(yè)的實(shí)用工具,它允許用戶創(chuàng)建和運(yùn)行任何可執(zhí)行程序 (例如:Shell工具)來做為mapper和reducer。
Hadoop Pipes是一個(gè)與SWIG兼容的C++ API (沒有基于JNITM技術(shù)),它也可用于實(shí)現(xiàn)Map/Reduce應(yīng)用程序。
輸入與輸出
Map/Reduce框架運(yùn)轉(zhuǎn)在<key, value> 鍵值對(duì)上,也就是說, 框架把作業(yè)的輸入看為是一組<key, value> 鍵值對(duì),同樣也產(chǎn)出一組 <key, value> 鍵值對(duì)做為作業(yè)的輸出,這兩組鍵值對(duì)的類型可能不同。
框架需要對(duì)key和value的類(classes)進(jìn)行序列化操作, 因此,這些類需要實(shí)現(xiàn) Writable接口。 另外,為了方便框架執(zhí)行排序操作,key類必須實(shí)現(xiàn) WritableComparable接口。
一個(gè)Map/Reduce 作業(yè)的輸入和輸出類型如下所示:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
例子:WordCount v1.0
在深入細(xì)節(jié)之前,讓我們先看一個(gè)Map/Reduce的應(yīng)用示例,以便對(duì)它們的工作方式有一個(gè)初步的認(rèn)識(shí)。
WordCount是一個(gè)簡(jiǎn)單的應(yīng)用,它可以計(jì)算出指定數(shù)據(jù)集中每一個(gè)單詞出現(xiàn)的次數(shù)。
這個(gè)應(yīng)用適用于 單機(jī)模式, 偽分布式模式 或 完全分布式模式 三種Hadoop安裝方式。
源代碼
WordCount.java
1. package org.myorg;
2.
3. import java.io.IOException;
4. import java.util.*;
5.
6. import org.apache.hadoop.fs.Path;
7. import org.apache.hadoop.conf.*;
8. import org.apache.hadoop.io.*;
9. import org.apache.hadoop.mapred.*;
10. import org.apache.hadoop.util.*;
11.
12. public class WordCount {
13.
14. public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
15. private final static IntWritable one = new IntWritable(1);
16. private Text word = new Text();
17.
18. public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
19. String line = value.toString();
20. StringTokenizer tokenizer = new StringTokenizer(line);
21. while (tokenizer.hasMoreTokens()) {
22. word.set(tokenizer.nextToken());
23. output.collect(word, one);
24. }
25. }
26. }
27.
28. public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
29. public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
30. int sum = 0;
31. while (values.hasNext()) {
32. sum += values.next().get();
33. }
34. output.collect(key, new IntWritable(sum));
35. }
36. }
37.
38. public static void main(String[] args) throws Exception {
39. JobConf conf = new JobConf(WordCount.class);
40. conf.setJobName("wordcount");
41.
42. conf.setOutputKeyClass(Text.class);
43. conf.setOutputValueClass(IntWritable.class);
44.
45. conf.setMapperClass(Map.class);
46. conf.setCombinerClass(Reduce.class);
47. conf.setReducerClass(Reduce.class);
48.
49. conf.setInputFormat(TextInputFormat.class);
50. conf.setOutputFormat(TextOutputFormat.class);
51.
52. FileInputFormat.setInputPaths(conf, new Path(args[0]));
53. FileOutputFormat.setOutputPath(conf, new Path(args[1]));
54.
55. JobClient.runJob(conf);
57. }
58. }
59.
用法
假設(shè)環(huán)境變量HADOOP_HOME對(duì)應(yīng)安裝時(shí)的根目錄,HADOOP_VERSION對(duì)應(yīng)Hadoop的當(dāng)前安裝版本,編譯WordCount.java來創(chuàng)建jar包,可如下操作:
$ mkdir wordcount_classes
$ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar -d wordcount_classes WordCount.java
$ jar -cvf /usr/joe/wordcount.jar -C wordcount_classes/ .
假設(shè):
/usr/joe/wordcount/input - 是HDFS中的輸入路徑
/usr/joe/wordcount/output - 是HDFS中的輸出路徑
用示例文本文件做為輸入:
$ bin/hadoop dfs -ls /usr/joe/wordcount/input/
/usr/joe/wordcount/input/file01
/usr/joe/wordcount/input/file02
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
Hello World Bye World
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop
運(yùn)行應(yīng)用程序:
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output
輸出是:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
應(yīng)用程序能夠使用-files選項(xiàng)來指定一個(gè)由逗號(hào)分隔的路徑列表,這些路徑是task的當(dāng)前工作目錄。使用選項(xiàng)-libjars可以向map和reduce的classpath中添加jar包。使用-archives選項(xiàng)程序可以傳遞檔案文件做為參數(shù),這些檔案文件會(huì)被解壓并且在task的當(dāng)前工作目錄下會(huì)創(chuàng)建一個(gè)指向解壓生成的目錄的符號(hào)鏈接(以壓縮包的名字命名)。 有關(guān)命令行選項(xiàng)的更多細(xì)節(jié)請(qǐng)參考 Commands manual。
使用-libjars和-files運(yùn)行wordcount例子:
hadoop jar hadoop-examples.jar wordcount -files cachefile.txt -libjars mylib.jar input output
解釋
WordCount應(yīng)用程序非常直截了當(dāng)。
Mapper(14-26行)中的map方法(18-25行)通過指定的 TextInputFormat(49行)一次處理一行。然后,它通過StringTokenizer 以空格為分隔符將一行切分為若干tokens,之后,輸出< <word>, 1> 形式的鍵值對(duì)。
對(duì)于示例中的第一個(gè)輸入,map輸出是:
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>
第二個(gè)輸入,map輸出是:
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>
關(guān)于組成一個(gè)指定作業(yè)的map數(shù)目的確定,以及如何以更精細(xì)的方式去控制這些map,我們將在教程的后續(xù)部分學(xué)習(xí)到更多的內(nèi)容。
WordCount還指定了一個(gè)combiner (46行)。因此,每次map運(yùn)行之后,會(huì)對(duì)輸出按照key進(jìn)行排序,然后把輸出傳遞給本地的combiner(按照作業(yè)的配置與Reducer一樣),進(jìn)行本地聚合。
第一個(gè)map的輸出是:
< Bye, 1>
< Hello, 1>
< World, 2>
第二個(gè)map的輸出是:
< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>
Reducer(28-36行)中的reduce方法(29-35行) 僅是將每個(gè)key(本例中就是單詞)出現(xiàn)的次數(shù)求和。
因此這個(gè)作業(yè)的輸出就是:
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>
代碼中的run方法中指定了作業(yè)的幾個(gè)方面, 例如:通過命令行傳遞過來的輸入/輸出路徑、key/value的類型、輸入/輸出的格式等等JobConf中的配置信息。隨后程序調(diào)用了JobClient.runJob(55行)來提交作業(yè)并且監(jiān)控它的執(zhí)行。
我們將在本教程的后續(xù)部分學(xué)習(xí)更多的關(guān)于JobConf, JobClient, Tool和其他接口及類(class)。
Map/Reduce - 用戶界面
這部分文檔為用戶將會(huì)面臨的Map/Reduce框架中的各個(gè)環(huán)節(jié)提供了適當(dāng)?shù)募?xì)節(jié)。這應(yīng)該會(huì)幫助用戶更細(xì)粒度地去實(shí)現(xiàn)、配置和調(diào)優(yōu)作業(yè)。然而,請(qǐng)注意每個(gè)類/接口的javadoc文檔提供最全面的文檔;本文只是想起到指南的作用。
我們會(huì)先看看Mapper和Reducer接口。應(yīng)用程序通常會(huì)通過提供map和reduce方法來實(shí)現(xiàn)它們。
然后,我們會(huì)討論其他的核心接口,其中包括: JobConf,JobClient,Partitioner, OutputCollector,Reporter, InputFormat,OutputFormat等等。
最后,我們將通過討論框架中一些有用的功能點(diǎn)(例如:DistributedCache, IsolationRunner等等)來收尾。
核心功能描述
應(yīng)用程序通常會(huì)通過提供map和reduce來實(shí)現(xiàn) Mapper和Reducer接口,它們組成作業(yè)的核心。
Mapper
Mapper將輸入鍵值對(duì)(key/value pair)映射到一組中間格式的鍵值對(duì)集合。
Map是一類將輸入記錄集轉(zhuǎn)換為中間格式記錄集的獨(dú)立任務(wù)。 這種轉(zhuǎn)換的中間格式記錄集不需要與輸入記錄集的類型一致。一個(gè)給定的輸入鍵值對(duì)可以映射成0個(gè)或多個(gè)輸出鍵值對(duì)。
Hadoop Map/Reduce框架為每一個(gè)InputSplit產(chǎn)生一個(gè)map任務(wù),而每個(gè)InputSplit是由該作業(yè)的InputFormat產(chǎn)生的。
概括地說,對(duì)Mapper的實(shí)現(xiàn)者需要重寫 JobConfigurable.configure(JobConf)方法,這個(gè)方法需要傳遞一個(gè)JobConf參數(shù),目的是完成Mapper的初始化工作。然后,框架為這個(gè)任務(wù)的InputSplit中每個(gè)鍵值對(duì)調(diào)用一次 map(WritableComparable, Writable, OutputCollector, Reporter)操作。應(yīng)用程序可以通過重寫Closeable.close()方法來執(zhí)行相應(yīng)的清理工作。
輸出鍵值對(duì)不需要與輸入鍵值對(duì)的類型一致。一個(gè)給定的輸入鍵值對(duì)可以映射成0個(gè)或多個(gè)輸出鍵值對(duì)。通過調(diào)用 OutputCollector.collect(WritableComparable,Writable)可以收集輸出的鍵值對(duì)。
應(yīng)用程序可以使用Reporter報(bào)告進(jìn)度,設(shè)定應(yīng)用級(jí)別的狀態(tài)消息,更新Counters(計(jì)數(shù)器),或者僅是表明自己運(yùn)行正常。
框架隨后會(huì)把與一個(gè)特定key關(guān)聯(lián)的所有中間過程的值(value)分成組,然后把它們傳給Reducer以產(chǎn)出最終的結(jié)果。用戶可以通過 JobConf.setOutputKeyComparatorClass(Class)來指定具體負(fù)責(zé)分組的 Comparator。
Mapper的輸出被排序后,就被劃分給每個(gè)Reducer。分塊的總數(shù)目和一個(gè)作業(yè)的reduce任務(wù)的數(shù)目是一樣的。用戶可以通過實(shí)現(xiàn)自定義的 Partitioner來控制哪個(gè)key被分配給哪個(gè) Reducer。
用戶可選擇通過 JobConf.setCombinerClass(Class)指定一個(gè)combiner,它負(fù)責(zé)對(duì)中間過程的輸出進(jìn)行本地的聚集,這會(huì)有助于降低從Mapper到 Reducer數(shù)據(jù)傳輸量。
這些被排好序的中間過程的輸出結(jié)果保存的格式是(key-len, key, value-len, value),應(yīng)用程序可以通過JobConf控制對(duì)這些中間結(jié)果是否進(jìn)行壓縮以及怎么壓縮,使用哪種 CompressionCodec。
需要多少個(gè)Map?
Map的數(shù)目通常是由輸入數(shù)據(jù)的大小決定的,一般就是所有輸入文件的總塊(block)數(shù)。
Map正常的并行規(guī)模大致是每個(gè)節(jié)點(diǎn)(node)大約10到100個(gè)map,對(duì)于CPU 消耗較小的map任務(wù)可以設(shè)到300個(gè)左右。由于每個(gè)任務(wù)初始化需要一定的時(shí)間,因此,比較合理的情況是map執(zhí)行的時(shí)間至少超過1分鐘。
這樣,如果你輸入10TB的數(shù)據(jù),每個(gè)塊(block)的大小是128MB,你將需要大約82,000個(gè)map來完成任務(wù),除非使用 setNumMapTasks(int)(注意:這里僅僅是對(duì)框架進(jìn)行了一個(gè)提示(hint),實(shí)際決定因素見這里)將這個(gè)數(shù)值設(shè)置得更高。
Reducer
Reducer將與一個(gè)key關(guān)聯(lián)的一組中間數(shù)值集歸約(reduce)為一個(gè)更小的數(shù)值集。
用戶可以通過 JobConf.setNumReduceTasks(int)設(shè)定一個(gè)作業(yè)中reduce任務(wù)的數(shù)目。
概括地說,對(duì)Reducer的實(shí)現(xiàn)者需要重寫 JobConfigurable.configure(JobConf)方法,這個(gè)方法需要傳遞一個(gè)JobConf參數(shù),目的是完成Reducer的初始化工作。然后,框架為成組的輸入數(shù)據(jù)中的每個(gè)<key, (list of values)>對(duì)調(diào)用一次 reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法。之后,應(yīng)用程序可以通過重寫Closeable.close()來執(zhí)行相應(yīng)的清理工作。
Reducer有3個(gè)主要階段:shuffle、sort和reduce。
Shuffle
Reducer的輸入就是Mapper已經(jīng)排好序的輸出。在這個(gè)階段,框架通過HTTP為每個(gè)Reducer獲得所有Mapper輸出中與之相關(guān)的分塊。
Sort
這個(gè)階段,框架將按照key的值對(duì)Reducer的輸入進(jìn)行分組 (因?yàn)椴煌琺apper的輸出中可能會(huì)有相同的key)。
Shuffle和Sort兩個(gè)階段是同時(shí)進(jìn)行的;map的輸出也是一邊被取回一邊被合并的。
Secondary Sort
如果需要中間過程對(duì)key的分組規(guī)則和reduce前對(duì)key的分組規(guī)則不同,那么可以通過 JobConf.setOutputValueGroupingComparator(Class)來指定一個(gè)Comparator。再加上 JobConf.setOutputKeyComparatorClass(Class)可用于控制中間過程的key如何被分組,所以結(jié)合兩者可以實(shí)現(xiàn)按值的二次排序。
Reduce
在這個(gè)階段,框架為已分組的輸入數(shù)據(jù)中的每個(gè) <key, (list of values)>對(duì)調(diào)用一次 reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法。
Reduce任務(wù)的輸出通常是通過調(diào)用 OutputCollector.collect(WritableComparable, Writable)寫入 文件系統(tǒng)的。
應(yīng)用程序可以使用Reporter報(bào)告進(jìn)度,設(shè)定應(yīng)用程序級(jí)別的狀態(tài)消息,更新Counters(計(jì)數(shù)器),或者僅是表明自己運(yùn)行正常。
Reducer的輸出是沒有排序的。
需要多少個(gè)Reduce?
Reduce的數(shù)目建議是0.95或1.75乘以 (<no. of nodes> * mapred.tasktracker.reduce.tasks.maximum)。
用0.95,所有reduce可以在maps一完成時(shí)就立刻啟動(dòng),開始傳輸map的輸出結(jié)果。用1.75,速度快的節(jié)點(diǎn)可以在完成第一輪reduce任務(wù)后,可以開始第二輪,這樣可以得到比較好的負(fù)載均衡的效果。
增加reduce的數(shù)目會(huì)增加整個(gè)框架的開銷,但可以改善負(fù)載均衡,降低由于執(zhí)行失敗帶來的負(fù)面影響。
上述比例因子比整體數(shù)目稍小一些是為了給框架中的推測(cè)性任務(wù)(speculative-tasks) 或失敗的任務(wù)預(yù)留一些reduce的資源。
無Reducer
如果沒有歸約要進(jìn)行,那么設(shè)置reduce任務(wù)的數(shù)目為零是合法的。
這種情況下,map任務(wù)的輸出會(huì)直接被寫入由 setOutputPath(Path)指定的輸出路徑。框架在把它們寫入FileSystem之前沒有對(duì)它們進(jìn)行排序。
Partitioner
Partitioner用于劃分鍵值空間(key space)。
Partitioner負(fù)責(zé)控制map輸出結(jié)果key的分割。Key(或者一個(gè)key子集)被用于產(chǎn)生分區(qū),通常使用的是Hash函數(shù)。分區(qū)的數(shù)目與一個(gè)作業(yè)的reduce任務(wù)的數(shù)目是一樣的。因此,它控制將中間過程的key(也就是這條記錄)應(yīng)該發(fā)送給m個(gè)reduce任務(wù)中的哪一個(gè)來進(jìn)行reduce操作。
HashPartitioner是默認(rèn)的 Partitioner。
Reporter
Reporter是用于Map/Reduce應(yīng)用程序報(bào)告進(jìn)度,設(shè)定應(yīng)用級(jí)別的狀態(tài)消息, 更新Counters(計(jì)數(shù)器)的機(jī)制。
Mapper和Reducer的實(shí)現(xiàn)可以利用Reporter 來報(bào)告進(jìn)度,或者僅是表明自己運(yùn)行正常。在那種應(yīng)用程序需要花很長(zhǎng)時(shí)間處理個(gè)別鍵值對(duì)的場(chǎng)景中,這種機(jī)制是很關(guān)鍵的,因?yàn)榭蚣芸赡軙?huì)以為這個(gè)任務(wù)超時(shí)了,從而將它強(qiáng)行殺死。另一個(gè)避免這種情況發(fā)生的方式是,將配置參數(shù)mapred.task.timeout設(shè)置為一個(gè)足夠高的值(或者干脆設(shè)置為零,則沒有超時(shí)限制了)。
應(yīng)用程序可以用Reporter來更新Counter(計(jì)數(shù)器)。
OutputCollector
OutputCollector是一個(gè)Map/Reduce框架提供的用于收集 Mapper或Reducer輸出數(shù)據(jù)的通用機(jī)制 (包括中間輸出結(jié)果和作業(yè)的輸出結(jié)果)。
Hadoop Map/Reduce框架附帶了一個(gè)包含許多實(shí)用型的mapper、reducer和partitioner 的類庫(kù)。
作業(yè)配置
JobConf代表一個(gè)Map/Reduce作業(yè)的配置。
JobConf是用戶向Hadoop框架描述一個(gè)Map/Reduce作業(yè)如何執(zhí)行的主要接口。框架會(huì)按照J(rèn)obConf描述的信息忠實(shí)地去嘗試完成這個(gè)作業(yè),然而:
一些參數(shù)可能會(huì)被管理者標(biāo)記為 final,這意味它們不能被更改。
一些作業(yè)的參數(shù)可以被直截了當(dāng)?shù)剡M(jìn)行設(shè)置(例如: setNumReduceTasks(int)),而另一些參數(shù)則與框架或者作業(yè)的其他參數(shù)之間微妙地相互影響,并且設(shè)置起來比較復(fù)雜(例如: setNumMapTasks(int))。
通常,JobConf會(huì)指明Mapper、Combiner(如果有的話)、 Partitioner、Reducer、InputFormat和 OutputFormat的具體實(shí)現(xiàn)。JobConf還能指定一組輸入文件 (setInputPaths(JobConf, Path...) /addInputPath(JobConf, Path)) 和(setInputPaths(JobConf, String) /addInputPaths(JobConf, String)) 以及輸出文件應(yīng)該寫在哪兒 (setOutputPath(Path))。
JobConf可選擇地對(duì)作業(yè)設(shè)置一些高級(jí)選項(xiàng),例如:設(shè)置Comparator; 放到DistributedCache上的文件;中間結(jié)果或者作業(yè)輸出結(jié)果是否需要壓縮以及怎么壓縮; 利用用戶提供的腳本(setMapDebugScript(String)/setReduceDebugScript(String)) 進(jìn)行調(diào)試;作業(yè)是否允許預(yù)防性(speculative)任務(wù)的執(zhí)行 (setMapSpeculativeExecution(boolean))/(setReduceSpeculativeExecution(boolean)) ;每個(gè)任務(wù)最大的嘗試次數(shù) (setMaxMapAttempts(int)/setMaxReduceAttempts(int)) ;一個(gè)作業(yè)能容忍的任務(wù)失敗的百分比 (setMaxMapTaskFailuresPercent(int)/setMaxReduceTaskFailuresPercent(int)) ;等等。
當(dāng)然,用戶能使用 set(String, String)/get(String, String) 來設(shè)置或者取得應(yīng)用程序需要的任意參數(shù)。然而,DistributedCache的使用是面向大規(guī)模只讀數(shù)據(jù)的。
任務(wù)的執(zhí)行和環(huán)境
TaskTracker是在一個(gè)單獨(dú)的jvm上以子進(jìn)程的形式執(zhí)行 Mapper/Reducer任務(wù)(Task)的。
子任務(wù)會(huì)繼承父TaskTracker的環(huán)境。用戶可以通過JobConf中的 mapred.child.java.opts配置參數(shù)來設(shè)定子jvm上的附加選項(xiàng),例如: 通過-Djava.library.path=<> 將一個(gè)非標(biāo)準(zhǔn)路徑設(shè)為運(yùn)行時(shí)的鏈接用以搜索共享庫(kù),等等。如果mapred.child.java.opts包含一個(gè)符號(hào)@taskid@, 它會(huì)被替換成map/reduce的taskid的值。
下面是一個(gè)包含多個(gè)參數(shù)和替換的例子,其中包括:記錄jvm GC日志; JVM JMX代理程序以無密碼的方式啟動(dòng),這樣它就能連接到j(luò)console上,從而可以查看子進(jìn)程的內(nèi)存和線程,得到線程的dump;還把子jvm的最大堆尺寸設(shè)置為512MB, 并為子jvm的java.library.path添加了一個(gè)附加路徑。
<property>
<name>mapred.child.java.opts</name>
<value>
-Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
-Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
</value>
</property>
用戶或管理員也可以使用mapred.child.ulimit設(shè)定運(yùn)行的子任務(wù)的最大虛擬內(nèi)存。mapred.child.ulimit的值以(KB)為單位,并且必須大于或等于-Xmx參數(shù)傳給JavaVM的值,否則VM會(huì)無法啟動(dòng)。
注意:mapred.child.java.opts只用于設(shè)置task tracker啟動(dòng)的子任務(wù)。為守護(hù)進(jìn)程設(shè)置內(nèi)存選項(xiàng)請(qǐng)查看 cluster_setup.html
${mapred.local.dir}/taskTracker/是task tracker的本地目錄, 用于創(chuàng)建本地緩存和job。它可以指定多個(gè)目錄(跨越多個(gè)磁盤),文件會(huì)半隨機(jī)的保存到本地路徑下的某個(gè)目錄。當(dāng)job啟動(dòng)時(shí),task tracker根據(jù)配置文檔創(chuàng)建本地job目錄,目錄結(jié)構(gòu)如以下所示:
${mapred.local.dir}/taskTracker/archive/ :分布式緩存。這個(gè)目錄保存本地的分布式緩存。因此本地分布式緩存是在所有task和job間共享的。
${mapred.local.dir}/taskTracker/jobcache/$jobid/ : 本地job目錄。
${mapred.local.dir}/taskTracker/jobcache/$jobid/work/: job指定的共享目錄。各個(gè)任務(wù)可以使用這個(gè)空間做為暫存空間,用于它們之間共享文件。這個(gè)目錄通過job.local.dir 參數(shù)暴露給用戶。這個(gè)路徑可以通過API JobConf.getJobLocalDir()來訪問。它也可以被做為系統(tǒng)屬性獲得。因此,用戶(比如運(yùn)行streaming)可以調(diào)用System.getProperty("job.local.dir")獲得該目錄。
${mapred.local.dir}/taskTracker/jobcache/$jobid/jars/: 存放jar包的路徑,用于存放作業(yè)的jar文件和展開的jar。job.jar是應(yīng)用程序的jar文件,它會(huì)被自動(dòng)分發(fā)到各臺(tái)機(jī)器,在task啟動(dòng)前會(huì)被自動(dòng)展開。使用api JobConf.getJar() 函數(shù)可以得到j(luò)ob.jar的位置。使用JobConf.getJar().getParent()可以訪問存放展開的jar包的目錄。
${mapred.local.dir}/taskTracker/jobcache/$jobid/job.xml: 一個(gè)job.xml文件,本地的通用的作業(yè)配置文件。
${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid: 每個(gè)任務(wù)有一個(gè)目錄task-id,它里面有如下的目錄結(jié)構(gòu):
${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/job.xml: 一個(gè)job.xml文件,本地化的任務(wù)作業(yè)配置文件。任務(wù)本地化是指為該task設(shè)定特定的屬性值。這些值會(huì)在下面具體說明。
${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/output 一個(gè)存放中間過程的輸出文件的目錄。它保存了由framwork產(chǎn)生的臨時(shí)map reduce數(shù)據(jù),比如map的輸出文件等。
${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work: task的當(dāng)前工作目錄。
${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work/tmp: task的臨時(shí)目錄。(用戶可以設(shè)定屬性mapred.child.tmp 來為map和reduce task設(shè)定臨時(shí)目錄。缺省值是./tmp。如果這個(gè)值不是絕對(duì)路徑, 它會(huì)把task的工作路徑加到該路徑前面作為task的臨時(shí)文件路徑。如果這個(gè)值是絕對(duì)路徑則直接使用這個(gè)值。 如果指定的目錄不存在,會(huì)自動(dòng)創(chuàng)建該目錄。之后,按照選項(xiàng) -Djava.io.tmpdir='臨時(shí)文件的絕對(duì)路徑'執(zhí)行java子任務(wù)。 pipes和streaming的臨時(shí)文件路徑是通過環(huán)境變量TMPDIR='the absolute path of the tmp dir'設(shè)定的)。 如果mapred.child.tmp有./tmp值,這個(gè)目錄會(huì)被創(chuàng)建。
下面的屬性是為每個(gè)task執(zhí)行時(shí)使用的本地參數(shù),它們保存在本地化的任務(wù)作業(yè)配置文件里:
名稱 類型 描述
mapred.job.id String job id
mapred.jar String job目錄下job.jar的位置
job.local.dir String job指定的共享存儲(chǔ)空間
mapred.tip.id String task id
mapred.task.id String task嘗試id
mapred.task.is.map boolean 是否是map task
mapred.task.partition int task在job中的id
map.input.file String map讀取的文件名
map.input.start long map輸入的數(shù)據(jù)塊的起始位置偏移
map.input.length long map輸入的數(shù)據(jù)塊的字節(jié)數(shù)
mapred.work.output.dir String task臨時(shí)輸出目錄
task的標(biāo)準(zhǔn)輸出和錯(cuò)誤輸出流會(huì)被讀到TaskTracker中,并且記錄到 ${HADOOP_LOG_DIR}/userlogs
DistributedCache 可用于map或reduce task中分發(fā)jar包和本地庫(kù)。子jvm總是把 當(dāng)前工作目錄 加到 java.library.path 和 LD_LIBRARY_PATH。 因此,可以通過 System.loadLibrary或 System.load裝載緩存的庫(kù)。有關(guān)使用分布式緩存加載共享庫(kù)的細(xì)節(jié)請(qǐng)參考 native_libraries.html
作業(yè)的提交與監(jiān)控
JobClient是用戶提交的作業(yè)與JobTracker交互的主要接口。
JobClient 提供提交作業(yè),追蹤進(jìn)程,訪問子任務(wù)的日志記錄,獲得Map/Reduce集群狀態(tài)信息等功能。
作業(yè)提交過程包括:
檢查作業(yè)輸入輸出樣式細(xì)節(jié)
為作業(yè)計(jì)算InputSplit值。
如果需要的話,為作業(yè)的DistributedCache建立必須的統(tǒng)計(jì)信息。
拷貝作業(yè)的jar包和配置文件到FileSystem上的Map/Reduce系統(tǒng)目錄下。
提交作業(yè)到JobTracker并且監(jiān)控它的狀態(tài)。
作業(yè)的歷史文件記錄到指定目錄的"_logs/history/"子目錄下。這個(gè)指定目錄由hadoop.job.history.user.location設(shè)定,默認(rèn)是作業(yè)輸出的目錄。因此默認(rèn)情況下,文件會(huì)存放在mapred.output.dir/_logs/history目錄下。用戶可以設(shè)置hadoop.job.history.user.location為none來停止日志記錄。
用戶使用下面的命令可以看到在指定目錄下的歷史日志記錄的摘要。
$ bin/hadoop job -history output-dir
這個(gè)命令會(huì)打印出作業(yè)的細(xì)節(jié),以及失敗的和被殺死的任務(wù)細(xì)節(jié)。
要查看有關(guān)作業(yè)的更多細(xì)節(jié)例如成功的任務(wù)、每個(gè)任務(wù)嘗試的次數(shù)(task attempt)等,可以使用下面的命令
$ bin/hadoop job -history all output-dir
用戶可以使用 OutputLogFilter 從輸出目錄列表中篩選日志文件。
一般情況,用戶利用JobConf創(chuàng)建應(yīng)用程序并配置作業(yè)屬性, 然后用 JobClient 提交作業(yè)并監(jiān)視它的進(jìn)程。
作業(yè)的控制
有時(shí)候,用一個(gè)單獨(dú)的Map/Reduce作業(yè)并不能完成一個(gè)復(fù)雜的任務(wù),用戶也許要鏈接多個(gè)Map/Reduce作業(yè)才行。這是容易實(shí)現(xiàn)的,因?yàn)樽鳂I(yè)通常輸出到分布式文件系統(tǒng)上的,所以可以把這個(gè)作業(yè)的輸出作為下一個(gè)作業(yè)的輸入實(shí)現(xiàn)串聯(lián)。
然而,這也意味著,確保每一作業(yè)完成(成功或失敗)的責(zé)任就直接落在了客戶身上。在這種情況下,可以用的控制作業(yè)的選項(xiàng)有:
runJob(JobConf):提交作業(yè),僅當(dāng)作業(yè)完成時(shí)返回。
submitJob(JobConf):只提交作業(yè),之后需要你輪詢它返回的 RunningJob句柄的狀態(tài),并根據(jù)情況調(diào)度。
JobConf.setJobEndNotificationURI(String):設(shè)置一個(gè)作業(yè)完成通知,可避免輪詢。
作業(yè)的輸入
InputFormat 為Map/Reduce作業(yè)描述輸入的細(xì)節(jié)規(guī)范。
Map/Reduce框架根據(jù)作業(yè)的InputFormat來:
檢查作業(yè)輸入的有效性。
把輸入文件切分成多個(gè)邏輯InputSplit實(shí)例, 并把每一實(shí)例分別分發(fā)給一個(gè) Mapper。
提供RecordReader的實(shí)現(xiàn),這個(gè)RecordReader從邏輯InputSplit中獲得輸入記錄, 這些記錄將由Mapper處理。
基于文件的InputFormat實(shí)現(xiàn)(通常是 FileInputFormat的子類) 默認(rèn)行為是按照輸入文件的字節(jié)大小,把輸入數(shù)據(jù)切分成邏輯分塊(logical InputSplit )。 其中輸入文件所在的FileSystem的數(shù)據(jù)塊尺寸是分塊大小的上限。下限可以設(shè)置mapred.min.split.size 的值。
考慮到邊界情況,對(duì)于很多應(yīng)用程序來說,很明顯按照文件大小進(jìn)行邏輯分割是不能滿足需求的。 在這種情況下,應(yīng)用程序需要實(shí)現(xiàn)一個(gè)RecordReader來處理記錄的邊界并為每個(gè)任務(wù)提供一個(gè)邏輯分塊的面向記錄的視圖。
TextInputFormat 是默認(rèn)的InputFormat。
如果一個(gè)作業(yè)的Inputformat是TextInputFormat, 并且框架檢測(cè)到輸入文件的后綴是.gz和.lzo,就會(huì)使用對(duì)應(yīng)的CompressionCodec自動(dòng)解壓縮這些文件。 但是需要注意,上述帶后綴的壓縮文件不會(huì)被切分,并且整個(gè)壓縮文件會(huì)分給一個(gè)mapper來處理。
InputSplit
InputSplit 是一個(gè)單獨(dú)的Mapper要處理的數(shù)據(jù)塊。
一般的InputSplit 是字節(jié)樣式輸入,然后由RecordReader處理并轉(zhuǎn)化成記錄樣式。
FileSplit 是默認(rèn)的InputSplit。 它把 map.input.file 設(shè)定為輸入文件的路徑,輸入文件是邏輯分塊文件。
RecordReader
RecordReader 從InputSlit讀入<key, value>對(duì)。
一般的,RecordReader 把由InputSplit 提供的字節(jié)樣式的輸入文件,轉(zhuǎn)化成由Mapper處理的記錄樣式的文件。 因此RecordReader負(fù)責(zé)處理記錄的邊界情況和把數(shù)據(jù)表示成keys/values對(duì)形式。
作業(yè)的輸出
OutputFormat 描述Map/Reduce作業(yè)的輸出樣式。
Map/Reduce框架根據(jù)作業(yè)的OutputFormat來:
檢驗(yàn)作業(yè)的輸出,例如檢查輸出路徑是否已經(jīng)存在。
提供一個(gè)RecordWriter的實(shí)現(xiàn),用來輸出作業(yè)結(jié)果。 輸出文件保存在FileSystem上。
TextOutputFormat是默認(rèn)的 OutputFormat。
任務(wù)的Side-Effect File
在一些應(yīng)用程序中,子任務(wù)需要產(chǎn)生一些side-file,這些文件與作業(yè)實(shí)際輸出結(jié)果的文件不同。
在這種情況下,同一個(gè)Mapper或者Reducer的兩個(gè)實(shí)例(比如預(yù)防性任務(wù))同時(shí)打開或者寫 FileSystem上的同一文件就會(huì)產(chǎn)生沖突。因此應(yīng)用程序在寫文件的時(shí)候需要為每次任務(wù)嘗試(不僅僅是每次任務(wù),每個(gè)任務(wù)可以嘗試執(zhí)行很多次)選取一個(gè)獨(dú)一無二的文件名(使用attemptid,例如task_200709221812_0001_m_000000_0)。
為了避免沖突,Map/Reduce框架為每次嘗試執(zhí)行任務(wù)都建立和維護(hù)一個(gè)特殊的 ${mapred.output.dir}/_temporary/_${taskid}子目錄,這個(gè)目錄位于本次嘗試執(zhí)行任務(wù)輸出結(jié)果所在的FileSystem上,可以通過 ${mapred.work.output.dir}來訪問這個(gè)子目錄。 對(duì)于成功完成的任務(wù)嘗試,只有${mapred.output.dir}/_temporary/_${taskid}下的文件會(huì)移動(dòng)到${mapred.output.dir}。當(dāng)然,框架會(huì)丟棄那些失敗的任務(wù)嘗試的子目錄。這種處理過程對(duì)于應(yīng)用程序來說是完全透明的。
在任務(wù)執(zhí)行期間,應(yīng)用程序在寫文件時(shí)可以利用這個(gè)特性,比如 通過 FileOutputFormat.getWorkOutputPath()獲得${mapred.work.output.dir}目錄, 并在其下創(chuàng)建任意任務(wù)執(zhí)行時(shí)所需的side-file,框架在任務(wù)嘗試成功時(shí)會(huì)馬上移動(dòng)這些文件,因此不需要在程序內(nèi)為每次任務(wù)嘗試選取一個(gè)獨(dú)一無二的名字。
注意:在每次任務(wù)嘗試執(zhí)行期間,${mapred.work.output.dir} 的值實(shí)際上是 ${mapred.output.dir}/_temporary/_{$taskid},這個(gè)值是Map/Reduce框架創(chuàng)建的。 所以使用這個(gè)特性的方法是,在 FileOutputFormat.getWorkOutputPath() 路徑下創(chuàng)建side-file即可。
對(duì)于只使用map不使用reduce的作業(yè),這個(gè)結(jié)論也成立。這種情況下,map的輸出結(jié)果直接生成到HDFS上。
RecordWriter
RecordWriter 生成<key, value> 對(duì)到輸出文件。
RecordWriter的實(shí)現(xiàn)把作業(yè)的輸出結(jié)果寫到 FileSystem。
其他有用的特性
Counters
Counters 是多個(gè)由Map/Reduce框架或者應(yīng)用程序定義的全局計(jì)數(shù)器。 每一個(gè)Counter可以是任何一種 Enum類型。同一特定Enum類型的Counter可以匯集到一個(gè)組,其類型為Counters.Group。
應(yīng)用程序可以定義任意(Enum類型)的Counters并且可以通過 map 或者 reduce方法中的 Reporter.incrCounter(Enum, long)或者 Reporter.incrCounter(String, String, long) 更新。之后框架會(huì)匯總這些全局counters。
DistributedCache
DistributedCache 可將具體應(yīng)用相關(guān)的、大尺寸的、只讀的文件有效地分布放置。
DistributedCache 是Map/Reduce框架提供的功能,能夠緩存應(yīng)用程序所需的文件 (包括文本,檔案文件,jar文件等)。
應(yīng)用程序在JobConf中通過url(hdfs://)指定需要被緩存的文件。 DistributedCache假定由hdfs://格式url指定的文件已經(jīng)在 FileSystem上了。
Map-Redcue框架在作業(yè)所有任務(wù)執(zhí)行之前會(huì)把必要的文件拷貝到slave節(jié)點(diǎn)上。 它運(yùn)行高效是因?yàn)槊總€(gè)作業(yè)的文件只拷貝一次并且為那些沒有文檔的slave節(jié)點(diǎn)緩存文檔。
DistributedCache 根據(jù)緩存文檔修改的時(shí)間戳進(jìn)行追蹤。 在作業(yè)執(zhí)行期間,當(dāng)前應(yīng)用程序或者外部程序不能修改緩存文件。
distributedCache可以分發(fā)簡(jiǎn)單的只讀數(shù)據(jù)或文本文件,也可以分發(fā)復(fù)雜類型的文件例如歸檔文件和jar文件。歸檔文件(zip,tar,tgz和tar.gz文件)在slave節(jié)點(diǎn)上會(huì)被解檔(un-archived)。 這些文件可以設(shè)置執(zhí)行權(quán)限。
用戶可以通過設(shè)置mapred.cache.{files|archives}來分發(fā)文件。 如果要分發(fā)多個(gè)文件,可以使用逗號(hào)分隔文件所在路徑。也可以利用API來設(shè)置該屬性: DistributedCache.addCacheFile(URI,conf)/ DistributedCache.addCacheArchive(URI,conf) and DistributedCache.setCacheFiles(URIs,conf)/ DistributedCache.setCacheArchives(URIs,conf) 其中URI的形式是 hdfs://host:port/absolute-path#link-name 在Streaming程序中,可以通過命令行選項(xiàng) -cacheFile/-cacheArchive 分發(fā)文件。
用戶可以通過 DistributedCache.createSymlink(Configuration)方法讓DistributedCache 在當(dāng)前工作目錄下創(chuàng)建到緩存文件的符號(hào)鏈接。 或者通過設(shè)置配置文件屬性mapred.create.symlink為yes。 分布式緩存會(huì)截取URI的片段作為鏈接的名字。 例如,URI是 hdfs://namenode:port/lib.so.1#lib.so, 則在task當(dāng)前工作目錄會(huì)有名為lib.so的鏈接, 它會(huì)鏈接分布式緩存中的lib.so.1。
DistributedCache可在map/reduce任務(wù)中作為 一種基礎(chǔ)軟件分發(fā)機(jī)制使用。它可以被用于分發(fā)jar包和本地庫(kù)(native libraries)。 DistributedCache.addArchiveToClassPath(Path, Configuration)和 DistributedCache.addFileToClassPath(Path, Configuration) API能夠被用于 緩存文件和jar包,并把它們加入子jvm的classpath。也可以通過設(shè)置配置文檔里的屬性 mapred.job.classpath.{files|archives}達(dá)到相同的效果。緩存文件可用于分發(fā)和裝載本地庫(kù)。
Tool
Tool 接口支持處理常用的Hadoop命令行選項(xiàng)。
Tool 是Map/Reduce工具或應(yīng)用的標(biāo)準(zhǔn)。應(yīng)用程序應(yīng)只處理其定制參數(shù), 要把標(biāo)準(zhǔn)命令行選項(xiàng)通過 ToolRunner.run(Tool, String[]) 委托給 GenericOptionsParser處理。
Hadoop命令行的常用選項(xiàng)有:
-conf <configuration file>
-D <property=value>
-fs <local|namenode:port>
-jt <local|jobtracker:port>
IsolationRunner
IsolationRunner 是幫助調(diào)試Map/Reduce程序的工具。
使用IsolationRunner的方法是,首先設(shè)置 keep.failed.tasks.files屬性為true (同時(shí)參考keep.tasks.files.pattern)。
然后,登錄到任務(wù)運(yùn)行失敗的節(jié)點(diǎn)上,進(jìn)入 TaskTracker的本地路徑運(yùn)行 IsolationRunner:
$ cd <local path>/taskTracker/${taskid}/work
$ bin/hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml
IsolationRunner會(huì)把失敗的任務(wù)放在單獨(dú)的一個(gè)能夠調(diào)試的jvm上運(yùn)行,并且采用和之前完全一樣的輸入數(shù)據(jù)。
Profiling
Profiling是一個(gè)工具,它使用內(nèi)置的java profiler工具進(jìn)行分析獲得(2-3個(gè))map或reduce樣例運(yùn)行分析報(bào)告。
用戶可以通過設(shè)置屬性mapred.task.profile指定系統(tǒng)是否采集profiler信息。 利用api JobConf.setProfileEnabled(boolean)可以修改屬性值。如果設(shè)為true, 則開啟profiling功能。profiler信息保存在用戶日志目錄下。缺省情況,profiling功能是關(guān)閉的。
如果用戶設(shè)定使用profiling功能,可以使用配置文檔里的屬性 mapred.task.profile.{maps|reduces} 設(shè)置要profile map/reduce task的范圍。設(shè)置該屬性值的api是 JobConf.setProfileTaskRange(boolean,String)。 范圍的缺省值是0-2。
用戶可以通過設(shè)定配置文檔里的屬性mapred.task.profile.params 來指定profiler配置參數(shù)。修改屬性要使用api JobConf.setProfileParams(String)。當(dāng)運(yùn)行task時(shí),如果字符串包含%s。 它會(huì)被替換成profileing的輸出文件名。這些參數(shù)會(huì)在命令行里傳遞到子JVM中。缺省的profiling 參數(shù)是 -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s。
調(diào)試
Map/Reduce框架能夠運(yùn)行用戶提供的用于調(diào)試的腳本程序。 當(dāng)map/reduce任務(wù)失敗時(shí),用戶可以通過運(yùn)行腳本在任務(wù)日志(例如任務(wù)的標(biāo)準(zhǔn)輸出、標(biāo)準(zhǔn)錯(cuò)誤、系統(tǒng)日志以及作業(yè)配置文件)上做后續(xù)處理工作。用戶提供的調(diào)試腳本程序的標(biāo)準(zhǔn)輸出和標(biāo)準(zhǔn)錯(cuò)誤會(huì)輸出為診斷文件。如果需要的話這些輸出結(jié)果也可以打印在用戶界面上。
在接下來的章節(jié),我們討論如何與作業(yè)一起提交調(diào)試腳本。為了提交調(diào)試腳本, 首先要把這個(gè)腳本分發(fā)出去,而且還要在配置文件里設(shè)置。
如何分發(fā)腳本文件:
用戶要用 DistributedCache 機(jī)制來分發(fā)和鏈接腳本文件
如何提交腳本:
一個(gè)快速提交調(diào)試腳本的方法是分別為需要調(diào)試的map任務(wù)和reduce任務(wù)設(shè)置 "mapred.map.task.debug.script" 和 "mapred.reduce.task.debug.script" 屬性的值。這些屬性也可以通過 JobConf.setMapDebugScript(String) 和 JobConf.setReduceDebugScript(String) API來設(shè)置。對(duì)于streaming, 可以分別為需要調(diào)試的map任務(wù)和reduce任務(wù)使用命令行選項(xiàng)-mapdebug 和 -reducedegug來提交調(diào)試腳本。
腳本的參數(shù)是任務(wù)的標(biāo)準(zhǔn)輸出、標(biāo)準(zhǔn)錯(cuò)誤、系統(tǒng)日志以及作業(yè)配置文件。在運(yùn)行map/reduce失敗的節(jié)點(diǎn)上運(yùn)行調(diào)試命令是:
$script $stdout $stderr $syslog $jobconf
Pipes 程序根據(jù)第五個(gè)參數(shù)獲得c++程序名。 因此調(diào)試pipes程序的命令是
$script $stdout $stderr $syslog $jobconf $program
默認(rèn)行為
對(duì)于pipes,默認(rèn)的腳本會(huì)用gdb處理core dump, 打印 stack trace并且給出正在運(yùn)行線程的信息。
JobControl
JobControl是一個(gè)工具,它封裝了一組Map/Reduce作業(yè)以及他們之間的依賴關(guān)系。
數(shù)據(jù)壓縮
Hadoop Map/Reduce框架為應(yīng)用程序的寫入文件操作提供壓縮工具,這些工具可以為map輸出的中間數(shù)據(jù)和作業(yè)最終輸出數(shù)據(jù)(例如reduce的輸出)提供支持。它還附帶了一些 CompressionCodec的實(shí)現(xiàn),比如實(shí)現(xiàn)了 zlib和lzo壓縮算法。 Hadoop同樣支持gzip文件格式。
考慮到性能問題(zlib)以及Java類庫(kù)的缺失(lzo)等因素,Hadoop也為上述壓縮解壓算法提供本地庫(kù)的實(shí)現(xiàn)。更多的細(xì)節(jié)請(qǐng)參考 這里。
中間輸出
應(yīng)用程序可以通過 JobConf.setCompressMapOutput(boolean)api控制map輸出的中間結(jié)果,并且可以通過 JobConf.setMapOutputCompressorClass(Class)api指定 CompressionCodec。
作業(yè)輸出
應(yīng)用程序可以通過 FileOutputFormat.setCompressOutput(JobConf, boolean) api控制輸出是否需要壓縮并且可以使用 FileOutputFormat.setOutputCompressorClass(JobConf, Class)api指定CompressionCodec。
如果作業(yè)輸出要保存成 SequenceFileOutputFormat格式,需要使用 SequenceFileOutputFormat.setOutputCompressionType(JobConf, SequenceFile.CompressionType)api,來設(shè)定 SequenceFile.CompressionType (i.e.RECORD / BLOCK - 默認(rèn)是RECORD)。
例子:WordCount v2.0
這里是一個(gè)更全面的WordCount例子,它使用了我們已經(jīng)討論過的很多Map/Reduce框架提供的功能。
運(yùn)行這個(gè)例子需要HDFS的某些功能,特別是 DistributedCache相關(guān)功能。因此這個(gè)例子只能運(yùn)行在 偽分布式 或者 完全分布式模式的 Hadoop上。
源代碼
WordCount.java
1. package org.myorg;
2.
3. import java.io.*;
4. import java.util.*;
5.
6. import org.apache.hadoop.fs.Path;
7. import org.apache.hadoop.filecache.DistributedCache;
8. import org.apache.hadoop.conf.*;
9. import org.apache.hadoop.io.*;
10. import org.apache.hadoop.mapred.*;
11. import org.apache.hadoop.util.*;
12.
13. public class WordCount extends Configured implements Tool {
14.
15. public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
16.
17. static enum Counters { INPUT_WORDS }
18.
19. private final static IntWritable one = new IntWritable(1);
20. private Text word = new Text();
21.
22. private boolean caseSensitive = true;
23. private Set<String> patternsToSkip = new HashSet<String>();
24.
25. private long numRecords = 0;
26. private String inputFile;
27.
28. public void configure(JobConf job) {
29. caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
30. inputFile = job.get("map.input.file");
31.
32. if (job.getBoolean("wordcount.skip.patterns", false)) {
33. Path[] patternsFiles = new Path[0];
34. try {
35. patternsFiles = DistributedCache.getLocalCacheFiles(job);
36. } catch (IOException ioe) {
37. System.err.println("Caught exception while getting cached files: " + StringUtils.stringifyException(ioe));
38. }
39. for (Path patternsFile : patternsFiles) {
40. parseSkipFile(patternsFile);
41. }
42. }
43. }
44.
45. private void parseSkipFile(Path patternsFile) {
46. try {
47. BufferedReader fis = new BufferedReader(new FileReader(patternsFile.toString()));
48. String pattern = null;
49. while ((pattern = fis.readLine()) != null) {
50. patternsToSkip.add(pattern);
51. }
52. } catch (IOException ioe) {
53. System.err.println("Caught exception while parsing the cached file '" + patternsFile + "' : " + StringUtils.stringifyException(ioe));
54. }
55. }
56.
57. public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
58. String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();
59.
60. for (String pattern : patternsToSkip) {
61. line = line.replaceAll(pattern, "");
62. }
63.
64. StringTokenizer tokenizer = new StringTokenizer(line);
65. while (tokenizer.hasMoreTokens()) {
66. word.set(tokenizer.nextToken());
67. output.collect(word, one);
68. reporter.incrCounter(Counters.INPUT_WORDS, 1);
69. }
70.
71. if ((++numRecords % 100) == 0) {
72. reporter.setStatus("Finished processing " + numRecords + " records " + "from the input file: " + inputFile);
73. }
74. }
75. }
76.
77. public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
78. public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
79. int sum = 0;
80. while (values.hasNext()) {
81. sum += values.next().get();
82. }
83. output.collect(key, new IntWritable(sum));
84. }
85. }
86.
87. public int run(String[] args) throws Exception {
88. JobConf conf = new JobConf(getConf(), WordCount.class);
89. conf.setJobName("wordcount");
90.
91. conf.setOutputKeyClass(Text.class);
92. conf.setOutputValueClass(IntWritable.class);
93.
94. conf.setMapperClass(Map.class);
95. conf.setCombinerClass(Reduce.class);
96. conf.setReducerClass(Reduce.class);
97.
98. conf.setInputFormat(TextInputFormat.class);
99. conf.setOutputFormat(TextOutputFormat.class);
100.
101. List<String> other_args = new ArrayList<String>();
102. for (int i=0; i < args.length; ++i) {
103. if ("-skip".equals(args[i])) {
104. DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);
105. conf.setBoolean("wordcount.skip.patterns", true);
106. } else {
107. other_args.add(args[i]);
108. }
109. }
110.
111. FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));
112. FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
113.
114. JobClient.runJob(conf);
115. return 0;
116. }
117.
118. public static void main(String[] args) throws Exception {
119. int res = ToolRunner.run(new Configuration(), new WordCount(), args);
120. System.exit(res);
121. }
122. }
123.
運(yùn)行樣例
輸入樣例:
$ bin/hadoop dfs -ls /usr/joe/wordcount/input/
/usr/joe/wordcount/input/file01
/usr/joe/wordcount/input/file02
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
Hello World, Bye World!
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
Hello Hadoop, Goodbye to hadoop.
運(yùn)行程序:
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output
輸出:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop, 1
Hello 2
World! 1
World, 1
hadoop. 1
to 1
注意此時(shí)的輸入與第一個(gè)版本的不同,輸出的結(jié)果也有不同。
現(xiàn)在通過DistributedCache插入一個(gè)模式文件,文件中保存了要被忽略的單詞模式。
$ hadoop dfs -cat /user/joe/wordcount/patterns.txt
\.
\,
\!
to
再運(yùn)行一次,這次使用更多的選項(xiàng):
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=true /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
應(yīng)該得到這樣的輸出:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 1
Hello 2
World 2
hadoop 1
再運(yùn)行一次,這一次關(guān)閉大小寫敏感性(case-sensitivity):
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=false /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
輸出:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
bye 1
goodbye 1
hadoop 2
hello 2
world 2
程序要點(diǎn)
通過使用一些Map/Reduce框架提供的功能,WordCount的第二個(gè)版本在原始版本基礎(chǔ)上有了如下的改進(jìn):
展示了應(yīng)用程序如何在Mapper (和Reducer)中通過configure方法 修改配置參數(shù)(28-43行)。
展示了作業(yè)如何使用DistributedCache 來分發(fā)只讀數(shù)據(jù)。 這里允許用戶指定單詞的模式,在計(jì)數(shù)時(shí)忽略那些符合模式的單詞(104行)。
展示Tool接口和GenericOptionsParser處理Hadoop命令行選項(xiàng)的功能 (87-116, 119行)。
展示了應(yīng)用程序如何使用Counters(68行),如何通過傳遞給map(和reduce) 方法的Reporter實(shí)例來設(shè)置應(yīng)用程序的狀態(tài)信息(72行)。
Java和JNI是Sun Microsystems, Inc.在美國(guó)和其它國(guó)家的注冊(cè)商標(biāo)。
本文來自CSDN博客,轉(zhuǎn)載請(qǐng)標(biāo)明出處:http://blog.csdn.net/superxgl/archive/2010/01/11/5171929.aspx