`
刘小小尘
  • 浏览: 62467 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Hadoop MapReduce中如何处理跨行Block和UnputSplit

 
阅读更多

Hadoop的初学者经常会疑惑这样两个问题:1.Hadoop的一个Block默认是64M,那么对于一个记录行形式的文本,会不会造成一行记录被分到两个Block当中?2.在把文件从Block中读取出来进行切分时,会不会造成一行记录被分成两个InputSplit,如果被分成两个InputSplit,这样一个InputSplit里面就有一行不完整的数据,那么处理这个InputSplit的Mapper会不会得出不正确的结果?

对于上面的两个问题,首先要明确两个概念:Block和InputSplit

1. block是hdfs存储文件的单位(默认是64M);
2.InputSplit是MapReduce对文件进行处理和运算的输入单位,只是一个逻辑概念,每个InputSplit并没有对文件实际的切割,只是记录了要处理的数据的位置(包括文件的path和hosts)和长度(由start和length决定)。

因此,以行记录形式的文本,还真可能存在一行记录被划分到不同的Block,甚至不同的DataNode上去。通过分析FileInputFormat里面的getSplits方法,可以得出,某一行记录同样也可能被划分到不同的InputSplit。

Java代码收藏代码
  1. publicList<InputSplit>getSplits(JobContextjob)throwsIOException{
  2. longminSize=Math.max(getFormatMinSplitSize(),getMinSplitSize(job));
  3. longmaxSize=getMaxSplitSize(job);
  4. //generatesplits
  5. List<InputSplit>splits=newArrayList<InputSplit>();
  6. List<FileStatus>files=listStatus(job);
  7. for(FileStatusfile:files){
  8. Pathpath=file.getPath();
  9. longlength=file.getLen();
  10. if(length!=0){
  11. FileSystemfs=path.getFileSystem(job.getConfiguration());
  12. BlockLocation[]blkLocations=fs.getFileBlockLocations(file,0,length);
  13. if(isSplitable(job,path)){
  14. longblockSize=file.getBlockSize();
  15. longsplitSize=computeSplitSize(blockSize,minSize,maxSize);
  16. longbytesRemaining=length;
  17. while(((double)bytesRemaining)/splitSize>SPLIT_SLOP){
  18. intblkIndex=getBlockIndex(blkLocations,length-bytesRemaining);
  19. splits.add(makeSplit(path,length-bytesRemaining,splitSize,
  20. blkLocations[blkIndex].getHosts()));
  21. bytesRemaining-=splitSize;
  22. }
  23. if(bytesRemaining!=0){
  24. splits.add(makeSplit(path,length-bytesRemaining,bytesRemaining,
  25. blkLocations[blkLocations.length-1].getHosts()));
  26. }
  27. }else{//notsplitable
  28. splits.add(makeSplit(path,0,length,blkLocations[0].getHosts()));
  29. }
  30. }else{
  31. //Createemptyhostsarrayforzerolengthfiles
  32. splits.add(makeSplit(path,0,length,newString[0]));
  33. }
  34. }
  35. //Savethenumberofinputfilesformetrics/loadgen
  36. job.getConfiguration().setLong(NUM_INPUT_FILES,files.size());
  37. LOG.debug("Total#ofsplits:"+splits.size());
  38. returnsplits;
  39. }

从上面的代码可以看出,对文件进行切分其实很简单:获取文件在HDFS上的路径和Block信息,然后根据splitSize

对文件进行切分,splitSize = computeSplitSize(blockSize, minSize, maxSize);blockSize,minSize,maxSize都可以配置,默认splitSize 就等于blockSize的默认值(64m)。

FileInputFormat对文件的切分是严格按照偏移量来的,因此一行记录比较长的话,其可能被切分到不同的InputSplit。但这并不会对Map造成影响,尽管一行记录可能被拆分到不同的InputSplit,但是与FileInputFormat关联的RecordReader被设计的足够健壮,当一行记录跨InputSplit时,其能够到读取不同的InputSplit,直到把这一行记录读取完成,在Hadoop里,记录行形式的文本,通常采用默认的TextInputFormat,TextInputFormat关联的是LineRecordReader,下面我们来看看LineRecordReader的的nextKeyValue方法里读取文件的代码:

Java代码收藏代码
  1. while(getFilePosition()<=end){
  2. newSize=in.readLine(value,maxLineLength,
  3. Math.max(maxBytesToConsume(pos),maxLineLength));
  4. if(newSize==0){
  5. break;
  6. }

其读取文件是通过LineReader(in就是一个LineReader实例)的readLine方法完成的:

Java代码收藏代码
  1. publicintreadLine(Textstr,intmaxLineLength,
  2. intmaxBytesToConsume)throwsIOException{
  3. if(this.recordDelimiterBytes!=null){
  4. returnreadCustomLine(str,maxLineLength,maxBytesToConsume);
  5. }else{
  6. returnreadDefaultLine(str,maxLineLength,maxBytesToConsume);
  7. }
  8. }
  9. /**
  10. *ReadalineterminatedbyoneofCR,LF,orCRLF.
  11. */
  12. privateintreadDefaultLine(Textstr,intmaxLineLength,intmaxBytesToConsume)
  13. throwsIOException{
  14. str.clear();
  15. inttxtLength=0;//tracksstr.getLength(),asanoptimization
  16. intnewlineLength=0;//lengthofterminatingnewline
  17. booleanprevCharCR=false;//trueofprevcharwasCR
  18. longbytesConsumed=0;
  19. do{
  20. intstartPosn=bufferPosn;//startingfromwhereweleftoffthelasttime
  21. if(bufferPosn>=bufferLength){
  22. startPosn=bufferPosn=0;
  23. if(prevCharCR)
  24. ++bytesConsumed;//accountforCRfrompreviousread
  25. bufferLength=in.read(buffer);
  26. if(bufferLength<=0)
  27. break;//EOF
  28. }
  29. for(;bufferPosn<bufferLength;++bufferPosn){//searchfornewline
  30. if(buffer[bufferPosn]==LF){
  31. newlineLength=(prevCharCR)?2:1;
  32. ++bufferPosn;//atnextinvocationproceedfromfollowingbyte
  33. break;
  34. }
  35. if(prevCharCR){//CR+notLF,weareatnotLF
  36. newlineLength=1;
  37. break;
  38. }
  39. prevCharCR=(buffer[bufferPosn]==CR);
  40. }
  41. intreadLength=bufferPosn-startPosn;
  42. if(prevCharCR&&newlineLength==0)
  43. --readLength;//CRattheendofthebuffer
  44. bytesConsumed+=readLength;
  45. intappendLength=readLength-newlineLength;
  46. if(appendLength>maxLineLength-txtLength){
  47. appendLength=maxLineLength-txtLength;
  48. }
  49. if(appendLength>0){
  50. str.append(buffer,startPosn,appendLength);
  51. txtLength+=appendLength;
  52. }
  53. }while(newlineLength==0&&bytesConsumed<maxBytesToConsume);<spanstyle="color:#ff0000;">//①</span>
  54. if(bytesConsumed>(long)Integer.MAX_VALUE)
  55. thrownewIOException("Toomanybytesbeforenewline:"+bytesConsumed);
  56. return(int)bytesConsumed;
  57. }

我们分析下readDefaultLine方法,do-while循环体主要是读取文件,然后遍历读取的内容,找到默认的换行符就终止循环。前面说,对于跨InputSplit的行,LineRecordReader会自动跨InputSplit去读取。这就体现在上述代码的While循环的终止条件上:

while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

newlineLength==0则以为一次do-while循环中读取的内容中没有遇到换行符,因maxBytesToConsume的默认值为Integer.MAX_VALUE,所以如果读取的内容没有遇到换行符,则会一直读取下去,知道读取的内容超过maxBytesToConsume。这样的出来方式,解决了一行记录跨InputSplit的读取问题,同样也会造成下面两个疑问:

1.既然在LineReader读取方法里面没有对考虑InputSplit的end进行处理,难道读取一个InputSplit的时候,会这样无限的读取下去么?

2.如果一行记录L跨越了A,B两个InputSplit,读A的时候已经读取了跨越A,B的这条记录L,那么对B这个InputSplit读取的时候,如果做到不读取L这条记录在B中的部分呢?

为了解决这两个问题,Hadoop通过下面的代码来做到:LineRecordReader的nextKeyValue方法。

Java代码收藏代码
  1. publicbooleannextKeyValue()throwsIOException{
  2. if(key==null){
  3. key=newLongWritable();
  4. }
  5. key.set(pos);
  6. if(value==null){
  7. value=newText();
  8. }
  9. intnewSize=0;
  10. //Wealwaysreadoneextraline,whichliesoutsidetheupper
  11. //splitlimiti.e.(end-1)
  12. while(getFilePosition()<=end){<spanstyle="color:#ff0000;">//②</span>
  13. newSize=in.readLine(value,maxLineLength,
  14. Math.max(maxBytesToConsume(pos),maxLineLength));
  15. if(newSize==0){
  16. break;
  17. }
  18. pos+=newSize;
  19. inputByteCounter.increment(newSize);
  20. if(newSize<maxLineLength){
  21. break;
  22. }
  23. //linetoolong.tryagain
  24. LOG.info("Skippedlineofsize"+newSize+"atpos"+
  25. (pos-newSize));
  26. }
  27. if(newSize==0){
  28. key=null;
  29. value=null;
  30. returnfalse;
  31. }else{
  32. returntrue;
  33. }
  34. }

通过代码②处得While条件,就保证了InputSplit读取边界的问题,如果存在跨InputSplit的记录,也只好跨InputSplit读取一次。

再来看LineRecordReader的initialize方法:

Java代码收藏代码
  1. //Ifthisisnotthefirstsplit,wealwaysthrowawayfirstrecord
  2. //becausewealways(exceptthelastsplit)readoneextralinein
  3. //next()method.
  4. if(start!=0){
  5. start+=in.readLine(newText(),0,maxBytesToConsume(start));
  6. }
  7. this.pos=start;

如果不是第一InputSplit,则在读取的时候,LineRecordReader会自动忽略掉第一个换行符之前的所有内容,这样就不存在重读读取的问题。

此次,前面提到的两个问题就回到完了。。。。。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics