几个HIVE的streaming
前段时间在做JIS旺铺装修项目的数据开发,整个过程逻辑非常之纠结,有好几处HIVE代码本身无法满足,因此写了四个python的streaming,在此
跟大家分享下,以后有需要用到相似逻辑的同学可以拿去稍微改改直接用。
1)输出某行数据之前所有的数据;
本实例输入为按照第三个参数分组的数据集,每组中逐条输出所有数据,直到遇到第四个参数为零时则停止输出: import sys
def main():
flag = ”
shop_id = ”
flag_shopid = ”
for line in sys.stdin:
line = line.strip(‘\n’)
if len(line) == 0 : return None
split_fields = line.split(’01′)
flag = split_fields[3]
shop_id = split_fields[2]
if flag == ’0′ :
flag_shopid = shop_id
continue
elif shop_id == flag_shopid :
continue
else : print ’01′.join(split_fields)
if __name__ == ‘__main__’:
main()
2)数据某行数据之后所有的数据;
本实例输入为按照第三个参数分组的数据集,每组中判定第四个参数(flag),遇到第一个flag=0,将这组中之后的所有数据输出:
import sys
def main():
flag = ”
shop_id = ”
pre_shopid = ”
for line in sys.stdin:
line = line.strip(‘\n’)
if len(line) == 0 : return None
split_fields = line.split(’01′)
flag = split_fields[3]
shop_id = split_fields[2]
if flag == ’0′ :
pre_shopid=split_fields[2]
print ’01′.join(split_fields)
elif shop_id==pre_shopid :
print ’01′.join(split_fields)
else :
continue
if __name__ == ‘__main__’:
main()
3)十进制转成三十六进制(参数为number型):
def to36hex(number):
hex = []
string_list = ['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']
remainder = ”
finalstr = []
originnumber = number
if(number == 0) : finalstr.append(str(number))
if(number<0) :
number = -number
while number>0 :
remainder = number%36
number = number/36
if(remainder<=9) :
finalstr.append(str(remainder))
elif(remainder>9 and remainder<36) :
finalstr.append(string_list[remainder-10])
if(originnumber>=0) :
return ”.join(finalstr[::-1])
else :
return “-”+”.join(finalstr[::-1])
4)行拼接,列拼接;
本实例功能为,先将每行最后三个字段以^C为分隔符拼接起来,然后以第三个字段(shopid)为分组,将同一shopid的所有行以^B为分隔符拼接起来输出(程序中还有一些时间戳转换,如果用不到,可适当修改代码):
import sys
import time
def main():
final_click = []
pre_list = []
cur_list = []
behaviorsplit = ”
per_len = 0
list_len = 0
is_commit = 0
for line in sys.stdin:
line = line.strip(‘\n’)
if len(line) == 0 : return None
split_fields = line.split(’01′)
cur_list = split_fields[:-3]
if pre_list == cur_list :
if split_fields[3] == ” :
splitfields3 = ‘null’
else :
try:
splitfields3 = time.strftime(‘%H:%M:%S’, time.localtime(float(split_fields[3])))
except:
pre_list = cur_list
continue
behaviorsplit = splitfields3+’03′+split_fields[4]+’03′+split_fields[5]
if split_fields[4] == ’0′:
if is_commit == 1:
if final_click!=[]:
list_len -= len(final_click.pop())
else:
is_commit = 1
else :
is_commit = 0
per_len = len(behaviorsplit)
list_len += per_len
if list_len <= 60000:
final_click.append(behaviorsplit)
else :
if len(pre_list):
if final_click!=[]:
result = final_click[-1].split(’03′)
if result[1] == ’0′:
final_click.pop()
if len(pre_list) and len(final_click) :
print ’01′.join(pre_list) + ’01′ + “02″.join(final_click)
is_commit = 0
final_click = []
if split_fields[3] == ” :
splitfields3 = ‘null’
else :
try:
splitfields3 = time.strftime(‘%H:%M:%S’, time.localtime(float(split_fields[3])))
except:
pre_list = cur_list
continue
list_len = 0
if split_fields[4] == ’0′:
is_commit = 1
per_len = len(behaviorsplit)
list_len += per_len
behaviorsplit = splitfields3+’03′+split_fields[4]+’03′+split_fields[5]
if list_len <= 60000:
final_click.append(behaviorsplit)
pre_list = cur_list
if len(pre_list):
if final_click!=[]:
result = final_click[-1].split(’03′)
if result[1] == ’0′:
final_click.pop()
if len(pre_list) and len(final_click) :
print ’01′.join(pre_list) + ’01′ + “02″.join(final_click)
if __name__ == ‘__main__’:
main()
用到的同学有啥问题可以直接骚扰我。
建议继续学习:
- 如何获取hive建表语句 (阅读:6709)
- HTTP Live Streaming (HLS) 不错的视频直播技术 (阅读:6109)
- Hive源码解析-之-词法分析器 parser (阅读:5886)
- HIVE中UDTF编写和使用 (阅读:5292)
- Hive的入口 -- Hive源码解析 (阅读:4882)
- Hive源码解析-之-语法解析器 (阅读:4377)
- 用hadoop hive协同scribe log用户行为分析方案 (阅读:4160)
- 写好Hive 程序的五个提示 (阅读:3193)
- Impala与Hive的比较 (阅读:3048)
- Hive 随谈(一) (阅读:2869)
扫一扫订阅我的微信号:IT技术博客大学习
- 作者:jinmi 来源: 量子数科院
- 标签: HIVE streaming
- 发布时间:2011-05-25 12:26:50
- [51] WEB系统需要关注的一些点
- [49] Go Reflect 性能
- [48] Oracle MTS模式下 进程地址与会话信
- [46] IOS安全–浅谈关于IOS加固的几种方法
- [45] android 开发入门
- [45] Twitter/微博客的学习摘要
- [45] find命令的一点注意事项
- [44] 图书馆的世界纪录
- [44] 如何拿下简短的域名
- [44] 【社会化设计】自我(self)部分――欢迎区