首页 >> 大全

数据处理 —— 出租车gps提取订单数据

2023-11-20 大全 30 作者:考证青年

1. 需求目的:

约 5 千万条出租车 GPS 数据存储在 MySQL 中,数据按一定的时间频率记录,目标是从中提取出订单数据并统计订单数量。

(数据示例)

2. 思路分析

数据中第 4 列(代码中的 plate_number)是车牌号,state=1 表示车上有人。思路:先按车辆分组,再按时间排序,最后把每一段连续的 state=1 记录作为一个订单(该段第一条记录为上车点,最后一条为下车点)。

3. 实践

3.1 SQL 合并数据

由于数据在mysql中被分成了200+个表,有两种处理方式:

1. 分别处理每个表,获得每个车牌号的数据,再按车牌号合并;
2. 先把所有表合并成一个大表,再整体处理,获得每个车牌号的数据(本文采用这种方式)。

2021.10.30

需求:一天的出租车轨迹数据被拆分成了几百个表,需要将这些表合并

思路:读出这个数据库中所有的表名,循环将所有表合并到一个新表中(使用存储过程)

学习过程:

两个表垂直拼接的方法

-- UNION: stack the rows of both tables and remove duplicate rows from the result
SELECT * FROM table1 UNION SELECT * FROM table2;
-- UNION ALL: stack the rows of both tables and keep duplicate rows
SELECT * FROM table1 UNION ALL SELECT * FROM table2;

注:这样手写 200 多条语句并不现实,应该从数据库中读取所有表名,再循环合并。

存储过程

DELIMITER $$

USE `taxigps`$$

DROP PROCEDURE IF EXISTS `test11`$$

-- Append the rows of every table in schema `taxigps` (except the target
-- table `taxi` itself) into `taxi`, one INSERT ... SELECT per table.
CREATE DEFINER=`root`@`%` PROCEDURE `test11`()
BEGIN
    DECLARE stopflag INT DEFAULT 0;
    DECLARE tablename VARCHAR(64);
    -- Cursor over the names of all source tables in the schema.
    DECLARE tablename_cur CURSOR FOR
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = 'taxigps' AND table_name != 'taxi';
    -- When FETCH runs past the last row, set stopflag so the loop ends.
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET stopflag = 1;

    OPEN tablename_cur;
    FETCH tablename_cur INTO tablename;
    WHILE stopflag = 0 DO
        -- Backtick-quote the identifier (escaping embedded backticks) so the
        -- dynamic statement stays valid for any table name.
        SET @sql = CONCAT('insert into taxi SELECT * FROM `', REPLACE(tablename, '`', '``'), '`');
        PREPARE ss FROM @sql;
        EXECUTE ss;
        -- Release the prepared statement before the next iteration prepares a new one.
        DEALLOCATE PREPARE ss;
        FETCH tablename_cur INTO tablename;
    END WHILE;
    CLOSE tablename_cur;
END$$

DELIMITER ;

3.2 按车牌号分组

"""
@author:HY
@time:2021/10/31:14:51
"""
from threading import Thread
import threading
from time import sleep, ctime
import pandas as pd
import time
import pickle

# Aggregated GPS states keyed by plate: {plate_number: {taxi_time: state}}.
# Shared by the worker threads below; solve_file fills it by default.
taxi_dic = {}


def solve_file(name, file, out=None):
    """Group the rows of ``file`` by plate number.

    For every row, store ``state`` under ``out[plate_number][taxi_time]``;
    a later row with the same plate and time overwrites an earlier one.

    Args:
        name: Label of the caller/thread (not used by the function).
        file: DataFrame with the 10 tab-separated GPS columns; column 2 is
            the time, column 4 the plate number and column 9 the state.
        out: Dict to fill. Defaults to the module-level ``taxi_dic``.

    Returns:
        The dict that was filled.
    """
    target = taxi_dic if out is None else out
    # itertuples avoids building a pandas Series for every row, which is
    # much faster than iterrows on tens of millions of rows.
    for row in file.itertuples(index=False):
        _date, taxi_time, _, plate_number, _lng, _lat, _, _, state, _ = row
        # setdefault does lookup-or-insert in one dict call, so (under
        # CPython's GIL) two threads cannot both create the per-plate dict
        # and lose each other's entries, as check-then-assign could.
        target.setdefault(plate_number, {})[taxi_time] = state
    return target


