首页 >> 大全

网站日志流量系统----【数据采集模块、数据预处理模块】

2023-12-13 大全 21 作者:考证青年

7、响应码:304

8、返回的数据流量:0

9、访客的来源 url:

10、访客所用浏览器:Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0

2. 模块开发----数据预处理

2.1 主要目的

过滤“不合规”数据,清洗无意义的数据

格式转换和规整

根据后续的统计需求,过滤分离出各种不同主题(不同栏目 path)的基础数据。

2.2 实现方式

首先编写一个用于存储单条日志记录的数据类型 WebLogBean(实现 Hadoop 的 Writable 接口,以便在 MapReduce 中序列化传输)

package cn.itcast.bigdata.weblog.mrbean;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;/*** 对接外部数据的层,表结构定义最好跟外部数据源保持一致* 术语: 贴源表* @author**/
public class WebLogBean implements Writable {private boolean valid = true;// 判断数据是否合法private String remote_addr;// 记录客户端的ip地址private String remote_user;// 记录客户端用户名称,忽略属性"-"private String time_local;// 记录访问时间与时区private String request;// 记录请求的url与http协议private String status;// 记录请求状态;成功是200private String body_bytes_sent;// 记录发送给客户端文件主体内容大小private String http_referer;// 用来记录从那个页面链接访问过来的private String http_user_agent;// 记录客户浏览器的相关信息public void set(boolean valid,String remote_addr, String remote_user, String time_local, String request, String status, String body_bytes_sent, String http_referer, String http_user_agent) {this.valid = valid;this.remote_addr = remote_addr;this.remote_user = remote_user;this.time_local = time_local;this.request = request;this.status = status;this.body_bytes_sent = body_bytes_sent;this.http_referer = http_referer;this.http_user_agent = http_user_agent;}public String getRemote_addr() {return remote_addr;}public void setRemote_addr(String remote_addr) {this.remote_addr = remote_addr;}public String getRemote_user() {return remote_user;}public void setRemote_user(String remote_user) {this.remote_user = remote_user;}public String getTime_local() {return this.time_local;}public void setTime_local(String time_local) {this.time_local = time_local;}public String getRequest() {return request;}public void setRequest(String request) {this.request = request;}public String getStatus() {return status;}public void setStatus(String status) {this.status = status;}public String getBody_bytes_sent() {return body_bytes_sent;}public void setBody_bytes_sent(String body_bytes_sent) {this.body_bytes_sent = body_bytes_sent;}public String getHttp_referer() {return http_referer;}public void setHttp_referer(String http_referer) {this.http_referer = http_referer;}public String getHttp_user_agent() {return http_user_agent;}public void setHttp_user_agent(String http_user_agent) {this.http_user_agent = http_user_agent;}public boolean isValid() {return valid;}public void 
setValid(boolean valid) {this.valid = valid;}@Overridepublic String toString() {StringBuilder sb = new StringBuilder();sb.append(this.valid);sb.append("\001").append(this.getRemote_addr());sb.append("\001").append(this.getRemote_user());sb.append("\001").append(this.getTime_local());sb.append("\001").append(this.getRequest());sb.append("\001").append(this.getStatus());sb.append("\001").append(this.getBody_bytes_sent());sb.append("\001").append(this.getHttp_referer());sb.append("\001").append(this.getHttp_user_agent());return sb.toString();}@Overridepublic void readFields(DataInput in) throws IOException {this.valid = in.readBoolean();this.remote_addr = in.readUTF();this.remote_user = in.readUTF();this.time_local = in.readUTF();this.request = in.readUTF();this.status = in.readUTF();this.body_bytes_sent = in.readUTF();this.http_referer = in.readUTF();this.http_user_agent = in.readUTF();}@Overridepublic void write(DataOutput out) throws IOException {out.writeBoolean(this.valid);out.writeUTF(null==remote_addr?"":remote_addr);out.writeUTF(null==remote_user?"":remote_user);out.writeUTF(null==time_local?"":time_local);out.writeUTF(null==request?"":request);out.writeUTF(null==status?"":status);out.writeUTF(null==body_bytes_sent?"":body_bytes_sent);out.writeUTF(null==http_referer?"":http_referer);out.writeUTF(null==http_user_agent?"":http_user_agent);}}

