A Few Hive Streaming Scripts

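      A while ago I was doing data development for the JIS Wangpu (旺铺) storefront-decoration project. The logic was convoluted throughout, and in several places HiveQL alone could not express what was needed, so I wrote four Python streaming scripts. I'm sharing them here so that anyone who needs similar logic later can adapt them with minor changes.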
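1) Output all rows before a given row
     The input is a dataset grouped by the third field (rows of the same group must be adjacent). Within each group, rows are output one by one until a row whose fourth field is zero is reached; that row and the rest of the group are skipped:
import sys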
def main():
    flag = ''
    shop_id = ''
    flag_shopid = ''  # shop whose output has already been cut off
    for line in sys.stdin:
        line = line.strip('\n')
        if len(line) == 0 : return None
        split_fields = line.split('\001')  # fields are separated by ^A
        flag = split_fields[3]
        shop_id = split_fields[2]
        if flag == '0' :
            # zero flag: remember the shop and stop emitting its rows
            flag_shopid = shop_id
            continue
        elif shop_id == flag_shopid :
            continue
        else : print('\001'.join(split_fields))
if __name__ == '__main__':
    main()
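To check the behaviour without running a Hive job, the same filter can be sketched as a pure function over a list of lines. This is only an illustration: the function name `rows_before_flag` and the sample rows are made up, while the `^A` separator and the field positions follow the script above.

```python
SEP = '\x01'  # ^A, the field separator used by the script above

def rows_before_flag(lines):
    """Yield the rows of each shop that come before its first flag '0' row.

    Assumes rows of the same shop are adjacent in the input.
    """
    stopped_shop = None  # shop whose output has been cut off
    for line in lines:
        fields = line.rstrip('\n').split(SEP)
        shop_id, flag = fields[2], fields[3]
        if flag == '0':
            stopped_shop = shop_id
        elif shop_id != stopped_shop:
            yield SEP.join(fields)

# Hypothetical sample: columns are (row id, filler, shop_id, flag)
sample = [
    SEP.join(['u1', 'x', 'shopA', '1']),
    SEP.join(['u2', 'x', 'shopA', '0']),
    SEP.join(['u3', 'x', 'shopA', '1']),
    SEP.join(['u4', 'x', 'shopB', '2']),
]
result = [row.split(SEP)[0] for row in rows_before_flag(sample)]
print(result)
```

Only the rows ahead of the zero flag in shopA, plus all of shopB (which never hits a zero flag), survive the filter.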
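2) Output all rows after a given row
   The input is a dataset grouped by the third field. Within each group the fourth field (flag) is checked; from the first row with flag=0 onwards, that row and all remaining rows of the group are output: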
import sys
def main():
    flag = ''
    shop_id = ''
    pre_shopid = ''  # shop for which a zero flag has already been seen
    for line in sys.stdin:
        line = line.strip('\n')
        if len(line) == 0 : return None
        split_fields = line.split('\001')  # fields are separated by ^A
        flag = split_fields[3]
        shop_id = split_fields[2]
        if flag == '0' :
            # zero flag: start emitting this shop's rows, including this one
            pre_shopid=split_fields[2]
            print('\001'.join(split_fields))
        elif shop_id==pre_shopid :
            print('\001'.join(split_fields))
        else :
            continue
if __name__ == '__main__':
    main()
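As with the first script, the logic can be sketched as a small function that is easy to try on sample data. The name `rows_from_flag` and the rows below are hypothetical; the separator and field positions are taken from the script above.

```python
SEP = '\x01'  # ^A, the field separator used by the script above

def rows_from_flag(lines):
    """Yield, for each shop, its first flag '0' row and every row after it.

    Assumes rows of the same shop are adjacent in the input.
    """
    open_shop = None  # shop for which a zero flag has been seen
    for line in lines:
        fields = line.rstrip('\n').split(SEP)
        shop_id, flag = fields[2], fields[3]
        if flag == '0':
            open_shop = shop_id
            yield SEP.join(fields)
        elif shop_id == open_shop:
            yield SEP.join(fields)

# Hypothetical sample: columns are (row id, filler, shop_id, flag)
sample = [
    SEP.join(['r1', 'x', 'shopA', '1']),
    SEP.join(['r2', 'x', 'shopA', '0']),
    SEP.join(['r3', 'x', 'shopA', '1']),
    SEP.join(['r4', 'x', 'shopB', '1']),
]
result = [row.split(SEP)[0] for row in rows_from_flag(sample)]
print(result)
```

Rows of a shop that never reaches a zero flag (shopB here) are dropped entirely.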
3) Convert a decimal number to base 36 (the argument must be an integer):
def to36hex(number):
    string_list = ['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']
    remainder = 0
    finalstr = []  # digits, least significant first
    originnumber = number
    if(number == 0) : finalstr.append(str(number))
    if(number<0) :
        number = -number
    while number>0 :
        remainder = number%36
        number = number//36  # integer division, also correct on Python 3
        if(remainder<=9) :
            finalstr.append(str(remainder))
        elif(remainder>9 and remainder<36) :
            finalstr.append(string_list[remainder-10])
    if(originnumber>=0) :
        return ''.join(finalstr[::-1])
    else :
        return '-'+''.join(finalstr[::-1])
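One way to sanity-check a base-36 encoder is to decode its output with Python's built-in `int(text, 36)` and compare against the input. The sketch below uses a compact equivalent written with `divmod`; the name `to_base36` is chosen here for illustration and is not part of the original script.

```python
import string

DIGITS = string.digits + string.ascii_lowercase  # '0'-'9' followed by 'a'-'z'

def to_base36(number):
    """Encode an integer as a lowercase base-36 string."""
    if number == 0:
        return '0'
    sign = '-' if number < 0 else ''
    number = abs(number)
    chars = []  # digits, least significant first
    while number > 0:
        number, remainder = divmod(number, 36)
        chars.append(DIGITS[remainder])
    return sign + ''.join(reversed(chars))

# Round trip: int(text, 36) parses base-36 text back to an integer.
for n in (0, 35, 36, 1295, -12345):
    encoded = to_base36(n)
    assert int(encoded, 36) == n
    print(n, encoded)
```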
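4) Row concatenation and column concatenation
   This example first joins the last three fields of each row with ^C as the separator, then groups by the third field (shopid) and joins all rows of the same shopid with ^B as the separator before output (the program also does some timestamp conversion; if you don't need it, adjust the code accordingly):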
import sys
import time
def main():
    final_click = []    # joined behaviour strings of the current group
    pre_list = []       # key fields of the previous row
    cur_list = []       # key fields of the current row
    behaviorsplit = ''
    per_len = 0
    list_len = 0        # total length of the strings collected for this group
    is_commit = 0       # 1 if the previous row of this group had split_fields[4] == '0'
    for line in sys.stdin:
        line = line.strip('\n')
        if len(line) == 0 : return None
        split_fields = line.split('\001')
        cur_list = split_fields[:-3]  # everything except the last three fields is the group key
        if pre_list == cur_list :
            # same group as the previous row
            if split_fields[3] == '' :
                splitfields3 = 'null'
            else :
                try:
                    # convert the Unix timestamp to local HH:MM:SS
                    splitfields3 = time.strftime('%H:%M:%S', time.localtime(float(split_fields[3])))
                except Exception:
                    pre_list = cur_list
                    continue
            behaviorsplit = splitfields3+'\003'+split_fields[4]+'\003'+split_fields[5]
            if split_fields[4] == '0':
                if is_commit == 1:
                    # two '0' rows in a row: drop the earlier one
                    if final_click!=[]:
                        list_len -= len(final_click.pop())
                else:
                    is_commit = 1
            else :
                is_commit = 0
            per_len = len(behaviorsplit)
            list_len += per_len
            if list_len <= 60000:  # cap the joined output per group
                final_click.append(behaviorsplit)
        else :
            # new group: flush the previous one first
            if len(pre_list):
                if final_click!=[]:
                    result = final_click[-1].split('\003')
                    if result[1] == '0':
                        final_click.pop()  # drop a trailing '0' row
                    if len(pre_list) and len(final_click) :
                        print('\001'.join(pre_list) + '\001' + '\002'.join(final_click))
            is_commit = 0
            final_click = []
            if split_fields[3] == '' :
                splitfields3 = 'null'
            else :
                try:
                    splitfields3 = time.strftime('%H:%M:%S', time.localtime(float(split_fields[3])))
                except Exception:
                    pre_list = cur_list
                    continue
            list_len = 0
            # as in the original, the first row of a group is kept only when split_fields[4] is '0'
            if split_fields[4] == '0':
                is_commit = 1
                # build the string before measuring it (the original measured the previous row's string)
                behaviorsplit = splitfields3+'\003'+split_fields[4]+'\003'+split_fields[5]
                per_len = len(behaviorsplit)
                list_len += per_len
                if list_len <= 60000:
                    final_click.append(behaviorsplit)
        pre_list = cur_list
    # flush the last group
    if len(pre_list):
        if final_click!=[]:
            result = final_click[-1].split('\003')
            if result[1] == '0':
                final_click.pop()
            if len(pre_list) and len(final_click) :
                print('\001'.join(pre_list) + '\001' + '\002'.join(final_click))
if __name__ == '__main__':
    main()
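If only the concatenation itself is needed, the core idea can be sketched much more briefly with `itertools.groupby`. The version below is a simplified illustration, not a replacement for the script above: it leaves out the timestamp conversion, the handling of rows whose fifth field is '0', and the 60000-character cap. The function name `concat_groups` and the sample rows are hypothetical.

```python
from itertools import groupby

FIELD_SEP = '\x01'  # ^A between the key fields
ROW_SEP = '\x02'    # ^B between the rows of one group
COL_SEP = '\x03'    # ^C between the last three fields of a row

def concat_groups(lines, tail=3):
    """Join the last `tail` fields of each row with ^C, then join the
    rows sharing the same leading fields with ^B.

    Assumes rows of the same group are adjacent in the input.
    """
    rows = (line.rstrip('\n').split(FIELD_SEP)
            for line in lines if line.strip('\n'))
    for key, group in groupby(rows, key=lambda f: tuple(f[:-tail])):
        joined = ROW_SEP.join(COL_SEP.join(f[-tail:]) for f in group)
        yield FIELD_SEP.join(key) + FIELD_SEP + joined

# Hypothetical sample: (date, user, shop_id, time, action, page)
sample = [
    FIELD_SEP.join(['d1', 'u1', 'shopA', 't1', '1', 'p1']),
    FIELD_SEP.join(['d1', 'u1', 'shopA', 't2', '2', 'p2']),
    FIELD_SEP.join(['d1', 'u1', 'shopB', 't3', '1', 'p3']),
]
for row in concat_groups(sample):
    print(repr(row))
```

`groupby` only merges adjacent rows with equal keys, which matches the assumption in the script above that the input arrives already grouped.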
  If you run into any problems using these scripts, feel free to ping me directly.
