Hadoop MapReduce Programming

0x00 Preface

MapReduce is a software framework proposed by Google for parallel processing of large data sets (larger than 1 TB). The concepts "Map" and "Reduce", and the main ideas behind them, are borrowed from functional programming languages, together with features taken from vector programming languages.
There is plenty of material online about how it works, so I will not go into the details here.
I am only a beginner myself; this post just shows a few introductory programming examples.
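
To make the two steps concrete, here is a purely illustrative word-count flow (the sample line and counts are invented for the illustration):

input line :  "hello world hello"
map        :  (hello, 1) (world, 1) (hello, 1)
shuffle    :  hello -> [1, 1]   world -> [1]
reduce     :  (hello, 2) (world, 1)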

0x01 Code

The examples use the Java API, developed in IDEA and submitted to the cluster remotely.
The code implements several simple map and reduce examples: word count, character-class statistics, per-character counts, and finding the longest word.

package hadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Map_reduce {
    // Word-frequency count: emit (word, 1) for every token in the line
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
    // Count lowercase letters, uppercase letters, digits, and everything else
    public static class ZimuMapper extends Mapper<Object, Text, Text, IntWritable> {

        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
            String str = value.toString();
            for (int i = 0; i < str.length(); i++) {
                char ch = str.charAt(i);
                if (ch >= 'a' && ch <= 'z') {
                    context.write(new Text("Lower"), new IntWritable(1));
                } else if (ch >= 'A' && ch <= 'Z') {
                    context.write(new Text("Upper"), new IntWritable(1));
                } else if (ch >= '0' && ch <= '9') {
                    context.write(new Text("Number"), new IntWritable(1));
                } else {
                    context.write(new Text("Else"), new IntWritable(1));
                }
            }
        }
    }
    // Count the occurrences of each individual character
    public static class geshuMapper extends Mapper<Object, Text, Text, IntWritable> {

        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
            String str = value.toString();
            for (int i = 0; i < str.length(); i++) {
                char ch = str.charAt(i);
                context.write(new Text(String.valueOf(ch)), new IntWritable(1));
            }
        }
    }

    // Longest word: emit every word under the same key so a single reducer sees them all
    public static class MaxworldMapper extends Mapper<Object, Text, IntWritable, Text> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(one, word);
            }
        }
    }
    // Longest-word reducer: keep the longest word seen and emit it with its length
    public static class MaxworldReducer
            extends Reducer<IntWritable, Text, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(IntWritable key, Iterable<Text> values,
                           Context context
                           ) throws IOException, InterruptedException {
            int num = 0;
            Text k = new Text();
            for (Text val : values) {
                if (num < val.getLength()) {
                    num = val.getLength();
                    // Hadoop reuses the same Text instance while iterating,
                    // so copy the value instead of keeping a reference to it
                    k = new Text(val);
                }
            }
            result.set(num);
            context.write(k, result);
        }
    }
    // Word-frequency reducer: sum all the 1s emitted for a key
    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }


    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String inPutPath = "hdfs://192.168.111.151:9000/user/input/";
        String outPutPath = "hdfs://192.168.111.151:9000/user/output";

        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(Map_reduce.class);
        // Pick the mapper/reducer pair for the example you want to run
        //job.setMapperClass(TokenizerMapper.class);
        //job.setMapperClass(ZimuMapper.class);
        //job.setCombinerClass(IntSumReducer.class);
        //job.setReducerClass(IntSumReducer.class);
        job.setMapperClass(geshuMapper.class);
        job.setReducerClass(IntSumReducer.class);
        // Set the map output types explicitly when they differ from the job output types
        //job.setMapOutputKeyClass(IntWritable.class);
        //job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(inPutPath));
        FileOutputFormat.setOutputPath(job, new Path(outPutPath));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
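
The main method above only wires up the per-character count (geshuMapper plus IntSumReducer). To run the longest-word example instead, the map output types (IntWritable key, Text value) no longer match the job output types, so they must be declared explicitly. A minimal sketch of the changed wiring, with everything else in main left as is:

// Longest-word job wiring (sketch; replaces the mapper/reducer/type lines above)
job.setMapperClass(MaxworldMapper.class);
job.setReducerClass(MaxworldReducer.class);
// the mapper emits (IntWritable, Text) while the reducer emits (Text, IntWritable)
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

Also note that FileOutputFormat refuses to start a job whose output directory already exists, so delete the previous output (for example with hdfs dfs -rm -r /user/output) before resubmitting.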