由于初步处理数据只是为了获得可用的数据,并对日期格式、分隔符进行初步转换,因此不需要进行聚合工作,即不需要 reduce 阶段(代码中通过 job.setNumReduceTasks(0) 关闭 reduce)。

编写map代码

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** 处理原始日志,过滤出真实pv请求 转换时间格式 对缺失字段填充默认值 对记录标记valid和invalid* */public class WeblogPreProcess {static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {// 用来存储网站url分类数据Set<String> pages = new HashSet<String>();Text k = new Text();NullWritable v = NullWritable.get();/*** 从外部配置文件中加载网站的有用url分类数据 存储到maptask的内存中,用来对日志数据进行过滤*/@Overrideprotected void setup(Context context) throws IOException, InterruptedException {pages.add("/about");pages.add("/black-ip-list/");pages.add("/cassandra-clustor/");pages.add("/finance-rhive-repurchase/");pages.add("/hadoop-family-roadmap/");pages.add("/hadoop-hive-intro/");pages.add("/hadoop-zookeeper-intro/");pages.add("/hadoop-mahout-roadmap/");}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();WebLogBean webLogBean = WebLogParser.parser(line);if (webLogBean != null) {// 过滤js/图片/css等静态资源WebLogParser.filtStaticResource(webLogBean, pages);/* if (!webLogBean.isValid()) return; */k.set(webLogBean.toString());context.write(k, v);}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(WeblogPreProcess.class);job.setMapperClass(WeblogPreProcessMapper.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//		 FileInputFormat.setInputPaths(job, new Path(args[0]));
//		 FileOutputFormat.setOutputPath(job, new Path(args[1]));FileInputFormat.setInputPaths(job, new Path("d:/weblog/input"));FileOutputFormat.setOutputPath(job, new Path("d:/weblog/output"));job.setNumReduceTasks(0);boolean res = job.waitForCompletion(true);System.exit(res?0:1);}}

首先对数据进行初步筛选:重写了 Mapper 的初始化方法 setup,在其中预先加载有用的 url 集合,用于对日志记录进行过滤。

其次,在遍历每条日志数据时,调用了专门编写的日志解析类 WebLogParser:它对日期格式进行了转换,并把请求不在上述 url 集合中的记录(如 js/图片/css 等静态资源)标记为无效(valid=false)。注意这些无效记录并没有被丢弃,仍会带着 valid 标记一起输出。

最终实现文件的读取转换 , 即获得了初步数据处理

方法代码如下 :

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Set;public class WebLogParser {public static SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);public static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);public static WebLogBean parser(String line) {WebLogBean webLogBean = new WebLogBean();String[] arr = line.split(" ");if (arr.length > 11) {webLogBean.setRemote_addr(arr[0]);webLogBean.setRemote_user(arr[1]);String time_local = formatDate(arr[3].substring(1));if(null==time_local || "".equals(time_local)) time_local="-invalid_time-";webLogBean.setTime_local(time_local);webLogBean.setRequest(arr[6]);webLogBean.setStatus(arr[8]);webLogBean.setBody_bytes_sent(arr[9]);webLogBean.setHttp_referer(arr[10]);//如果useragent元素较多,拼接useragentif (arr.length > 12) {StringBuilder sb = new StringBuilder();for(int i=11;i<arr.length;i++){sb.append(arr[i]);}webLogBean.setHttp_user_agent(sb.toString());} else {webLogBean.setHttp_user_agent(arr[11]);}if (Integer.parseInt(webLogBean.getStatus()) >= 400) {// 大于400,HTTP错误webLogBean.setValid(false);}if("-invalid_time-".equals(webLogBean.getTime_local())){webLogBean.setValid(false);}} else {webLogBean=null;}return webLogBean;}public static void filtStaticResource(WebLogBean bean, Set<String> pages) {if (!pages.contains(bean.getRequest())) {bean.setValid(false);}}//格式化时间方法public static String formatDate(String time_local) {try {return df2.format(df1.parse(time_local));} catch (ParseException e) {return null;}}
}

获得如下图的数据格式

2.3 点击流模型数据梳理

首先对清洗后的数据进行处理:map 阶段只保留有效记录,并以访客 ip 为 key、WebLogBean 为 value 输出;

在 reduce 阶段,将同一 ip 的访问记录集合按访问时间排序,此处用到 Collections.sort(数据集合, Comparator),通过自定义 Comparator 重写排序规则;再将结果封装输出,获取到对应的 session、ip、访问时间、访问页面 url、停留时长、第几步(step)等参数信息。

遍历排序后的数据集合,判断相邻两次访问的时间间隔是否小于 30 分钟:若小于 30 分钟,则属于同一个会话;否则(大于或等于 30 分钟)属于另一个会话,此时生成新的 session id,并将 step 重置为 1。

我们将点击流(pageviews)中的一条记录封装为 PageViewsBean 类型

首先封装好 PageViewsBean.java

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.Writable;public class PageViewsBean implements Writable {private String session;private String remote_addr;private String timestr;private String request;private int step;private String staylong;private String referal;private String useragent;private String bytes_send;private String status;public void set(String session, String remote_addr, String useragent, String timestr, String request, int step, String staylong, String referal, String bytes_send, String status) {this.session = session;this.remote_addr = remote_addr;this.useragent = useragent;this.timestr = timestr;this.request = request;this.step = step;this.staylong = staylong;this.referal = referal;this.bytes_send = bytes_send;this.status = status;}public String getSession() {return session;}public void setSession(String session) {this.session = session;}public String getRemote_addr() {return remote_addr;}public void setRemote_addr(String remote_addr) {this.remote_addr = remote_addr;}public String getTimestr() {return timestr;}public void setTimestr(String timestr) {this.timestr = timestr;}public String getRequest() {return request;}public void setRequest(String request) {this.request = request;}public int getStep() {return step;}public void setStep(int step) {this.step = step;}public String getStaylong() {return staylong;}public void setStaylong(String staylong) {this.staylong = staylong;}public String getReferal() {return referal;}public void setReferal(String referal) {this.referal = referal;}public String getUseragent() {return useragent;}public void setUseragent(String useragent) {this.useragent = useragent;}public String getBytes_send() {return bytes_send;}public void setBytes_send(String bytes_send) {this.bytes_send = bytes_send;}public String getStatus() {return status;}public void setStatus(String status) {this.status = status;}@Overridepublic void readFields(DataInput in) throws IOException {this.session = in.readUTF();this.remote_addr = 
in.readUTF();this.timestr = in.readUTF();this.request = in.readUTF();this.step = in.readInt();this.staylong = in.readUTF();this.referal = in.readUTF();this.useragent = in.readUTF();this.bytes_send = in.readUTF();this.status = in.readUTF();}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(session);out.writeUTF(remote_addr);out.writeUTF(timestr);out.writeUTF(request);out.writeInt(step);out.writeUTF(staylong);out.writeUTF(referal);out.writeUTF(useragent);out.writeUTF(bytes_send);out.writeUTF(status);}}

编写点击流的 mr 程序 ClickStreamPageView.java

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.Locale;
import java.util.UUID;import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** * 将清洗之后的日志梳理出点击流pageviews模型数据* * 输入数据是清洗过后的结果数据* * 区分出每一次会话,给每一次visit(session)增加了session-id(随机uuid)* 梳理出每一次会话中所访问的每个页面(请求时间,url,停留时长,以及该页面在这次session中的序号)* 保留referral_url,body_bytes_send,useragent* * * @author* */
public class ClickStreamPageView {static class ClickStreamMapper extends Mapper<LongWritable, Text, Text, WebLogBean> {Text k = new Text();WebLogBean v = new WebLogBean();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] fields = line.split("\001");if (fields.length < 9) return;//将切分出来的各字段set到weblogbean中v.set("true".equals(fields[0]) ? true : false, fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7], fields[8]);//只有有效记录才进入后续处理if (v.isValid()) {//此处用ip地址来标识用户k.set(v.getRemote_addr());context.write(k, v);}}}static class ClickStreamReducer extends Reducer<Text, WebLogBean, NullWritable, Text> {Text v = new Text();@Overrideprotected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException {ArrayList<WebLogBean> beans = new ArrayList<WebLogBean>();// 先将一个用户的所有访问记录中的时间拿出来排序try {for (WebLogBean bean : values) {WebLogBean webLogBean = new WebLogBean();try {BeanUtils.copyProperties(webLogBean, bean);} catch(Exception e) {e.printStackTrace();}beans.add(webLogBean);}//将bean按时间先后顺序排序Collections.sort(beans, new Comparator<WebLogBean>() {@Overridepublic int compare(WebLogBean o1, WebLogBean o2) {try {Date d1 = toDate(o1.getTime_local());Date d2 = toDate(o2.getTime_local());if (d1 == null || d2 == null)return 0;return d1.compareTo(d2);} catch (Exception e) {e.printStackTrace();return 0;}}});/*** 以下逻辑为:从有序bean中分辨出各次visit,并对一次visit中所访问的page按顺序标号step* 核心思想:* 就是比较相邻两条记录中的时间差,如果时间差<30分钟,则该两条记录属于同一个session* 否则,就属于不同的session* */int step = 1;String session = UUID.randomUUID().toString();for (int i = 0; i < beans.size(); i++) {WebLogBean bean = beans.get(i);// 如果仅有1条数据,则直接输出if (1 == beans.size()) {// 设置默认停留时长为60sv.set(session+"\001"+key.toString()+"\001"+bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + 
"\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001"+ bean.getStatus());context.write(NullWritable.get(), v);session = UUID.randomUUID().toString();break;}// 如果不止1条数据,则将第一条跳过不输出,遍历第二条时再输出if (i == 0) {continue;}// 求近两次时间差long timeDiff = timeDiff(toDate(bean.getTime_local()), toDate(beans.get(i - 1).getTime_local()));// 如果本次-上次时间差<30分钟,则输出前一次的页面访问信息if (timeDiff < 30 * 60 * 1000) {v.set(session+"\001"+key.toString()+"\001"+beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + step + "\001" + (timeDiff / 1000) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"+ beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());context.write(NullWritable.get(), v);step++;} else {// 如果本次-上次时间差>30分钟,则输出前一次的页面访问信息且将step重置,以分隔为新的visitv.set(session+"\001"+key.toString()+"\001"+beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + (step) + "\001" + (60) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"+ beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());context.write(NullWritable.get(), v);// 输出完上一条之后,重置step编号step = 1;session = UUID.randomUUID().toString();}// 如果此次遍历的是最后一条,则将本条直接输出if (i == beans.size() - 1) {// 设置默认停留市场为60sv.set(session+"\001"+key.toString()+"\001"+bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus());context.write(NullWritable.get(), v);}}} catch (ParseException e) {e.printStackTrace();}}private String toStr(Date date) {SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);return df.format(date);}private Date 
toDate(String timeStr) throws ParseException {SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);return df.parse(timeStr);}private long timeDiff(String time1, String time2) throws ParseException {Date d1 = toDate(time1);Date d2 = toDate(time2);return d1.getTime() - d2.getTime();}private long timeDiff(Date time1, Date time2) throws ParseException {return time1.getTime() - time2.getTime();}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(ClickStreamPageView.class);job.setMapperClass(ClickStreamMapper.class);job.setReducerClass(ClickStreamReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(WebLogBean.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);//		FileInputFormat.setInputPaths(job, new Path(args[0]));
//		FileOutputFormat.setOutputPath(job, new Path(args[1]));FileInputFormat.setInputPaths(job, new Path("d:/weblog/output"));FileOutputFormat.setOutputPath(job, new Path("d:/weblog/pageviews"));job.waitForCompletion(true);}}

数据采集模块的关键技术__数据采集模块原理

获得如下格式的数据,每行各字段以 \001 分隔,依次为(session, ip, remote_user, 访问时间, url, 步号 step, 停留时长(秒), referer, useragent, 发送字节数, 状态码)

2.4 点击流模型visit信息表

以上一步得到的 pageviews 数据为输入,经过 map 程序以 session 为 key 发送到 visit 的 reduce;reduce 中按 step 排序后取该次访问的首、尾记录,同样将输出的数据封装成 VisitBean 输出

数据格式(session, ip, 进入时间 inTime, 离开时间 outTime, 进入页面 inPage, 离开页面 outPage, 来源 referal, 访问页面数 pageVisits)

封装 VisitBean.java

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.Writable;public class VisitBean implements Writable {private String session;private String remote_addr;private String inTime;private String outTime;private String inPage;private String outPage;private String referal;private int pageVisits;public void set(String session, String remote_addr, String inTime, String outTime, String inPage, String outPage, String referal, int pageVisits) {this.session = session;this.remote_addr = remote_addr;this.inTime = inTime;this.outTime = outTime;this.inPage = inPage;this.outPage = outPage;this.referal = referal;this.pageVisits = pageVisits;}public String getSession() {return session;}public void setSession(String session) {this.session = session;}public String getRemote_addr() {return remote_addr;}public void setRemote_addr(String remote_addr) {this.remote_addr = remote_addr;}public String getInTime() {return inTime;}public void setInTime(String inTime) {this.inTime = inTime;}public String getOutTime() {return outTime;}public void setOutTime(String outTime) {this.outTime = outTime;}public String getInPage() {return inPage;}public void setInPage(String inPage) {this.inPage = inPage;}public String getOutPage() {return outPage;}public void setOutPage(String outPage) {this.outPage = outPage;}public String getReferal() {return referal;}public void setReferal(String referal) {this.referal = referal;}public int getPageVisits() {return pageVisits;}public void setPageVisits(int pageVisits) {this.pageVisits = pageVisits;}@Overridepublic void readFields(DataInput in) throws IOException {this.session = in.readUTF();this.remote_addr = in.readUTF();this.inTime = in.readUTF();this.outTime = in.readUTF();this.inPage = in.readUTF();this.outPage = in.readUTF();this.referal = in.readUTF();this.pageVisits = in.readInt();}@Overridepublic void write(DataOutput out) throws IOException 
{out.writeUTF(session);out.writeUTF(remote_addr);out.writeUTF(inTime);out.writeUTF(outTime);out.writeUTF(inPage);out.writeUTF(outPage);out.writeUTF(referal);out.writeInt(pageVisits);}@Overridepublic String toString() {return session + "\001" + remote_addr + "\001" + inTime + "\001" + outTime + "\001" + inPage + "\001" + outPage + "\001" + referal + "\001" + pageVisits;}
}

编写 Visit 的 mr 程序 ClickStreamVisit.java

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** 输入数据:pageviews模型结果数据* 从pageviews模型结果数据中进一步梳理出visit模型* sessionid  start-time   out-time   start-page   out-page   pagecounts  ......* * @author**/
public class ClickStreamVisit {// 以session作为key,发送数据到reducerstatic class ClickStreamVisitMapper extends Mapper<LongWritable, Text, Text, PageViewsBean> {PageViewsBean pvBean = new PageViewsBean();Text k = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] fields = line.split("\001");int step = Integer.parseInt(fields[5]);//(String session, String remote_addr, String timestr, String request, int step, String staylong, String referal, String useragent, String bytes_send, String status)//299d6b78-9571-4fa9-bcc2-f2567c46df3472.46.128.140-2013-09-18 07:58:50/hadoop-zookeeper-intro/160"https://www.google.com/""Mozilla/5.0"14722200pvBean.set(fields[0], fields[1], fields[2], fields[3],fields[4], step, fields[6], fields[7], fields[8], fields[9]);k.set(pvBean.getSession());context.write(k, pvBean);}}static class ClickStreamVisitReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> {@Overrideprotected void reduce(Text session, Iterable<PageViewsBean> pvBeans, Context context) throws IOException, InterruptedException {// 将pvBeans按照step排序ArrayList<PageViewsBean> pvBeansList = new ArrayList<PageViewsBean>();for (PageViewsBean pvBean : pvBeans) {PageViewsBean bean = new PageViewsBean();try {BeanUtils.copyProperties(bean, pvBean);pvBeansList.add(bean);} catch (Exception e) {e.printStackTrace();}}Collections.sort(pvBeansList, new Comparator<PageViewsBean>() {@Overridepublic int compare(PageViewsBean o1, PageViewsBean o2) {return o1.getStep() > o2.getStep() ? 
1 : -1;}});// 取这次visit的首尾pageview记录,将数据放入VisitBean中VisitBean visitBean = new VisitBean();// 取visit的首记录visitBean.setInPage(pvBeansList.get(0).getRequest());visitBean.setInTime(pvBeansList.get(0).getTimestr());// 取visit的尾记录visitBean.setOutPage(pvBeansList.get(pvBeansList.size() - 1).getRequest());visitBean.setOutTime(pvBeansList.get(pvBeansList.size() - 1).getTimestr());// visit访问的页面数visitBean.setPageVisits(pvBeansList.size());// 来访者的ipvisitBean.setRemote_addr(pvBeansList.get(0).getRemote_addr());// 本次visit的referalvisitBean.setReferal(pvBeansList.get(0).getReferal());visitBean.setSession(session.toString());context.write(NullWritable.get(), visitBean);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(ClickStreamVisit.class);job.setMapperClass(ClickStreamVisitMapper.class);job.setReducerClass(ClickStreamVisitReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(PageViewsBean.class);job.setOutputKeyClass(NullWritable.class);job.setOutputValueClass(VisitBean.class);//		FileInputFormat.setInputPaths(job, new Path(args[0]));
//		FileOutputFormat.setOutputPath(job, new Path(args[1]));FileInputFormat.setInputPaths(job, new Path("d:/weblog/pageviews"));FileOutputFormat.setOutputPath(job, new Path("d:/weblog/visitout"));boolean res = job.waitForCompletion(true);System.exit(res?0:1);}}

获得如下格式的数据

小知识点 : 关于mr程序输出文件名

part-r-00000   表示是reducetask的输出
part-m-00000   表示是maptask的输出

小结 :

扩展:自定义 Collections.sort 的比较规则(Comparator)

如给定一个自定义学生类Student
根据年龄倒序排序
student1 student2 student3..............

// Sort a list of Student objects by age in descending order (oldest first).
List<Student> students = new ArrayList<Student>();
students.add(student1);
students.add(student2);
students.add(student3);
// ... add the remaining students the same way

Collections.sort(students, new Comparator<Student>() {
    @Override
    public int compare(Student s1, Student s2) {
        // Arguments are swapped (s2 before s1) to get descending order. Long.compare returns
        // 0 for equal ages, as the Comparator contract requires.
        return Long.compare(s2.getAge(), s1.getAge());
    }
});

关于我们

最火推荐

小编推荐

联系我们


版权声明:本站内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 88@qq.com 举报,一经查实,本站将立刻删除。备案号:桂ICP备2021009421号
Powered By Z-BlogPHP.
复制成功
微信号:
我知道了