def split_frame(df, parts):
    """Split ``df`` into ``parts`` contiguous row slices.

    Together the slices cover every row exactly once, in order; some may be
    empty when ``parts`` exceeds the number of rows.

    Raises:
        ValueError: if ``parts`` is smaller than 1.
    """
    if parts < 1:
        raise ValueError("parts must be >= 1")
    n = len(df)
    bounds = [n * k // parts for k in range(parts + 1)]
    return [df.iloc[bounds[k]:bounds[k + 1]] for k in range(parts)]


if __name__ == "__main__":
    df = pd.read_csv('test.csv', header=None, sep='\t')
    # Full dataset: pd.read_csv('20160920_taxigps.csv', header=None, sep='\t')
    thread_num = 1
    time1 = time.time()
    # One slice per worker. The previous exec()-built slices dropped rows
    # (first slice was df[end1:], last was df[:end_n]) and used an
    # undefined df0.
    workers = [
        threading.Thread(target=solve_file, args=(k, chunk))
        for k, chunk in enumerate(split_frame(df, thread_num))
    ]
    # Start every worker before joining any; start+join per thread ran them serially.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    time2 = time.time()
    with open('多线程-车辆数据字典.pickle', 'wb') as f:
        pickle.dump(taxi_dic, f)
    taxi_dic.clear()
    time3 = time.time()
    solve_file('all', df)
    time4 = time.time()
    with open('单线程-车辆数据字典.pickle', 'wb') as f:
        pickle.dump(taxi_dic, f)
    print(f'多线程聚合耗时:time={time2-time1},单线程聚合耗时:time={time4-time3}')

3.3 生成订单数据

需求:3.2 得到了每辆车的数据,这里需要对每辆车的记录按时间排序,然后从中生成订单。

思路:多线程遍历每个车,寻找连续的state=1序列(多线程没单线程快…)

学习过程:

线程的暂停与恢复

关于锁:同一时刻一把锁只能被一个线程持有;持有者释放(release)之后,其他线程才能获取(acquire)这把锁,从而保证被保护的数据同一时刻只被一个线程修改。

锁的描述

字典按照key排序:

# Sort the dict's (key, value) pairs by key.
sort_date = [(key, data[key]) for key in sorted(data)]

"""
@author:HY
@time:2021/11/1:10:34
"""
import pickle
import pandas as pd
from tqdm import tqdm
import threading
import time


class Node:
    """Singly linked list node holding one vehicle's records in ``value``."""

    def __init__(self, value):
        self.value = value
        self.next = None


class Queue:
    """FIFO of Nodes used to hand each worker thread its vehicles.

    The main thread pushes while a worker pops, so every operation holds a
    lock; unsynchronised head/tail/length updates can lose nodes.
    """

    def __init__(self):
        self.head = None
        self.tail = None
        self.length = 0
        self._lock = threading.Lock()

    def pop(self):
        """Remove and return the head Node, or None when the queue is empty."""
        with self._lock:
            node = self.head
            if node is None:
                return None
            self.head = node.next
            if self.head is None:
                self.tail = None
            self.length -= 1
            return node

    def push(self, node):
        """Append ``node`` at the tail."""
        with self._lock:
            if self.head is None:
                self.head = self.tail = node
            else:
                self.tail.next = node
                self.tail = node
            self.length += 1


class Request:
    """One order: start fields set on creation, end fields filled when the trip closes."""

    def __init__(self, start_time, s_lng, s_lat):
        self.start_time = start_time
        self.s_lng = s_lng
        self.s_lat = s_lat
        self.e_lng = None
        self.e_lat = None
        self.end_time = None


class MY_Server(threading.Thread):
    """Worker thread that turns queued vehicles into Request objects.

    Results are appended to the module-level ``req_list`` while holding the
    module-level ``lock``; both must exist before the thread is started.
    """

    def __init__(self, name, value=None):
        threading.Thread.__init__(self)
        self.mName = name
        self.mEvent = threading.Event()
        # A fresh queue per worker: a ``value=Queue()`` default is evaluated
        # once, so every worker would share (and race on) the same queue.
        self.data_queue = Queue() if value is None else value
        self.is_running = False
        self.recieving = True  # cleared by the producer after its last push

    def run(self):
        """Process queued vehicles until the producer is done and the queue is empty."""
        while True:
            # Read the flag BEFORE popping: the producer clears it only after
            # its final push, so an empty pop after seeing it cleared means the
            # queue is really drained (checking afterwards can drop a vehicle).
            finished = not self.recieving
            data = self.data_queue.pop()
            if data is None:
                if finished:
                    break
                self.is_running = False
                time.sleep(0.001)  # wait for the producer instead of hot-spinning
                continue
            self.is_running = True
            # NOTE(review): records are expected as [time, state, lng, lat];
            # the grouping script pickles {time: state} dicts instead — confirm.
            sort_date = sorted(data.value, key=lambda item: item[0])
            vehicle_req_list = self.get_request(sort_date)
            with lock:
                req_list.extend(vehicle_req_list)

    def get_request(self, sort_data):
        """Split one vehicle's time-sorted records into occupied trips.

        Args:
            sort_data: list of ``[now, state, lng, lat]`` sorted by ``now``.
                ``state == 1`` marks an occupied record and ``state == 0`` a
                vacant one; any other value is ignored.

        Returns:
            One Request per maximal run of ``state == 1`` records, starting at
            the run's first record and ending at its last. A run still open
            when the data ends is closed at its last record.
        """
        request_list = []
        one_request = None
        last = None  # (now, lng, lat) of the latest occupied record of the open trip
        for now, state, lng, lat in sort_data:
            if state == 1:
                if one_request is None:
                    one_request = Request(now, lng, lat)
                last = (now, lng, lat)
            elif state == 0 and one_request is not None:
                one_request.end_time, one_request.e_lng, one_request.e_lat = last
                request_list.append(one_request)
                one_request = None  # back to vacant; the next 1 opens a new trip
        if one_request is not None:
            one_request.end_time, one_request.e_lng, one_request.e_lat = last
            request_list.append(one_request)
        return request_list

    def pause(self):
        self.mEvent.clear()

    def resume(self):
        self.mEvent.set()
if __name__ == "__main__":
    time1 = time.time()
    # NOTE(review): MY_Server expects each vehicle's value to be a list of
    # [time, state, lng, lat]; confirm this pickle was written in that format.
    with open('多线程-车辆数据字典.pickle', 'rb') as f:
        all_dic = pickle.load(f)
    lock = threading.Lock()  # guards req_list, which MY_Server.run extends
    req_list = []            # orders collected from every worker
    thread_num = 1
    workers = [MY_Server(i) for i in range(thread_num)]
    for worker in workers:
        worker.start()
    # Round-robin the vehicles over the workers (the counter previously
    # never advanced, so every vehicle went to worker 0).
    for taxi_num, value in enumerate(all_dic.values()):
        workers[taxi_num % thread_num].data_queue.push(Node(value))
    # Tell every worker that no more data is coming, then wait for them.
    for worker in workers:
        worker.recieving = False
    for worker in workers:
        worker.join()
    time2 = time.time()
    print(len(req_list))
    print('耗时', time2 - time1)
    # Save the orders.
    with open('shenzhen_req.pickle', 'wb') as f:
        pickle.dump(req_list, f)

最后发现多线程版本不如单线程跑得快:这部分是纯 Python 的 CPU 密集型计算,受 CPython 全局解释器锁(GIL)限制,多个线程无法真正并行,反而增加了线程切换和加锁的开销。因此最终程序为:

"""
@author:HY
@time:2021/11/1:16:51
"""
import pandas as pd
import time
import pickle
from tqdm import tqdm
import datetime


class Request:
    """A single order (one occupied trip).

    The start time and start coordinates are given at construction; the end
    time and end coordinates begin as None and are assigned later, once the
    end of the trip is known.
    """

    def __init__(self, start_time, s_lng, s_lat):
        # Start of the trip.
        self.start_time, self.s_lng, self.s_lat = start_time, s_lng, s_lat
        # End of the trip, unknown until it is closed.
        self.e_lng = self.e_lat = self.end_time = None
def _dump_pickle(obj, path):
    """Pickle ``obj`` to ``path``, creating the parent directory if it is missing."""
    import os

    folder = os.path.dirname(path)
    if folder:
        os.makedirs(folder, exist_ok=True)
    with open(path, 'wb') as f:
        pickle.dump(obj, f)


def solve_file(name, file):
    """Group GPS rows by plate number.

    Args:
        name: Label only (not used by the function).
        file: DataFrame with the 10 tab-separated GPS columns; column 2 is
            the time, 4 the plate number, 5/6 lng/lat and 9 the state.

    Returns:
        ``{plate_number: [[taxi_time, state, lng, lat], ...]}`` in file order.
        The dict is also pickled to ``数据文件/单线程-车辆数据字典.pickle``.
    """
    taxi_dic = {}
    # itertuples avoids building a pandas Series per row, which dominates the
    # runtime of iterrows on tens of millions of rows.
    for row in tqdm(file.itertuples(index=False), desc='处理成逐个车辆', total=len(file)):
        _date, taxi_time, _, plate_number, lng, lat, _, _, state, _ = row
        taxi_dic.setdefault(plate_number, []).append([taxi_time, state, lng, lat])
    print('车辆数目', len(taxi_dic))
    _dump_pickle(taxi_dic, '数据文件/单线程-车辆数据字典.pickle')
    return taxi_dic


def sum_request(taxi_dic):
    """Extract the orders of every vehicle in ``taxi_dic``.

    The combined list is pickled to ``数据文件/shenzhen_req.pickle`` and also
    returned.
    """
    req_list = []
    for value in tqdm(taxi_dic.values(), desc='处理每个车辆'):
        # NOTE(review): if times are 'HH:MM:SS' strings, this sort is only
        # chronological when they are zero-padded — confirm the column format.
        sort_date = sorted(value, key=lambda item: item[0])
        req_list.extend(get_request(sort_date))
    # Save the orders.
    _dump_pickle(req_list, '数据文件/shenzhen_req.pickle')
    return req_list


def get_request(sort_data):
    """Split one vehicle's time-sorted records into occupied trips.

    Args:
        sort_data: list of ``[now, state, lng, lat]`` sorted by ``now``.
            ``state == 1`` marks an occupied record and ``state == 0`` a
            vacant one; any other value is ignored.

    Returns:
        One Request per maximal run of ``state == 1`` records, starting at
        the run's first record and ending at its last. A run still open when
        the data ends is closed at its last record.
    """
    request_list = []
    one_request = None
    last = None  # (now, lng, lat) of the latest occupied record of the open trip
    for now, state, lng, lat in sort_data:
        if state == 1:
            if one_request is None:
                one_request = Request(now, lng, lat)
            last = (now, lng, lat)
        elif state == 0 and one_request is not None:
            one_request.end_time, one_request.e_lng, one_request.e_lat = last
            request_list.append(one_request)
            one_request = None  # back to vacant; the next 1 opens a new trip
    if one_request is not None:
        one_request.end_time, one_request.e_lng, one_request.e_lat = last
        request_list.append(one_request)
    return request_list


def get_file():
    """Read the raw GPS csv, group it per vehicle and extract the orders, printing timings."""
    time1 = time.time()
    df = pd.read_csv('20160920_taxigps.csv', header=None, sep='\t')
    time2 = time.time()
    taxi_dic = solve_file('all', df)
    time3 = time.time()
    sum_request(taxi_dic)
    time4 = time.time()
    print(f'读数据时间{time2-time1},处理成单车辆时间{time3-time2},统计订单时间{time4-time3}')

关于我们

最火推荐

小编推荐

联系我们


版权声明:本站内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 88@qq.com 举报,一经查实,本站将立刻删除。备案号:桂ICP备2021009421号
Powered By Z-BlogPHP.
复制成功
微信号:
我知道了