IT技术博客大学习 共学习 共进步
全部 移动开发 后端 数据库 AI 算法 安全 DevOps 前端 设计 开发者

几个HIVE的streaming

量子数科院 2011-05-25 12:26:50 累计浏览 4,392 次
本机暂存

      前段时间在做JIS旺铺装修项目的数据开发,整个过程逻辑非常之纠结,有好几处HIVE代码本身无法满足,因此写了四个python的streaming,在此
跟大家分享下,以后有需要用到相似逻辑的同学可以拿去稍微改改直接用。
1)输出某行数据之前所有的数据;
     本实例输入为按照第三个参数分组的数据集,每组中逐条输出所有数据,直到遇到第四个参数为零时则停止输出: import sys
def main():
    flag = ”
    shop_id = ”
    flag_shopid = ”
    for line in sys.stdin:
        line = line.strip(‘\n’)
        if len(line) == 0 : return None
        split_fields = line.split(’01′)
        flag = split_fields[3]
        shop_id = split_fields[2]
        if flag == ’0′ :
            flag_shopid = shop_id
            continue
        elif shop_id == flag_shopid :
            continue
        else : print ’01′.join(split_fields)
if __name__ == ‘__main__’:
    main()
2)数据某行数据之后所有的数据;
   本实例输入为按照第三个参数分组的数据集,每组中判定第四个参数(flag),遇到第一个flag=0,将这组中之后的所有数据输出:
import sys
def main():
    flag = ”
    shop_id = ”
    pre_shopid = ”
    for line in sys.stdin:
        line = line.strip(‘\n’)
        if len(line) == 0 : return None
        split_fields = line.split(’01′)
        flag = split_fields[3]
        shop_id = split_fields[2]
        if flag == ’0′ :
            pre_shopid=split_fields[2]
            print ’01′.join(split_fields)
        elif shop_id==pre_shopid :
            print ’01′.join(split_fields)
        else :
            continue
if __name__ == ‘__main__’:
    main()
3)十进制转成三十六进制(参数为number型):
def to36hex(number):
    hex = []
    string_list = ['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']
    remainder = ”
    finalstr = []
    originnumber = number
    if(number == 0) : finalstr.append(str(number))
    if(number<0) :
        number = -number
    while number>0 :
        remainder = number%36
        number = number/36
        if(remainder<=9) :
            finalstr.append(str(remainder))
        elif(remainder>9 and remainder<36) :
            finalstr.append(string_list[remainder-10])
    if(originnumber>=0) :
        return ”.join(finalstr[::-1])
    else :
        return “-”+”.join(finalstr[::-1])
4)行拼接,列拼接;
   本实例功能为,先将每行最后三个字段以^C为分隔符拼接起来,然后以第三个字段(shopid)为分组,将同一shopid的所有行以^B为分隔符拼接起来输出(程序中还有一些时间戳转换,如果用不到,可适当修改代码):
import sys
import time
def main():
    final_click = []
    pre_list = []
    cur_list = []
    behaviorsplit = ”
    per_len = 0
    list_len = 0
    is_commit = 0
    for line in sys.stdin:
        line = line.strip(‘\n’)
        if len(line) == 0 : return None   
        split_fields = line.split(’01′)
        cur_list = split_fields[:-3]
        if pre_list == cur_list :
            if split_fields[3] == ” :
                splitfields3 = ‘null’
            else :
                try:
                    splitfields3 = time.strftime(‘%H:%M:%S’, time.localtime(float(split_fields[3])))
                except:
                    pre_list = cur_list
                    continue
            behaviorsplit = splitfields3+’03′+split_fields[4]+’03′+split_fields[5]
            if split_fields[4] == ’0′:
                if is_commit == 1:
                    if final_click!=[]:
                        list_len -= len(final_click.pop())
                else:
                    is_commit = 1
            else :
                is_commit = 0
            per_len = len(behaviorsplit)
            list_len += per_len
            if list_len <= 60000:
                final_click.append(behaviorsplit)
        else :
            if len(pre_list):
                if final_click!=[]:
                    result = final_click[-1].split(’03′)
                    if result[1] == ’0′:
                        final_click.pop()
                    if len(pre_list) and len(final_click) :
                        print ’01′.join(pre_list) + ’01′ + “02″.join(final_click)
            is_commit = 0
            final_click = []
            if split_fields[3] == ” :
                splitfields3 = ‘null’
            else :
                try:
                    splitfields3 = time.strftime(‘%H:%M:%S’, time.localtime(float(split_fields[3])))
                except:
                    pre_list = cur_list
                    continue
            list_len = 0
            if split_fields[4] == ’0′:
                is_commit = 1
                per_len = len(behaviorsplit)
                list_len += per_len
                behaviorsplit = splitfields3+’03′+split_fields[4]+’03′+split_fields[5]
                if list_len <= 60000:
                    final_click.append(behaviorsplit)
        pre_list = cur_list
    if len(pre_list):
        if final_click!=[]:
            result = final_click[-1].split(’03′)
            if result[1] == ’0′:
                final_click.pop()
            if len(pre_list) and len(final_click) :
                print ’01′.join(pre_list) + ’01′ + “02″.join(final_click)
if __name__ == ‘__main__’:
    main() 
  用到的同学有啥问题可以直接骚扰我。

同分类推荐文章

  1. 使用deepseek进行Oracle恢复,引起重大故障 (2026-06-22 10:56:00)
  2. 接手一个只差临门一脚的数据库恢复 (2026-06-18 00:13:09)
  3. 我做了一个 AI 版的 StarRocks 升级风险扫描工具,直接帮我定位到一个风险 (2026-06-15 01:00:00)

查看更多 数据库 文章 →

建议继续学习

  1. 用Hyer来进行网站的抓取 (累计阅读 158,251)
  2. 配置Nginx+uwsgi更方便地部署python应用 (累计阅读 107,164)
  3. 程序员技术练级攻略 (累计阅读 35,471)
  4. python实现自动登录discuz论坛 (累计阅读 32,834)
  5. python编程细节──遍历dict的两种方法比较 (累计阅读 20,371)
  6. 每个程序员都应该学习使用Python或Ruby (累计阅读 17,918)
  7. Chrome和goagent的配置方法,你懂的 (累计阅读 16,843)
  8. 30分钟3300%性能提升――python+memcached网页优化小记 (累计阅读 13,742)
  9. 使用python爬虫抓站的一些技巧总结:进阶篇 (累计阅读 13,301)
  10. 我的PHP,Python和Ruby之路 (累计阅读 13,147)