`
刘小小尘
  • 浏览: 62687 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

sequencefile处理小文件实例

 
阅读更多

今天没事,写了下sequencefile处理小文件实例

废话不说,直接上代码


WholeFileRecordReader:

package com.pzoom.mr.sequence;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Record reader that presents an entire file as a single record.
 *
 * <p>The key is always {@link NullWritable}; the value is a {@link BytesWritable}
 * holding every byte of the file described by the {@link FileSplit}. Exactly one
 * record is produced per split.
 */
public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
	private FileSplit fileSplit ;
	private Configuration conf ;
	/** Set to true once the single record for this split has been read. */
	private boolean processed = false ;
	private BytesWritable value = new BytesWritable() ;

	/**
	 * Nothing to release here: the input stream is opened and closed inside
	 * {@link #nextKeyValue()}.
	 */
	@Override
	public void close() throws IOException {
	}

	/** Returns the (always empty) key of the current record. */
	@Override
	public NullWritable getCurrentKey() throws IOException,
			InterruptedException {
		return NullWritable.get();
	}

	/** Returns the value buffer; it holds the file contents after a successful {@link #nextKeyValue()}. */
	@Override
	public BytesWritable getCurrentValue() throws IOException,
			InterruptedException {
		return value;
	}

	/** Returns 1.0 once the file has been read, 0.0 before that. */
	@Override
	public float getProgress() throws IOException, InterruptedException {
		return processed ? 1.0f : 0.0f;
	}

	/**
	 * Remembers the split and the job configuration for later use.
	 *
	 * @param inputsplit the split to read; must be a {@link FileSplit}
	 * @param taskattemptcontext supplies the configuration used to resolve the file system
	 */
	@Override
	public void initialize(InputSplit inputsplit,
			TaskAttemptContext taskattemptcontext) throws IOException,
			InterruptedException {
		this.fileSplit = (FileSplit) inputsplit ;
		this.conf = taskattemptcontext.getConfiguration();
	}

	/**
	 * Reads the whole file into the value buffer on the first call.
	 *
	 * @return {@code true} on the first call (a record is available),
	 *         {@code false} on every later call
	 * @throws IOException if the file cannot be opened or fully read, or if it is
	 *         too large to fit in a single byte array
	 */
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		if (processed) {
			return false;
		}

		long length = fileSplit.getLength();
		// A Java array cannot hold more than Integer.MAX_VALUE elements; fail clearly
		// instead of letting the narrowing cast produce a negative or truncated size.
		if (length > Integer.MAX_VALUE) {
			throw new IOException("File too large to read as a single record ("
					+ length + " bytes): " + fileSplit.getPath());
		}

		byte[] contents = new byte[(int) length];
		Path file = fileSplit.getPath();
		FileSystem fs = file.getFileSystem(conf);
		FSDataInputStream in = null;
		try {
			in = fs.open(file);
			IOUtils.readFully(in, contents, 0, contents.length);
			value.set(contents, 0, contents.length);
		} finally {
			// Always release the stream, even when open/read fails.
			IOUtils.closeStream(in);
		}
		processed = true;
		return true;
	}
}


WholeFileInputFormat:只需要重写(override)createRecordReader()和isSplitable()这两个方法即可,代码如下:

package com.pzoom.mr.sequence;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * Input format that hands each input file to a mapper as one unsplit record,
 * using {@link WholeFileRecordReader} to load the file contents.
 */
public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

	/**
	 * Files are never split, so each file is processed as a whole.
	 *
	 * @return always {@code false}
	 */
	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		return false;
	}

	/**
	 * Builds a {@link WholeFileRecordReader} for the given split and initializes
	 * it with the split and task context before returning it.
	 *
	 * @param inputsplit the split the reader will consume
	 * @param taskattemptcontext the context passed on to the reader's initialization
	 * @return the initialized reader
	 */
	@Override
	public RecordReader<NullWritable, BytesWritable> createRecordReader(
			InputSplit inputsplit, TaskAttemptContext taskattemptcontext)
			throws IOException, InterruptedException {
		RecordReader<NullWritable, BytesWritable> wholeFileReader = new WholeFileRecordReader();
		wholeFileReader.initialize(inputsplit, taskattemptcontext);
		return wholeFileReader;
	}
}

测试类:SmallFilesToSequenceFileConverter

package com.pzoom.mr.sequence;

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

/**
 * Driver that packs many small files into SequenceFile output.
 *
 * <p>Each input file is read as one record through {@link WholeFileInputFormat};
 * the mapper emits the file's path as the key and its bytes as the value.
 */
public class SmallFilesToSequenceFileConverter {
	/** Input directory used when no command-line argument is given. */
	private static final String DEFAULT_INPUT_PATH = "D:/keywordzip-zip/testNull";
	/** Output path prefix used when no output argument is given; a random suffix is appended. */
	private static final String DEFAULT_OUTPUT_PREFIX = "D:/mapreduce-out/1AA";

	/**
	 * Mapper that tags each whole-file record with the path of the file it came from.
	 */
	static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
		private Text filenameKey;

		/**
		 * Captures the path of the split's file once per task, so that map()
		 * can reuse the same key object.
		 */
		@Override
		protected void setup(Context context) throws IOException,
		InterruptedException {
			InputSplit split = context.getInputSplit();
			Path path = ((FileSplit)split).getPath();
			filenameKey = new Text(path.toString());
		}

		/** Emits (file path, file contents) for the incoming record. */
		@Override
		protected void map(NullWritable key, BytesWritable value,
				Context context) throws IOException, InterruptedException {
			context.write(filenameKey, value);
		}

	}

	/**
	 * Configures and runs the job, then exits with 0 on success and 1 on failure.
	 *
	 * @param args optional: {@code args[0]} is the input path and {@code args[1]}
	 *        the output path; when absent, the built-in defaults are used
	 *        (the default output path gets a random numeric suffix)
	 */
	public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
		String inputPath = (args != null && args.length > 0)
				? args[0]
				: DEFAULT_INPUT_PATH;
		String outputPath = (args != null && args.length > 1)
				? args[1]
				: DEFAULT_OUTPUT_PREFIX + new Random().nextInt(100);

		Configuration conf = new Configuration() ;
		Job job = new Job(conf,"fdaf");

		FileInputFormat.setInputPaths(job, new Path(inputPath)) ;
		FileOutputFormat.setOutputPath(job, new Path(outputPath)) ;
		job.setJarByClass(SmallFilesToSequenceFileConverter.class);
		job.setInputFormatClass(WholeFileInputFormat.class);
		job.setOutputFormatClass(SequenceFileOutputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(BytesWritable.class);
		job.setMapperClass(SequenceFileMapper.class);

		// Propagate job failure to the caller instead of always exiting with 0.
		boolean succeeded = job.waitForCompletion(true);
		System.exit(succeeded ? 0 : 1);
	}
}







以上代码可以正确运行,输出为 SequenceFile 格式的二进制文件(key 为文件路径,value 为该文件的全部内容)。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics