`

一个经典的MapReduce模板代码,倒排索引(ReverseIndex)

阅读更多
问题描述:统计某个号码被哪些号码呼叫了

输入文件如下:
13588888888 112
13678987879 13509098987
18987655436 110
2543789    112
15699807656 110
011-678987 112
说明:每一行为一条电话通话记录,左边的号码(记为a)打给右边的号码(记为b号码),中间用空格隔开

要求:
将以上文件以如下格式输出:
110 18987655436|15699807656
112 13588888888|011-678987
13509098987 13678987879
说明:左边为被呼叫的号码b,右边为呼叫b的号码a以"|"分割


解决思想很简单:Map中将a,b号码分割key为b,value为a写入context
Reduce中将values以"|"迭代分割

不多说(十二点了....),下面代码可在装有mapreduce插件的Eclipse中执行,也可先编译class文件,在打成jar包运行(代码中有提示,不过这对于初学者有些困难)不会的同学可以在下面评论,或加我QQ:1106373297,备注:hadoop学习

很经典的一段代码,哈哈
控制台输出结果:
........
/07/09 08:42:04 INFO mapred.JobClient:     Combine input records=0
14/07/09 08:42:04 INFO mapred.JobClient:     Reduce input records=5
14/07/09 08:42:04 INFO mapred.JobClient:     Reduce input groups=3
14/07/09 08:42:04 INFO mapred.JobClient:     Combine output records=0
14/07/09 08:42:04 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
14/07/09 08:42:04 INFO mapred.JobClient:     Reduce output records=5
14/07/09 08:42:04 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
14/07/09 08:42:04 INFO mapred.JobClient:     Map output records=5
任务名称:ReverseIndex
任务成功:是
输入行数:6
输出行数:5
跳过的行:1
任务开始:2014-07-09 08:41:55
任务结束:2014-07-09 08:42:04
任务耗时:0.15313333 分钟


import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class ReverseIndex extends Configured implements Tool {


enum Counter {
LINESKIP, // 出错的行
}


public static class Map extends Mapper<LongWritable,Text,Text,Text> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString(); // 读取源数据
try{
// 数据处理
String[] lineSplit = line.split(" ");
String anum = lineSplit[0];
String bnum = lineSplit[1];
context.write(new Text(bnum), new Text(anum)); // 输出

} catch (java.lang.ArrayIndexOutOfBoundsException e) {
context.getCounter(Counter.LINESKIP).increment(1); // 出错hang计数器+1
return;
}
}
}


public static class Reduce extends Reducer {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String valueString;
String out = "";
for (Text value : values) {
valueString = value.toString();
out += valueString + "|";
System.out.println("Ruduce:key="+key+"  value="+value);
}
context.write(key, new Text(out));
}
}


@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();


Job job = new Job(conf, "ReverseIndex"); // 任务名
job.setJarByClass(ReverseIndex.class); // 指定Class

FileInputFormat.addInputPath(job, new Path(args[0])); // 输入路径
FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出路径

job.setMapperClass(Map.class); // 调用上面Map类作为Map任务代码
job.setReducerClass(ReverseIndex.Reduce.class); // 调用上面Reduce类作为Reduce任务代码

job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class); // 指定输出的KEY的格式
job.setOutputValueClass(Text.class); // 指定输出的VALUE的格式

job.waitForCompletion(true);

// 输出任务完成情况
System.out.println("任务名称:" + job.getJobName());
System.out.println("任务成功:" + (job.isSuccessful() ? "是" : "否"));
System.out.println("输入行数:"
+ job.getCounters()
.findCounter("org.apache.hadoop.mapred.Task$Counter",
"MAP_INPUT_RECORDS").getValue());
System.out.println("输出行数:"
+ job.getCounters()
.findCounter("org.apache.hadoop.mapred.Task$Counter",
"MAP_OUTPUT_RECORDS").getValue());
System.out.println("跳过的行:"
+ job.getCounters().findCounter(Counter.LINESKIP).getValue());


return job.isSuccessful() ? 0 : 1;
}


public static void main(String[] args) throws Exception {
// 判断参数个数是否正确
// 如果无参数运行则显示以作程序说明
if (args.length != 2) {
System.err.println("");
System.err.println("Usage: ReverseIndex < input path > < output path > ");
System.err
.println("Example: hadoop jar ~/ReverseIndex.jar hdfs://localhost:9000/in/telephone.txt hdfs://localhost:9000/out");


System.exit(-1);
}
// 记录开始时间
DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date start = new Date();
// 运行任务
int res = ToolRunner.run(new Configuration(), new ReverseIndex(), args);


// 输出任务耗时
Date end = new Date();
float time = (float) ((end.getTime() - start.getTime()) / 60000.0);
System.out.println("任务开始:" + formatter.format(start));
System.out.println("任务结束:" + formatter.format(end));
System.out.println("任务耗时:" + String.valueOf(time) + " 分钟");


System.exit(res);
}
}
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics