首页 >> 大全

MapReduce 初识+案例(词频统计)

2023-10-08 大全 30 作者:考证青年

1. 1.1 是什么

:是 中的一个分布式计算框架,基于 写出的应用程序能够运行在大型集群上,并以一种可靠容错的方式并行处理上 T 级别的数据集。

一个 作业(Job)通常会把输入的数据切分为若干个独立的数据块,由 Map 任务(Task)以完全并行的方式处理它们。框架会对 Map 的输出先进行排序,然后把结果输入给 任务,通常作业的输入和输出都会被存储在文件系统中。

整个框架负责任务的调度和监控,以及重新执行已经失败的任务。

1.2 优点/缺点 1.2.1 优点 1.2.2 缺点

不擅长做实际计算、流式计算、DAG(有向图)计算。

的任务表达能力有限,一个 只能完成一次映射和聚合,像 DAG 任务就需要多次聚合,那就需要将任务拆成多个 ,每个 任务都需要大量的磁盘 IO,将导致性能低下。

1.3 运行阶段

第一阶段: Map Task 并发实例,完全并行运行,不互相干

第二阶段: Task 并发实例,获取上一阶段的输出作为本阶段的输入

1.4 进程

:负责整个程序的过程调度及状态调度

:负责 Map 阶段的整个数据处理流程

:负责 阶段的整个数据处理流程

2. Java 词频统计

在理解 之前,不如先用 Java 实现一个词频统计的实例。

public static void main(String[] args) throws IOException {// 1. 创建容器存储结果HashMap<String, Integer> map = new HashMap<>();// 2. 读取文件File file = new File("...");String encoding = "utf8";List<String> lines = FileUtils.readLines(file, encoding);// 3. 遍历每一行for (String line : lines) {// 4. 切分出每个单词String[] words = line.split("\\s+");for (String word : words) {// 5. 替换掉特殊字符String w = word.toLowerCase().replace("\\W", "");// 6. 每出现一个单词进行数量 + 1if (!w.isEmpty()){map.put(w, map.getOrDefault(w,0) + 1 );}}}// 7. 将统计结果进行排序ArrayList<Map.Entry<String, Integer>> entries = new ArrayList<>(map.entrySet());entries.sort(new Comparator<Map.Entry<String, Integer>>() {@Overridepublic int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {return o2.getValue() - o1.getValue();}});for (Map.Entry<String, Integer> entry : entries) {System.out.printf("单词:%s \t出现的个数为 %d\n", entry.getKey(), entry.getValue());}}

3. 编程规范

利用 实现词频统计之前还需要了解 的编程规范。

通常我们编写一个 程序,会将其分解为三个部分:、、。

**: **

自定义一个类,并继承 ,定义输入输出键值对的泛型实现父类的 map() 方法(进程),定义键值对的参数类型及上下文对象 编写 map 的具体实现,最后通过 对象将映射结果写入 框架

**: **

自定义一个类,并继承 ,定义输入输出键值对的泛型实现父类的 () 方法(进程),定义键值对的参数类型及上下文对象 编写 的具体实现,最后通过 对象将聚合结果写入 框架

这是一个包含 main 方法的 任务的入口实例化 Job 对象,可选择性的添加各种配置将 Job 任务提交到集群 4. 词频统计 4.1

我们需要继承 类,来自于:org....

4.1.1 窥见源码 4.1.1.1 类

类提供了四个泛型,分别是输入参数的键值对类型、输出参数的键值对类型

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {...}

⚠️ 注意:这里我们不适用 Java 提供的类型,而是使用 实现的序列化类型,公共接口为

如果读文本,通常默认的 KEYIN 类型为(可表示当前行文本的位置、字节偏移量), 的类型为 Text(表示当前行文本内容),输出的 类型为 Text(表示输出值的键),输出的 的类型为 (可表示为数量)。

4.1.1.2 方法

// 在任务开始时调用一次,通常用作创建连接、打开流等获取资源的操作
protected void setup(Context context) throws IOException, InterruptedException {// NOTHING
}

// 对输入拆分中的每个键/值对调用一次,通常需要重写该方法,这是一个默认的核心方法
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {// map 处理完后将数据写出去context.write((KEYOUT) key, (VALUEOUT) value);
}

// 在任务结束时调用一次,用作关闭资源等
protected void cleanup(Context context) throws IOException, InterruptedException {// NOTHING
}

简述单词统计的map过程_词频统计分析_

// 该方法相当于整合了上面三个方法
public void run(Context context) throws IOException, InterruptedException {setup(context);try {while (context.nextKeyValue()) {map(context.getCurrentKey(), context.getCurrentValue(), context);}} finally {cleanup(context);}
}

4.1.2

词频统计 如下:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class Job_WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {// 输出Text k = new Text();IntWritable v = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 将读取的文本每行的数据,分隔成单词String[] words = value.toString().split("\\s+");// 对单词进行处理for (String word : words) {// 转小写 去除特殊字符String w = word.toLowerCase().replace("\\W", "");// 将单词作为输出的 keyk.set(w);// 使用上下文对象 将 mapper 处理的结果以  的方式写到 MapReduce 框架context.write(k, v);}}
}

4.2 4.2.1 窥见源码 4.2.1.1 类

类提供了四个泛型,分别是输入参数的键值对类型、输出参数的键值对类型

public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {...}

负责接收 输出的内容,所以 KEYIN、 就对应 的输出键值对的类型, 使用 Text, 使用 (防止结果量级太大)

4.2.1.2 方法

protected void setup(...){...}
protected void cleanup(...){...}
public void run(...){...}

// 这个方法对每个键调用一次,通常需要重写该方法,这是一个默认的核心方法
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {for(VALUEIN value: values) {context.write((KEYOUT) key, (VALUEOUT) value);}
}

4.2.2

词频统计 如下:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class Job_WordCountReducer extends Reducer<Text, IntWritable, Text, LongWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,  Context context) throws IOException, InterruptedException {// 声明变量 用于存储聚合完的结果long count = 0;// 遍历相同的 key 获取对应的所有 valuefor (IntWritable value : values) {count += value.get();}// 将聚合完的结果写到 MapReduce 框架context.write(key, new LongWritable(count));}
}

