MapReduce 初识+案例(词频统计)
1. 1.1 是什么
:是 中的一个分布式计算框架,基于 写出的应用程序能够运行在大型集群上,并以一种可靠容错的方式并行处理上 T 级别的数据集。
一个 作业(Job)通常会把输入的数据切分为若干个独立的数据块,由 Map 任务(Task)以完全并行的方式处理它们。框架会对 Map 的输出先进行排序,然后把结果输入给 任务,通常作业的输入和输出都会被存储在文件系统中。
整个框架负责任务的调度和监控,以及重新执行已经失败的任务。
1.2 优点/缺点 1.2.1 优点 1.2.2 缺点
不擅长做实际计算、流式计算、DAG(有向图)计算。
的任务表达能力有限,一个 只能完成一次映射和聚合,像 DAG 任务就需要多次聚合,那就需要将任务拆成多个 ,每个 任务都需要大量的磁盘 IO,将导致性能低下。
1.3 运行阶段
第一阶段: Map Task 并发实例,完全并行运行,不互相干
第二阶段: Task 并发实例,获取上一阶段的输出作为本阶段的输入
1.4 进程
:负责整个程序的过程调度及状态调度
:负责 Map 阶段的整个数据处理流程
:负责 阶段的整个数据处理流程
2. Java 词频统计
在理解 之前,不如先用 Java 实现一个词频统计的实例。
public static void main(String[] args) throws IOException {// 1. 创建容器存储结果HashMap<String, Integer> map = new HashMap<>();// 2. 读取文件File file = new File("...");String encoding = "utf8";List<String> lines = FileUtils.readLines(file, encoding);// 3. 遍历每一行for (String line : lines) {// 4. 切分出每个单词String[] words = line.split("\\s+");for (String word : words) {// 5. 替换掉特殊字符String w = word.toLowerCase().replace("\\W", "");// 6. 每出现一个单词进行数量 + 1if (!w.isEmpty()){map.put(w, map.getOrDefault(w,0) + 1 );}}}// 7. 将统计结果进行排序ArrayList<Map.Entry<String, Integer>> entries = new ArrayList<>(map.entrySet());entries.sort(new Comparator<Map.Entry<String, Integer>>() {@Overridepublic int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {return o2.getValue() - o1.getValue();}});for (Map.Entry<String, Integer> entry : entries) {System.out.printf("单词:%s \t出现的个数为 %d\n", entry.getKey(), entry.getValue());}}
3. 编程规范
利用 实现词频统计之前还需要了解 的编程规范。
通常我们编写一个 程序,会将其分解为三个部分:、、。
**: **
自定义一个类,并继承 ,定义输入输出键值对的泛型实现父类的 map() 方法(进程),定义键值对的参数类型及上下文对象 编写 map 的具体实现,最后通过 对象将映射结果写入 框架
**: **
自定义一个类,并继承 ,定义输入输出键值对的泛型实现父类的 () 方法(进程),定义键值对的参数类型及上下文对象 编写 的具体实现,最后通过 对象将聚合结果写入 框架
:
这是一个包含 main 方法的 任务的入口实例化 Job 对象,可选择性的添加各种配置将 Job 任务提交到集群 4. 词频统计 4.1
我们需要继承 类,来自于:org....
4.1.1 窥见源码 4.1.1.1 类
类提供了四个泛型,分别是输入参数的键值对类型、输出参数的键值对类型
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {...}
⚠️ 注意:这里我们不适用 Java 提供的类型,而是使用 实现的序列化类型,公共接口为
如果读文本,通常默认的 KEYIN 类型为(可表示当前行文本的位置、字节偏移量), 的类型为 Text(表示当前行文本内容),输出的 类型为 Text(表示输出值的键),输出的 的类型为 (可表示为数量)。
4.1.1.2 方法
// 在任务开始时调用一次,通常用作创建连接、打开流等获取资源的操作
protected void setup(Context context) throws IOException, InterruptedException {// NOTHING
}
// 对输入拆分中的每个键/值对调用一次,通常需要重写该方法,这是一个默认的核心方法
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {// map 处理完后将数据写出去context.write((KEYOUT) key, (VALUEOUT) value);
}
// 在任务结束时调用一次,用作关闭资源等
protected void cleanup(Context context) throws IOException, InterruptedException {// NOTHING
}
// 该方法相当于整合了上面三个方法
public void run(Context context) throws IOException, InterruptedException {setup(context);try {while (context.nextKeyValue()) {map(context.getCurrentKey(), context.getCurrentValue(), context);}} finally {cleanup(context);}
}
4.1.2
词频统计 如下:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class Job_WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {// 输出Text k = new Text();IntWritable v = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 将读取的文本每行的数据,分隔成单词String[] words = value.toString().split("\\s+");// 对单词进行处理for (String word : words) {// 转小写 去除特殊字符String w = word.toLowerCase().replace("\\W", "");// 将单词作为输出的 keyk.set(w);// 使用上下文对象 将 mapper 处理的结果以 的方式写到 MapReduce 框架 context.write(k, v);}}
}
4.2 4.2.1 窥见源码 4.2.1.1 类
类提供了四个泛型,分别是输入参数的键值对类型、输出参数的键值对类型
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {...}
负责接收 输出的内容,所以 KEYIN、 就对应 的输出键值对的类型, 使用 Text, 使用 (防止结果量级太大)
4.2.1.2 方法
protected void setup(...){...}
protected void cleanup(...){...}
public void run(...){...}
// 这个方法对每个键调用一次,通常需要重写该方法,这是一个默认的核心方法
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {for(VALUEIN value: values) {context.write((KEYOUT) key, (VALUEOUT) value);}
}
4.2.2
词频统计 如下:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class Job_WordCountReducer extends Reducer<Text, IntWritable, Text, LongWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {// 声明变量 用于存储聚合完的结果long count = 0;// 遍历相同的 key 获取对应的所有 valuefor (IntWritable value : values) {count += value.get();}// 将聚合完的结果写到 MapReduce 框架context.write(key, new LongWritable(count));}
}
4.3
我们需要实例化 Job 对象,并配置相关类(相当于整合了 、)。
配置成类,而不是配置为具体的对象,是因为方便后期通过反射获取多个实例
4.3.1 相关方法
编写 不需要继承某个类,但需要注意需要使用的几个方法:
1️⃣ :
通过 Job.() 获取 Job 实例,该方法实则传了一个配置信息,再向下延申,可以发现先实例化了 类,再实例化 Job
public static Job getInstance() throws IOException {// create with a null Clusterreturn getInstance(new Configuration());
}
public static Job getInstance(Configuration conf) throws IOException {// create with a null ClusterJobConf jobConf = new JobConf(conf);return new Job(jobConf);
}
2️⃣ :
意为等待事务完成,其中参数表示是否打印程序的运行过程
public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException,ClassNotFoundException {if (state == JobState.DEFINE) {submit();}if (verbose) {monitorAndPrintJob();} else {// get the completion poll interval from the client.int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf());while (!isComplete()) {try {Thread.sleep(completionPollIntervalMillis);} catch (InterruptedException ie) {}}}return isSuccessful();
}
4.3.2
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class Job_WordCountDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 0. 自定义配置对象Configuration conf = new Configuration();// 1. 创建 Job 对象,参数可取消Job job = Job.getInstance(conf);// 2. 给 Job 对象添加 Mapper 类的 Classjob.setMapperClass(Job_WordCountMapper.class);// 3. 给 Job 对象添加 Reduce 类的 Classjob.setReducerClass(Job_WordCountReducer.class);// 4. 给 Job 对象添加 Driver 类的 Classjob.setJarByClass(Job_WordCountDriver.class);// 5. 设置 Mapper 输出的数据的 key 类型job.setMapOutputKeyClass(Text.class);// 6. 设置 Mapper 输出的数据的 value 类型job.setMapOutputValueClass(IntWritable.class);// 7. 设置 Reduce 输出的数据的 key 类型job.setOutputKeyClass(Text.class);// 8. 设置 Reduce 输出的数据的 value 类型job.setOutputValueClass(LongWritable.class);// 9. 设置 MapReduce 任务的输入路径FileInputFormat.setInputPaths(job, new Path("..."));// 10.设置 MapReduce 任务的输出路径FileOutputFormat.setOutputPath(job, new Path("..."));// 11.提交任务boolean b = job.waitForCompletion(true);System.exit( b ? 0 : 1 );}
}
⚠️ 注意:本地测试可以直接写输入输出路径,但集群上不能写死,所以需要以参数的形式让用户输入,注意 main 方法的参数是个数组类型,所以可将其修改为:
// 9. 设置 MapReduce 任务的输入路径FileInputFormat.setInputPaths(job, new Path(args[0]));// 10.设置 MapReduce 任务的输出路径FileOutputFormat.setOutputPath(job, new Path(args[1]));
4.4 运行测试
注:文本处理的不太干净,先这样了…
5. 集群测试
写完最主要的还是要放到集群上测试。
测试环境:
在开发时使用较小的样例数据, 程序是被提交给 在本地以单进程的形式循行,这种方式称为 local 测试
集群测试:
本地运行测试逻辑正确后,将程序提交至 Yarn 集群,分发到很多节点上并发执行,处理的数据和输出的结果存放于 HDFS 系统
1️⃣ Step1:通过 IDEA 将程序打包为 jar 包
2️⃣ Step2:(可省略此步骤)使用解压缩工具打开该 jar 包(不要解压),在 -1.0-.jar\META-INF 目录下修改 .MF 文件,添加主类( 路径),并保存
Main-Class: WordCount_API.Job_WordCountDriver
3️⃣ Step3:将该 jar 包上传至服务器,开启 HDFS,提前上传一个待统计的英文词频文件
4️⃣ Step4:启动该程序(若运行后找不到类,可添加 main-class 参数以配置启动类,确保文件输出路径为空)
hadoop jar xxx.jar fileIn fileOut
5️⃣ Step5:运行测试,到这算成功了
6️⃣ Step6:查看输出,如下,与 IDEA 测试的输出一致
6. 运行流程
1️⃣ Step1: 程序读取文件的输入目录上存放的相应文件
2️⃣ Step2:获取待处理的数据信息,根据集群中的配置形成一个任务分配规划
3️⃣ Step3:客户端提交 job.split、jar.xml 等文件给 yarn, yarn 中的 启动
4️⃣ Step4: 启动后根据本次 Job 的描述信息,计算存储需要的 实例数量,然后向集群申请节点,启动相应数量的 进程
5️⃣ Step5: 利用客户指定的 来读取数据,形成输入的键值对
6️⃣ Step6: 将输入的键值对传递给客户定义的 map 方法,做逻辑运算
7️⃣ Step7:map 方法运算完后将键值对收集到 缓存
8️⃣ Step8: 缓存中的键值对按照 K 分区排序后不断写到磁盘文件
9️⃣ Step9: 监控到所有 进程完成后,会根据客户指定的参数启动相应数量的 进程,并通知 进程要处理的数据分区
1️⃣0️⃣ : 进程启动后,根据 告知的待处理数据的位置,从若干台 运行所在节点上获取到若干个 输出结果文件,并在本地进行重新归并排序,然后按照 相同键的键值对为一组,调用客户定义的 方法进行逻辑运算
1️⃣1️⃣ : 运算完成后,调用客户指定的 将结果数据输出到外部存储
7. 写在最后
梳理下 词频统计的流程
:
程序启动后将 传递的文本转换为 根据空格切分出单词,并处理大小写、特殊符号将单词输出为
汇总相同 key 的个数输出该 key 的总数
获取配置信息,实例化 Job 对象关联 / 类指定 输出数据的 key-value 类型指定 输出数据的 key-value 类型指定 Job 输入输出的文件路径提交 Job