轉自:http://my.oschina.net/tuzibuluo/blog?catalog=127826
1.Writable接口 Hadoop 并沒有使用 JAVA 的序列化,而是引入了自己實的序列化系統, package org.apache.hadoop.io 這個包中定義了大量的可序列化對象,這些對象都實現了 Writable 接口, Writable 接口是序列化對象的一個通用接口.我們來看下Writable 接口的定義。
public interface Writable{
void write(DataOutput out) throws IOException;
void readFields(DataInput in) throws IOException;
}
Writable接口抽象了兩個序列化的方法Write和ReadFields,分別對應了序列化和反序列化,參數DataOutPut 為java.io包內的IO類,Writable接口只是對象序列化的一個簡單聲明。
2.WriteCompareable接口 WriteCompareable接口是Wirtable接口的二次封裝,提供了compareTo(T o)方法,用于序列化對象的比較的比較。因為mapreduce中間有個基于key的排序階段。
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
下面是io包簡單的類圖關系。
3.
RawComparator接口 hadoop為序列化提供了優化,類型的比較對M/R而言至關重要,Key和Key的比較也是在排序階段完成的,hadoop提供了原生的比較器接口RawComparator<T>用于序列化字節間的比較,該接口允許其實現直接比較數據流中的記錄,無需反序列化為對象,RawComparator是一個原生的優化接口類,它只是簡單的提供了用于數據流中簡單的數據對比方法,從而提供優化:public interface RawComparator<T> extends Comparator<T> {
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
該接口并非被多數的衍生類所實現,其直接的子類為WritableComparator,多數情況下是作為實現Writable接口的類的內置類,提供序列化字節的比較。下面是RawComparator接口內置類的實現類圖:
首先,我們看 RawComparator的直接實現類WritableComparator:
WritableComparator類似于一個注冊表,里面通過靜態map記錄了所有WritableComparator類的集合。Comparators成員用一張Hash表記錄Key=Class,value=WritableComprator的注冊信息.
WritableComparator主要提供了兩個功能
1. 提供了對原始compare()方法的一個默認實現
默認實現是 先反序列化為對像 再通過 對像比較(有開銷的問題),所以一般都會被具體writeCompatable類的Comparator類覆蓋以加快效率。
public
int
compare(byte
[] b1, int
s1, int
l1, byte
[] b2, int
s2, int
l2) {
try
{
buffer.reset(b1, s1, l1); // parse key1
key1.readFields(buffer);
buffer.reset(b2, s2, l2); // parse key2
key2.readFields(buffer);
} catch
(IOException e) {
throw
new
RuntimeException(e);
}
return
compare(key1, key2); // compare them
}2. 充當RawComparable實例的工廠,以注冊Writable的實現
例如,為了獲取IntWritable的Comparator,可以直接調用其get方法。
4.WritableComparator類
接下來撿關鍵代碼來分析writableComparator類,該類是RawComparator接口的直接子類。
代碼1:registry 注冊器
// registry 注冊器:記載了WritableComparator類的集合
private static HashMap<Class, WritableComparator>comparators = new HashMap<Class, WritableComparator>();
----------------------------------------------------------------
代碼2:獲取WritableComparator實例
說明:hashMap作為容器類線程不安全,故需要synchronized同步,get方法根據key=Class返回對應的WritableComparator,若返回的是空值NUll,則調用protected Constructor進行構造,而其兩個protected的構造函數實則是調用了newKey()方法進行NewInstance
public static synchronized WritableComparator get(Class<? extends WritableComparable> c) {
WritableComparator comparator = comparators.get(c);
if (comparator == null)
comparator = new WritableComparator(c, true);
return comparator;
}
----------------------------------------------------------------
代碼3:WritableComparator構造方法
new WritableComparator(c, true)
WritableComparator的構造函數源碼如下:
/*
* keyClass,key1,key2和buffer都是用于WritableComparator的構造函數
*/
private final Class<? extends WritableComparable> keyClass;
private final WritableComparable key1; //WritableComparable接口
private final WritableComparable key2;
private final DataInputBuffer buffer; //輸入緩沖流
protected WritableComparator(Class<? extends WritableComparable> keyClass,boolean createInstances) {
this.keyClass = keyClass;
if (createInstances) {
key1 = newKey();
key2 = newKey();
buffer = new DataInputBuffer();
} else {
key1 = key2 = null;
buffer = null;
}
}
上述的keyClass,key1,key2,buffer是記錄HashMap對應的key值,用于WritableComparator的構造函數,但由其構造函數中我們可以看出WritableComparator根據Boolean createInstance來判斷是否實例化key1,key2和buffer,而key1,key2是用于接收比較的兩個key。在WritableComparator的構造函數里面通過newKey()的方法去實例化實現WritableComparable接口的一個對象,下面是newKey()的源碼,通過hadoop自身的反射去實例化了一個WritableComparable接口對象。
public WritableComparable newKey() {
return ReflectionUtils.newInstance(keyClass, null);
}
----------------------------------------------------------------
代碼4:Compare()方法
(1). public int compare(Object a, Object b);
(2). public int compare(WritableComparable a, WritableComparable b);
(3). public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
三個compare()重載方法中,compare(Object a, Object b)利用子類塑形為WritableComparable而調用了第2個compare方法,而第2個Compare()方法則調用了Writable.compaerTo();最后一個compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)方法源碼如下:public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
buffer.reset(b1, s1, l1); // parse key1
key1.readFields(buffer);
buffer.reset(b2, s2, l2); // parse key2
key2.readFields(buffer);
} catch (IOException e) {
throw new RuntimeException(e);
}
return compare(key1, key2); // compare them
}
Compare方法的一個缺省實現方式,根據接口key1,ke2反序列化為對象再進行比較。
利用Buffer為橋接中介,把字節數組存儲為buffer后,調用key1(WritableComparable)的反序列化方法,再來比較key1,ke2,由此處可以看出,該compare方法是將要比較的二進制流反序列化為對象,再調用方法第2個重載方法進行比較。
----------------------------------------------------------------代碼5:方法define方法
該方法用于注冊WritebaleComparaor對象到注冊表中,注意同時該方法也需要同步,代碼如下:
public static synchronized void define(Class c, WritableComparator comparator) {
comparators.put(c, comparator);
}
----------------------------------------------------------------代碼6:余下諸如readInt的靜態方法 這些方法用于實現WritableComparable的各種實例,例如 IntWritable實例:內部類Comparator類需要根據自己的IntWritable類型重載WritableComparator里面的compare()方法,可以說WritableComparator里面的compare()方法只是提供了一個缺省的實現,而真正的compare()方法實現需要根據自己的類型如IntWritable進行重載,所以WritableComparator方法中的那些readInt..等方法只是底層的封裝的一個實現,方便內部Comparator進行調用而已。
下面我們著重看下BooleanWritable類的內置RawCompartor<T>的實現過程:
public static class Comparator extends WritableComparator {
public Comparator() {//調用父類的Constructor初始化keyClass=BooleanWrite.class
super(BooleanWritable.class);
}
//重寫父類的序列化比較方法,用些類用到父類提供的缺省方法
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
boolean a = (readInt(b1, s1) == 1) ? true : false;
boolean b = (readInt(b2, s2) == 1) ? true : false;
return ((a == b) ? 0 : (a == false) ? -1 : 1);
}
}
//注冊
static {
WritableComparator.define(BooleanWritable.class, new Comparator());
}
總結:
hadoop 類似于Java的類包,即提供了Comparable接口(對應于writableComparable接口)和Comparator類(對應于RawComparator類)用于實現序列化的比較,在hadoop 的IO包中已經封裝了JAVA的基本數據類型用于序列化和反序列化,一般自己寫的類實現序列化和反序列化需要繼承WritableComparable接口并且內置一個Comparator(繼承于WritableComparator)的格式來實現自己的對象。
5.WritableFactory接口 作為工廠模式的WritableFactory,其抽象為一個接口,提供了具體的Writable對象創建實例的抽象方法newInstance(),代碼如下: public interface WritableFactory {
/** Return a new instance. */
Writable newInstance();
}
WritableFactories類類似于WritableComparator類利用HashMap注冊記錄著所有實現上述接口的WritableFactory的集合,與之不同的是WritableFactories是一個單例模式,所有的方法都是靜態的。關鍵代碼://提供了一個key=class,value=WritableFactory的注冊表
private static final HashMap<Class, WritableFactory> CLASS_TO_FACTORY = new HashMap<Class, WritableFactory>();
public static Writable newInstance(Class<? extends Writable> c, Configuration conf) {
WritableFactory factory = WritableFactories.getFactory(c);
if (factory != null) {
//該方法的newInstanceof是調用了factory.newInstance()即你了實現的WritableFactory的newInstance()方法
Writable result = factory.newInstance();
if (result instanceof Configurable) {
((Configurable) result).setConf(conf);
}
return result;
} else {
return ReflectionUtils.newInstance(c, conf);
}
}
6.InputBuffer和DataInputBuffer類
類似于JAVA.IO 的裝飾器模式, InputBuffer輸入緩沖和DataInputBuffer數據緩沖的實現封裝于內部類Buffer,該類的功能只是提供一個空的緩沖區,用于存儲數據。Buffer代碼如下:
private static class Buffer extends ByteArrayInputStream {
public Buffer() {
super(new byte[] {});
}
public void reset(byte[] input, int start, int length) {
this.buf = input;
this.count = start+length;
this.mark = start;
this.pos = start;
}
public int getPosition() { return pos; }
public int getLength() { return count; }
}
InputBuffer和DataInputBuffer的方法委托于內部類private Buffer buffer,例如InputBuffer部分代碼:
/** Returns the current position in the input. */
public int getPosition() { return buffer.getPosition(); }
/** Returns the length of the input. */
public int getLength() { return buffer.getLength(); }
DataInputBuffer 內置的Buffer代碼如下
private static class Buffer extends ByteArrayInputStream {
public Buffer() {
super(new byte[] {});
}
public void reset(byte[] input, int start, int length) {
this.buf = input;
this.count = start+length;
this.mark = start;
this.pos = start;
}
public byte[] getData() { return buf; }
public int getPosition() { return pos; }
public int getLength() { return count; }
}
兩個類封裝的Buffer一樣,而其方法也都委托依賴于buffer,只是InputBuffer和DataInputBuffer繼承于不同的類,如下:
DataInputBuffer:
public class DataInputBuffer extends DataInputStream {
}
InputBuffer:
public class InputBuffer extends FilterInputStream {
}
7.OutputBuffer和DataOutputBuffe
類似于上文的InputBuffer和DataInputBuffer,hadoop 的OutputBuffer和DataOutputBuffer的實現與之相似,同樣是利用內部類的引用,而關鍵的代碼在于內部類Buffer:
private static class Buffer extends ByteArrayOutputStream {
public byte[] getData() { return buf; }
public int getLength() { return count; }
public void reset() { count = 0; }
public void write(InputStream in, int len) throws IOException {
int newcount = count + len;
if (newcount > buf.length) {
byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
System.arraycopy(buf, 0, newbuf, 0, count);
buf = newbuf;
}
IOUtils.readFully(in, buf, count, len);
count = newcount;
}
}
先是判斷buf數組的length,倘若空間不足,則new newbuf[] 利用Sysytem的數組拷貝實現內容的復制。