4.3

我们需要实例化 Job 对象,并配置相关类(相当于整合了 、)。

配置成类,而不是配置为具体的对象,是因为方便后期通过反射获取多个实例

4.3.1 相关方法

编写 不需要继承某个类,但需要注意需要使用的几个方法:

1️⃣ :

通过 Job.() 获取 Job 实例,该方法实则传了一个配置信息,再向下延申,可以发现先实例化了 类,再实例化 Job

public static Job getInstance() throws IOException {// create with a null Clusterreturn getInstance(new Configuration());
}

public static Job getInstance(Configuration conf) throws IOException {// create with a null ClusterJobConf jobConf = new JobConf(conf);return new Job(jobConf);
}

2️⃣ :

意为等待事务完成,其中参数表示是否打印程序的运行过程

public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException,ClassNotFoundException {if (state == JobState.DEFINE) {submit();}if (verbose) {monitorAndPrintJob();} else {// get the completion poll interval from the client.int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf());while (!isComplete()) {try {Thread.sleep(completionPollIntervalMillis);} catch (InterruptedException ie) {}}}return isSuccessful();
}

4.3.2

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class Job_WordCountDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 0. 自定义配置对象Configuration conf = new Configuration();// 1. 创建 Job 对象,参数可取消Job job = Job.getInstance(conf);// 2. 给 Job 对象添加 Mapper 类的 Classjob.setMapperClass(Job_WordCountMapper.class);// 3. 给 Job 对象添加 Reduce 类的 Classjob.setReducerClass(Job_WordCountReducer.class);// 4. 给 Job 对象添加 Driver 类的 Classjob.setJarByClass(Job_WordCountDriver.class);// 5. 设置 Mapper 输出的数据的 key 类型job.setMapOutputKeyClass(Text.class);// 6. 设置 Mapper 输出的数据的 value 类型job.setMapOutputValueClass(IntWritable.class);// 7. 设置 Reduce 输出的数据的 key 类型job.setOutputKeyClass(Text.class);// 8. 设置 Reduce 输出的数据的 value 类型job.setOutputValueClass(LongWritable.class);// 9. 设置 MapReduce 任务的输入路径FileInputFormat.setInputPaths(job, new Path("..."));// 10.设置 MapReduce 任务的输出路径FileOutputFormat.setOutputPath(job, new Path("..."));// 11.提交任务boolean b = job.waitForCompletion(true);System.exit( b ? 0 : 1 );}
}

