前段时间在做JIS旺铺装修项目的数据开发,整个过程逻辑非常之纠结,有好几处HIVE代码本身无法满足,因此写了四个python的streaming,在此
跟大家分享下,以后有需要用到相似逻辑的同学可以拿去稍微改改直接用。
1)输出某行数据之前所有的数据;
本实例输入为按照第三个参数分组的数据集,每组中逐条输出所有数据,直到遇到第四个参数为零时则停止输出: import sys
def main():
flag = ”
shop_id = ”
flag_shopid = ”
for line in sys.stdin:
line = line.strip(‘\n’)
if len(line) == 0 : return None
split_fields = line.split(’01′)
flag = split_fields[3]
shop_id = split_fields[2]
if flag == ’0′ :
flag_shopid = shop_id
continue
elif shop_id == flag_shopid :
continue
else : print ’01′.join(split_fields)
if __name__ == ‘__main__’:
main()
2)数据某行数据之后所有的数据;
本实例输入为按照第三个参数分组的数据集,每组中判定第四个参数(flag),遇到第一个flag=0,将这组中之后的所有数据输出:
import sys
def main():
flag = ”
shop_id = ”
pre_shopid = ”
for line in sys.stdin:
line = line.strip(‘\n’)
if len(line) == 0 : return None
split_fields = line.split(’01′)
flag = split_fields[3]
shop_id = split_fields[2]
if flag == ’0′ :
pre_shopid=split_fields[2]
print ’01′.join(split_fields)
elif shop_id==pre_shopid :
print ’01′.join(split_fields)
else :
continue
if __name__ == ‘__main__’:
main()
3)十进制转成三十六进制(参数为number型):
def to36hex(number):
hex = []
string_list = ['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']
remainder = ”
finalstr = []
originnumber = number
if(number == 0) : finalstr.append(str(number))
if(number<0) :
number = -number
while number>0 :
remainder = number%36
number = number/36
if(remainder<=9) :
finalstr.append(str(remainder))
elif(remainder>9 and remainder<36) :
finalstr.append(string_list[remainder-10])
if(originnumber>=0) :
return ”.join(finalstr[::-1])
else :
return “-”+”.join(finalstr[::-1])
4)行拼接,列拼接;
本实例功能为,先将每行最后三个字段以^C为分隔符拼接起来,然后以第三个字段(shopid)为分组,将同一shopid的所有行以^B为分隔符拼接起来输出(程序中还有一些时间戳转换,如果用不到,可适当修改代码):
import sys
import time
def main():
final_click = []
pre_list = []
cur_list = []
behaviorsplit = ”
per_len = 0
list_len = 0
is_commit = 0
for line in sys.stdin:
line = line.strip(‘\n’)
if len(line) == 0 : return None
split_fields = line.split(’01′)
cur_list = split_fields[:-3]
if pre_list == cur_list :
if split_fields[3] == ” :
splitfields3 = ‘null’
else :
try:
splitfields3 = time.strftime(‘%H:%M:%S’, time.localtime(float(split_fields[3])))
except:
pre_list = cur_list
continue
behaviorsplit = splitfields3+’03′+split_fields[4]+’03′+split_fields[5]
if split_fields[4] == ’0′:
if is_commit == 1:
if final_click!=[]:
list_len -= len(final_click.pop())
else:
is_commit = 1
else :
is_commit = 0
per_len = len(behaviorsplit)
list_len += per_len
if list_len <= 60000:
final_click.append(behaviorsplit)
else :
if len(pre_list):
if final_click!=[]:
result = final_click[-1].split(’03′)
if result[1] == ’0′:
final_click.pop()
if len(pre_list) and len(final_click) :
print ’01′.join(pre_list) + ’01′ + “02″.join(final_click)
is_commit = 0
final_click = []
if split_fields[3] == ” :
splitfields3 = ‘null’
else :
try:
splitfields3 = time.strftime(‘%H:%M:%S’, time.localtime(float(split_fields[3])))
except:
pre_list = cur_list
continue
list_len = 0
if split_fields[4] == ’0′:
is_commit = 1
per_len = len(behaviorsplit)
list_len += per_len
behaviorsplit = splitfields3+’03′+split_fields[4]+’03′+split_fields[5]
if list_len <= 60000:
final_click.append(behaviorsplit)
pre_list = cur_list
if len(pre_list):
if final_click!=[]:
result = final_click[-1].split(’03′)
if result[1] == ’0′:
final_click.pop()
if len(pre_list) and len(final_click) :
print ’01′.join(pre_list) + ’01′ + “02″.join(final_click)
if __name__ == ‘__main__’:
main()
用到的同学有啥问题可以直接骚扰我。
几个HIVE的streaming
本机暂存
同分类推荐文章
- 使用deepseek进行Oracle恢复,引起重大故障 (2026-06-22 10:56:00)
- 接手一个只差临门一脚的数据库恢复 (2026-06-18 00:13:09)
- 我做了一个 AI 版的 StarRocks 升级风险扫描工具,直接帮我定位到一个风险 (2026-06-15 01:00:00)
建议继续学习
- 用Hyer来进行网站的抓取 (累计阅读 158,251)
- 配置Nginx+uwsgi更方便地部署python应用 (累计阅读 107,164)
- 程序员技术练级攻略 (累计阅读 35,471)
- python实现自动登录discuz论坛 (累计阅读 32,834)
- python编程细节──遍历dict的两种方法比较 (累计阅读 20,371)
- 每个程序员都应该学习使用Python或Ruby (累计阅读 17,918)
- Chrome和goagent的配置方法,你懂的 (累计阅读 16,843)
- 30分钟3300%性能提升――python+memcached网页优化小记 (累计阅读 13,742)
- 使用python爬虫抓站的一些技巧总结:进阶篇 (累计阅读 13,301)
- 我的PHP,Python和Ruby之路 (累计阅读 13,147)