• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            面對(duì)現(xiàn)實(shí),超越自己
            逆水行舟,不進(jìn)則退
            posts - 269,comments - 32,trackbacks - 0
            1、NameNode啟動(dòng)加載元數(shù)據(jù)情景分析
            • NameNode函數(shù)里調(diào)用FSNamesystemm讀取dfs.namenode.name.dir和dfs.namenode.edits.dir構(gòu)建FSDirectory。
            • FSImage類recoverTransitionRead和saveNameSpace分別實(shí)現(xiàn)了元數(shù)據(jù)的檢查、加載、內(nèi)存合并和元數(shù)據(jù)的持久化存儲(chǔ)。
            • saveNameSpace將元數(shù)據(jù)寫入到磁盤,具體操作步驟:首先將current目錄重命名為lastcheckpoint.tmp;然后在創(chuàng)建新的current目錄,并保存文件;最后將lastcheckpoint.tmp重命名為privios.checkpoint.
            • checkPoint的過程:Secondary NameNode會(huì)通知nameNode產(chǎn)生一個(gè)edit log文件edits.new,之后所有的日志操作寫入到edits.new文件中。接下來Secondary NameNode會(huì)從namenode下載fsimage和edits文件,進(jìn)行合并產(chǎn)生新的fsimage.ckpt;然后Secondary會(huì)將fsimage.ckpt文件上傳到namenode。最后namenode會(huì)重命名fsimage.ckpt為fsimage,edtis.new為edits;
            2、元數(shù)據(jù)更新及日志寫入情景分析
            以mkdir為例:
            logSync代碼分析:
            代碼:
            1. public void logSync () throws IOException {  
            2. ArrayList<EditLogOutputStream > errorStreams = null ;  
            3. long syncStart = 0;  
            4.   
            5. // Fetch the transactionId of this thread.  
            6. long mytxid = myTransactionId .get (). txid;  
            7. EditLogOutputStream streams[] = null;  
            8. boolean sync = false;  
            9. try {  
            10. synchronized (this) {  
            11. assert editStreams. size() > 0 : "no editlog streams" ;  
            12. printStatistics (false);  
            13. // if somebody is already syncing, then wait  
            14. while (mytxid > synctxid && isSyncRunning) {  
            15. try {  
            16. wait (1000 );  
            17. catch (InterruptedException ie ) {  
            18. }  
            19. }  
            20. //  
            21. // If this transaction was already flushed, then nothing to do  
            22. //  
            23. if (mytxid <= synctxid ) {  
            24. numTransactionsBatchedInSync ++;  
            25. if (metrics != null// Metrics is non-null only when used inside name node  
            26. metrics .transactionsBatchedInSync .inc ();  
            27. return;  
            28. }  
            29. // now, this thread will do the sync  
            30. syncStart = txid ;  
            31. isSyncRunning = true;  
            32. sync = true;  
            33. // swap buffers  
            34. for( EditLogOutputStream eStream : editStreams ) {  
            35. eStream .setReadyToFlush ();  
            36. }  
            37. streams =  
            38. editStreams .toArray (new EditLogOutputStream[editStreams. size()]) ;  
            39. }  
            40. // do the sync  
            41. long start = FSNamesystem.now();  
            42. for (int idx = 0; idx < streams. length; idx++ ) {  
            43. EditLogOutputStream eStream = streams [idx ];  
            44. try {  
            45. eStream .flush ();  
            46. catch (IOException ie ) {  
            47. FSNamesystem .LOG .error ("Unable to sync edit log." , ie );  
            48. //  
            49. // remember the streams that encountered an error.  
            50. //  
            51. if (errorStreams == null) {  
            52. errorStreams = new ArrayList <EditLogOutputStream >( 1) ;  
            53. }  
            54. errorStreams .add (eStream );  
            55. }  
            56. }  
            57. long elapsed = FSNamesystem.now() - start ;  
            58. processIOError (errorStreams , true);  
            59. if (metrics != null// Metrics non-null only when used inside name node  
            60. metrics .syncs .inc (elapsed );  
            61. finally {  
            62. synchronized (this) {  
            63. synctxid = syncStart ;  
            64. if (sync ) {  
            65. isSyncRunning = false;  
            66. }  
            67. this.notifyAll ();  
            68. }  
            69. }  
            70. }  

            3、Backup Node 的checkpoint的過程分析:
            1. /** 
            2. * Create a new checkpoint 
            3. */  
            4. void doCheckpoint() throws IOException {  
            5. long startTime = FSNamesystem.now ();  
            6. NamenodeCommand cmd =  
            7. getNamenode().startCheckpoint( backupNode. getRegistration());  
            8. CheckpointCommand cpCmd = null;  
            9. switch( cmd. getAction()) {  
            10. case NamenodeProtocol .ACT_SHUTDOWN :  
            11. shutdown() ;  
            12. throw new IOException ("Name-node " + backupNode .nnRpcAddress  
            13. " requested shutdown.");  
            14. case NamenodeProtocol .ACT_CHECKPOINT :  
            15. cpCmd = (CheckpointCommand )cmd ;  
            16. break;  
            17. default:  
            18. throw new IOException ("Unsupported NamenodeCommand: "+cmd.getAction()) ;  
            19. }  
            20.   
            21. CheckpointSignature sig = cpCmd. getSignature();  
            22. assert FSConstants.LAYOUT_VERSION == sig .getLayoutVersion () :  
            23. "Signature should have current layout version. Expected: "  
            24. + FSConstants.LAYOUT_VERSION + " actual " + sig. getLayoutVersion();  
            25. assert !backupNode .isRole (NamenodeRole .CHECKPOINT ) ||  
            26. cpCmd. isImageObsolete() : "checkpoint node should always download image.";  
            27. backupNode. setCheckpointState(CheckpointStates .UPLOAD_START );  
            28. if( cpCmd. isImageObsolete()) {  
            29. // First reset storage on disk and memory state  
            30. backupNode. resetNamespace();  
            31. downloadCheckpoint(sig);  
            32. }  
            33.   
            34. BackupStorage bnImage = getFSImage() ;  
            35. bnImage. loadCheckpoint(sig);  
            36. sig.validateStorageInfo( bnImage) ;  
            37. bnImage. saveCheckpoint();  
            38.   
            39. if( cpCmd. needToReturnImage())  
            40. uploadCheckpoint(sig);  
            41.   
            42. getNamenode() .endCheckpoint (backupNode .getRegistration (), sig );  
            43.   
            44. bnImage. convergeJournalSpool();  
            45. backupNode. setRegistration(); // keep registration up to date  
            46. if( backupNode. isRole( NamenodeRole.CHECKPOINT ))  
            47. getFSImage() .getEditLog (). close() ;  
            48. LOG. info( "Checkpoint completed in "  
            49. + (FSNamesystem .now() - startTime )/ 1000 + " seconds."  
            50. " New Image Size: " + bnImage .getFsImageName (). length()) ;  
            51. }  
            52. }  

            4、元數(shù)據(jù)可靠性機(jī)制。
            • 配置多個(gè)備份路徑。NameNode在更新日志或進(jìn)行Checkpoint的過程,會(huì)將元數(shù)據(jù)放在多個(gè)目錄下。
            • 對(duì)于沒一個(gè)需要保存的元數(shù)據(jù)文件,都創(chuàng)建一個(gè)輸出流,對(duì)訪問過程中出現(xiàn)的異常輸出流進(jìn)行處理,將其移除。并再合適的時(shí)機(jī)再次檢查移除的數(shù)據(jù)量是否恢復(fù)正常。有效的保證了備份輸出流的異常問題。
            • 采用了多種機(jī)制來保證元數(shù)據(jù)的可靠性。例如在checkpoint的過程中,分為幾個(gè)階段,通過不同的文件名來標(biāo)識(shí)當(dāng)前所處的狀態(tài)。為存儲(chǔ)失敗后進(jìn)行恢復(fù)提供了可能。
            5、元數(shù)據(jù)的一致性機(jī)制。
            • 首先從NameNode啟動(dòng)時(shí),對(duì)每個(gè)備份目錄是否格式化、目錄元數(shù)據(jù)文件名是否正確等進(jìn)行檢查,確保元數(shù)據(jù)文件間的狀態(tài)一致性,然后選取最新的加載到內(nèi)存,這樣可以確保HDFS當(dāng)前狀態(tài)和最后一次關(guān)閉時(shí)的狀態(tài)一致性。
            • 其次,通過異常輸出流的處理,可以確保正常輸出流數(shù)據(jù)的一致性。
            • 運(yùn)用同步機(jī)制,確保了輸出流一致性問題。

              本文轉(zhuǎn)自:
              http://blog.csdn.net/kntao/article/details/7770597
            posted on 2013-05-24 15:29 王海光 閱讀(479) 評(píng)論(0)  編輯 收藏 引用 所屬分類: Linux
            久久这里只精品99re66| 久久无码人妻一区二区三区| 99久久亚洲综合精品成人| 青青青国产精品国产精品久久久久| 久久九九精品99国产精品| 国产精品久久久天天影视香蕉 | 久久精品无码免费不卡| 亚洲精品无码久久久| 久久久噜噜噜久久熟女AA片| 亚洲综合精品香蕉久久网97| 中文字幕久久久久人妻| 亚洲国产精品久久久久婷婷老年| 亚洲国产综合久久天堂 | 久久精品国产精品青草app| 88久久精品无码一区二区毛片 | 91亚洲国产成人久久精品网址| 久久精品中文字幕第23页| 九九精品久久久久久噜噜| 久久青青草原国产精品免费| 一级A毛片免费观看久久精品| 亚洲国产精品久久久久久| 欧美午夜精品久久久久久浪潮| 亚洲国产精品综合久久一线| 亚洲国产成人久久精品99| 国产Av激情久久无码天堂| 伊人久久大香线蕉av不卡| 久久香蕉国产线看观看猫咪?v| 午夜不卡888久久| 精品久久久久久亚洲| 色妞色综合久久夜夜| 亚洲国产精品无码久久| 亚洲精品国产自在久久| 久久久久18| 久久精品国产99久久无毒不卡| 久久免费精品一区二区| 久久久久女人精品毛片| 亚洲色婷婷综合久久| 亚洲日韩中文无码久久| 五月丁香综合激情六月久久| 亚洲v国产v天堂a无码久久| 婷婷久久综合九色综合98|