⚠️ 注意:本地测试可以直接写输入输出路径,但集群上不能写死,所以需要以参数的形式让用户输入,注意 main 方法的参数是个数组类型,所以可将其修改为:

    // 9. 设置 MapReduce 任务的输入路径FileInputFormat.setInputPaths(job, new Path(args[0]));// 10.设置 MapReduce 任务的输出路径FileOutputFormat.setOutputPath(job, new Path(args[1]));

4.4 运行测试

注:文本处理的不太干净,先这样了…

5. 集群测试

写完最主要的还是要放到集群上测试。

测试环境:

在开发时使用较小的样例数据, 程序是被提交给 在本地以单进程的形式循行,这种方式称为 local 测试

集群测试:

本地运行测试逻辑正确后,将程序提交至 Yarn 集群,分发到很多节点上并发执行,处理的数据和输出的结果存放于 HDFS 系统

1️⃣ Step1:通过 IDEA 将程序打包为 jar 包

2️⃣ Step2:(可省略此步骤)使用解压缩工具打开该 jar 包(不要解压),在 -1.0-.jar\META-INF 目录下修改 .MF 文件,添加主类( 路径),并保存

Main-Class: WordCount_API.Job_WordCountDriver

3️⃣ Step3:将该 jar 包上传至服务器,开启 HDFS,提前上传一个待统计的英文词频文件

4️⃣ Step4:启动该程序(若运行后找不到类,可添加 main-class 参数以配置启动类,确保文件输出路径为空)

hadoop jar xxx.jar  fileIn fileOut

5️⃣ Step5:运行测试,到这算成功了

6️⃣ Step6:查看输出,如下,与 IDEA 测试的输出一致

6. 运行流程

1️⃣ Step1: 程序读取文件的输入目录上存放的相应文件

2️⃣ Step2:获取待处理的数据信息,根据集群中的配置形成一个任务分配规划

3️⃣ Step3:客户端提交 job.split、jar.xml 等文件给 yarn, yarn 中的 启动

4️⃣ Step4: 启动后根据本次 Job 的描述信息,计算存储需要的 实例数量,然后向集群申请节点,启动相应数量的 进程

5️⃣ Step5: 利用客户指定的 来读取数据,形成输入的键值对

6️⃣ Step6: 将输入的键值对传递给客户定义的 map 方法,做逻辑运算

7️⃣ Step7:map 方法运算完后将键值对收集到 缓存

8️⃣ Step8: 缓存中的键值对按照 K 分区排序后不断写到磁盘文件

9️⃣ Step9: 监控到所有 进程完成后,会根据客户指定的参数启动相应数量的 进程,并通知 进程要处理的数据分区

1️⃣0️⃣ : 进程启动后,根据 告知的待处理数据的位置,从若干台 运行所在节点上获取到若干个 输出结果文件,并在本地进行重新归并排序,然后按照 相同键的键值对为一组,调用客户定义的 方法进行逻辑运算

1️⃣1️⃣ : 运算完成后,调用客户指定的 将结果数据输出到外部存储

7. 写在最后

梳理下 词频统计的流程

程序启动后将 传递的文本转换为 根据空格切分出单词,并处理大小写、特殊符号将单词输出为

汇总相同 key 的个数输出该 key 的总数

获取配置信息,实例化 Job 对象关联 / 类指定 输出数据的 key-value 类型指定 输出数据的 key-value 类型指定 Job 输入输出的文件路径提交 Job

关于我们

最火推荐

小编推荐

联系我们


版权声明:本站内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 88@qq.com 举报,一经查实,本站将立刻删除。备案号:桂ICP备2021009421号
Powered By Z-BlogPHP.
复制成功
微信号:
我知道了