IT技术博客大学习 共学习 共进步
全部 移动开发 后端 数据库 AI 算法 安全 DevOps 前端 设计 开发者

Hive的入口 -- Hive源码解析

淘宝数据平台团队 2010-11-15 22:20:25 累计浏览 5,961 次
本机暂存

初衷:hi,大家好,我叫红门,在hive方面是个菜鸟,现在读hive源码希望能够更了解底层,尤其是hive与Hadoop切换这块。但在读hive源码时发现比Hadoop源码难读一些,虽然Hadoop源码量比较大,但是整体很规范,命名规范,关键地方注释的比较明确。

去年在读和修改Hadoop源码时都感觉比较清晰,可读性比较好一些,往往可以望文生义,可能也有自己对hive不熟的原因在里面吧!

想必别人应该也有人在关注hive底层,所以决定拿出来与大家分享,共同学习,如有理解不到位的地方,欢迎拍砖,更欢迎交流。

一直在思索应该从哪里写,不能一大堆代码粘上来,一通狂砍,大家会不知所云,有点装大。
后来给自己定了个原则:
1。hive执行过程为主线,尽量把关键的一些部分提出来,每次定一个主题。
2.怎么描述才能让别人更容易理解,更容易理清思路,尽量图形化描述。(我用的是mindManager画的图,不知道这种图形化的是不是可以让你看的更舒服。)

废话说得差不多了,现在开始!!

我们先从hive入口聊起,一路按着hive的执行过程主线走下来,
这次的主题就叫做: hive的入口!! (该图借用网上的,不知出处):

原图已失效

CliDriver可以说是hive的入口,对应上图中的UI部分。大家看它的结构就可以明白了,main()函数!对!你猜的没错就是从main()开始。
下图是类结构,总共有五个关键的函数。

原图已失效
这个类可以说是用户和hive交互的平台,你可以把它认为是hive客户端。总共有4个key函数:
下图是这个CliDriver类在整个Hive执行过程中的作用的地位。

原图已失效

如图,hive执行流程_按正常步骤走:
1.―CliDriver.classz中main()开始,初始化Hive环境变量,获取客户端提供的string或者file。
2 ―将其代码送入processLine(cmd),这步主要是读入cmd:‘;’之前的所有字符串都读入(不做任何检查),之后的会忽略。读完后,传入processCmd()处理
3 ―调用processCmd(cmd),分情况处理
//- 读入cmd,并分情况处理,总共分为以下五种情况,根据命令的开头字符串来确定用什么方法处理。
// 1.set.. 设置operator参数,hive环境参数
// 2.quit or exit ― 退出Hive环境
// 3.! 开头
// 4.dfs 开头 交给FsShell处理
// 5.hivesql 正常hivesql执行语句,我们最关心的是这里。语句交给了、、Hive真正的核心引擎 Driver。返回ret = Driver.run(cmd);
4.―不同情况不同处理方法。我们关心的第五种情况:正常的HiveSQL如何处理?其实是进入driver.class里面run(),
//读入hivesql ,词法分析,语法分析,直到执行结束
//1.ParseDriver 返回 词法树 CommonTree
//2.BaseSemanticAnalyzer sem.analyze(tree, ctx);//语义解释,生成执行计划
5.―。。。etc

今天的主题是hive的入口,我们只聊前三步。

现在我们细化主要函数,看hive实际是怎么处理的。(如果你只想了解hive工作流程或原理,不想拘泥于细节,可以跳过下面的细节,如果你想修改源码,做优化,可以继续往下看)

原图已失效
下面是hive入口 涉及的一些关键类和关键函数。

――――――――――-类CliDriver ―

由于这个类,可以说贯彻Hive的整个流程架构,所以我聊的比较细。

――――――――――――――――main()

  public static void main(String[] args) throws IOException {

    OptionsProcessor oproc = new OptionsProcessor();
    if(! oproc.process_stage1(args)) {
      System.exit(1);
    }

    // NOTE: It is critical to do this here so that log4j is reinitialized before
    // any of the other core hive classes are loaded
    SessionState.initHiveLog4j();
    //建立客户端sesssion
    CliSessionState ss = new CliSessionState (new HiveConf(SessionState.class));
    ss.in = System.in;//标准输入
    try {
      ss.out = new PrintStream(System.out, true, "UTF-8");//??
      ss.err = new PrintStream(System.err, true, "UTF-8");//??
    } catch (UnsupportedEncodingException e) {
      System.exit(3);
    }

    SessionState.start(ss);// -- start session  通过复制当前CliSessionState新建立SessionState

    if(! oproc.process_stage2(ss)) {
      System.exit(2);
    }

    // set all properties specified via command line
    HiveConf conf = ss.getConf();//设置所有配置属性
    for(Map.Entry item: ss.cmdProperties.entrySet()) {
      conf.set((String) item.getKey(), (String) item.getValue());
    }

    sp = new SetProcessor();//?? what is proccessor
    qp = new Driver();  // 正常hiveSql的处理引擎
    dfs = new FsShell(ss.getConf());//dfs接口,用于 dfs命令处理

    if(ss.execString != null) {// 输入的是命令行,按命令执行
      System.exit(processLine(ss.execString));
    }

    try {
      if(ss.fileName != null) {// 输入的是文件名,读文件执行
        System.exit(processReader(new BufferedReader(new FileReader(ss.fileName))));
      }
    } catch (FileNotFoundException e) {//没有找到该文件
      System.err.println("Could not open input file for reading. ("+e.getMessage()+")");
      System.exit(3);
    }

    Character mask = null;
    String trigger = null;

    ConsoleReader reader = new ConsoleReader();//hive Console控制台命令读取器
    reader.setBellEnabled(false);
    //reader.setDebug(new PrintWriter(new FileWriter("writer.debug", true)));

    List completors = new LinkedList();
    completors.add(new SimpleCompletor(new String[] { "set", "from",
                                                      "create", "load",
                                                      "describe", "quit", "exit" }));
    reader.addCompletor(new ArgumentCompletor(completors));

    String line;
    PrintWriter out = new PrintWriter(System.out);
    final String HISTORYFILE = ".hivehistory";//建立历史文件,记录所有的命令行
    String historyFile = System.getProperty("user.home") + File.separator  + HISTORYFILE;
    reader.setHistory(new History(new File(historyFile)));
    int ret = 0;
    Log LOG = LogFactory.getLog("CliDriver");//建立日志
    LogHelper console = new LogHelper(LOG);
    String prefix = "";
    String curPrompt = prompt;// -- is "hive"
    //不断地获取hiveSql,读取;之前的所有内容,传个processLine处理
    while ((line = reader.readLine(curPrompt+"> ")) != null) {//--循环开始读命令
      long start = System.currentTimeMillis();// 命令计时开始
      if(line.trim().endsWith(";")) {//如果碰见';'表示结束,该
        line = prefix + " " + line;
        ret = processLine(line);// ----重点: 把命令行传入给解析,执行
        prefix = "";// 把前缀重置为空
        curPrompt = prompt;// "hive"
      } else {
        prefix = prefix + line;
        curPrompt = prompt2;// 应该是 "  "
        continue;
      }
      long end = System.currentTimeMillis();
      if (end > start) {//统计开始到结束的时间,如:命令开始执行所用的时间,
    	                //console reader需要可以添加很多屏幕操作

double timeTaken = (double)(end-start)/1000.0;
        console.printInfo("Time taken: " + timeTaken + " seconds", null);     //对应在Hive Session上。
      }
    }

System.exit(ret);

―――――――――― processLine(Cmd)
// 读入cmd:‘;’之前的所有字符串都读入(不做任何检查),之后的都会忽略。读完后,传入processCmd处理.

  public static int processLine(String line) {
    int ret = 0;
    for(String oneCmd: line.split(";")) {
      oneCmd = oneCmd.trim();
      if(oneCmd.equals(""))
        continue;

      ret = processCmd(oneCmd);//--执行命令
      if(ret != 0) {
        // ignore anything after the first failed command
        return ret;
      }
    }
    return 0;
  }

―――――――――― processCmd()
//- 读入cmd,并分情况处理,总共分为以下五种情况,根据命令的开头字符串来确定用什么方法处理。
// 1.set.. 设置operator参数,hive环境参数
// 2.quit or exit ― 退出Hive环境
// 3.! 开头
// 4.dfs 开头 交给FsShell处理
// 5.hivesql 正常hivesql执行语句,我们最关心的是这里。语句交给了、、Hive真正的核心引

public static int processCmd(String cmd) {
    String[] tokens = cmd.split("\\s+");
    String cmd_1 = cmd.substring(tokens[0].length());
    int ret = 0;

    if(tokens[0].equals("set")) { //1
      ret = sp.run(cmd_1);// 调用这句就可以更改hadoop配置
    } else if (cmd.equals("quit") || cmd.equals("exit")) {//2
      //退出Hive环境
      System.exit(0);
    } else if (cmd.startsWith("!")) {//3 :! 开头的命令
      SessionState ss = SessionState.get();
      String shell_cmd = cmd.substring(1);
      if (shell_cmd.endsWith(";")) {
        shell_cmd = shell_cmd.substring(0, shell_cmd.length()-1);
      }//--除掉';'??
      //shell_cmd = "/bin/bash -c \'" + shell_cmd + "\'";

      try {
        Process executor = Runtime.getRuntime().exec(shell_cmd);//!!??这句得好好 跟踪
        StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, ss.out);
        StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, ss.err);

        outPrinter.start();
        errPrinter.start();

        int exitVal = executor.waitFor();//?? look executor
        if (exitVal != 0) {
          ss.err.write((new String("Command failed with exit code = " + exitVal)).getBytes());
        }
      }
      catch (Exception e) {
        e.printStackTrace();
      }
    } else if (cmd.startsWith("dfs")) {//4  "dfs"  开头解析方法  -- cmd.
      // Hadoop DFS 操作接口 处理!
      SessionState ss = SessionState.get();
      if(dfs == null)
        dfs = new FsShell(ss.getConf());
      String hadoopCmd = cmd.replaceFirst("dfs\\s+", "");
      hadoopCmd = hadoopCmd.trim();
      if (hadoopCmd.endsWith(";")) {
        hadoopCmd = hadoopCmd.substring(0, hadoopCmd.length()-1);
      }
      String[] args = hadoopCmd.split("\\s+");//
      try {
        PrintStream oldOut = System.out;
        System.setOut(ss.out);
        int val = dfs.run(args);//??
        System.setOut(oldOut);
        if (val != 0) {
          ss.err.write((new String("Command failed with exit code = " + val)).getBytes());
        }
      } catch (Exception e) {
        ss.err.println("Exception raised from DFSShell.run " + e.getLocalizedMessage());
      }
    } else {//5 hivesql 正常运行,重点在这里
      ret = qp.run(cmd);//正常执行hive命令,如:select  .. ;  addfile ..;
      Vector res = new Vector();
      while (qp.getResults(res)) {//获得执行结果result
      	for (String r:res) {
          SessionState ss  = SessionState.get();
          PrintStream out = ss.out;
          out.println(r);
      	}
        res.clear();
      }

      int cret = qp.close();
      if (ret == 0) {
        ret = cret;
      }
    }
    return ret;
  }

―――――――-类CliSessionState

CliSessionState 除了做了个初始化,基本都是上继承了SessionState的实现方法,可能是作者为了低耦合。
public class CliSessionState extends SessionState

原图已失效
所以我们直接看SessionState。
SessionState可以说你是你自己当前的Hive环境,建立、初始化你Hive的session,一方面它来自conf的初始化设置,一方面来自你手动set。可以通过命令行形式,也可以通过file,这都取决于你的选择。
它会连接Hive元数据数据库,得到现有的元数据信息。
此类主要关键功能:
1 主要是生成session,并赋予一个唯一id(设置规则:用户名_年月日分秒 即 user_id + “_”+ yyyymmddHHmm)的session,
生成session有两种方式:1.直接新建session ,2. 通过拷贝方式复制一个session。
第一种我们最常用,但第二种很有用,我们可以确保我们两个环境是完全一致的,而且避免琐碎设置工作。
2 给每个cmd给予一个queryID,可以通过queryID得到命令行,也可以反过来得到id
3 每个sessionState 都会有一个logHelper,用于日志记录
其中 clude : hiveconf , 连接db元数据数据库

下图是SessionState的类结构:
原图已失效

关键函数:
String makeSessionId() ――― //生成sessionID : user_id+”_” + yyyyMMDDhhmm
setCmd(String cmdString)―― //给命令cmd设置query Id
protected final static HiveConf.ConfVars [] metaVars ―-//获取元数据系统,路径等
public String getCmd() ―― // -通过queryID获取命令代码cmd

――――――――――-类 CommandProcessor
CommandProcessor类的很简单,是个接口类。

public interface CommandProcessor {
public int run(String command);
}

你会奇怪为什么先聊这个接口类,因为有三个类实现了这个接口(如下图),其中setProcessor, MetadataProcessor是我们Hive入口的关键类。

原图已失效

―――-类MetadataProcessor
Hive部分元信息提取与处理
// run()中 得到表的元信息,如果出错返回1,如:找不到表名等情况

public int run(String command) {
    SessionState ss = SessionState.get();
    String table_name = command.trim();
    if(table_name.equals("")) {
      return 0;
    }

    try {
      MetaStoreClient msc = new MetaStoreClient(ss.getConf());

      if(!msc.tableExists(table_name)) {//表不存在
        ss.err.println("table does not exist: " + table_name);
        return 1;
      } else {
        List fields = msc.get_fields(table_name);//获得表信息

        for(FieldSchema f: fields) {
          ss.out.println(f.getName() + ": " + f.getType());
        }
      }
    } catch (MetaException err) {
      ss.err.println("Got meta exception: " + err.getMessage());
      return 1;
    } catch (Exception err) {
      ss.err.println("Got exception: " + err.getMessage());
      return 1;
    }
    return 0;
  }

――――――- 类SetProcessor
主要是设置Hive环境,总共分为两大类:
1. set session为安全模式,
如:set silent = true;
2.set 该session的conf配置,即调用hadoop时的配置参数,以及改变执行时的具体实现。 如:set hive.exec.compress.output=’false’;
// 我们可以调用这里run(String command)更改hadoop配置 ,hive执行参数等,

public int run(String command) {
    SessionState ss = SessionState.get();//建一个SessionState对象
    String nwcmd = command.trim();//去空格
    if(nwcmd.equals("")) {
      dumpOptions(ss.getConf().getChangedProperties());
      return 0;
    }
    if(nwcmd.equals("-v")) {
      dumpOptions(ss.getConf().getAllProperties());
      return 0;
    }
    String[] part = new String [2];
    int eqIndex = nwcmd.indexOf('=');
    if(eqIndex == -1) {
      // no equality sign - print the property out
      dumpOption(ss.getConf().getAllProperties(), nwcmd);
      return (0);
    } else if (eqIndex == nwcmd.length()-1) {
      part[0] = nwcmd.substring(0, nwcmd.length()-1);
      part[1] = "";
    } else {
      part[0] = nwcmd.substring(0, eqIndex);// 中间=号隔开的 set cmd
      part[1] = nwcmd.substring(eqIndex+1);
    }

    try {//
      if (part[0].equals("silent")) {// 设置silent模式
        boolean val = getBoolean(part[1]);//
        ss.setIsSilent(val);//
      } else {
        ss.getConf().set(part[0], part[1]);// 设置key - value(如:.gmmt = ture)修改该session的conf里配置
      }

Hive的入口先到这里,以后会陆续更新,欢迎交流。希望能这个Hive源码分析系列能够在你探索Hive的路上,节省您的时间。谢谢~

同分类推荐文章

  1. 使用deepseek进行Oracle恢复,引起重大故障 (2026-06-22 10:56:00)
  2. 接手一个只差临门一脚的数据库恢复 (2026-06-18 00:13:09)
  3. 我做了一个 AI 版的 StarRocks 升级风险扫描工具,直接帮我定位到一个风险 (2026-06-15 01:00:00)

查看更多 数据库 文章 →

建议继续学习

  1. HFile存储格式 (累计阅读 15,977)
  2. hbase介绍 (累计阅读 12,368)
  3. Zookeeper工作原理 (累计阅读 12,203)
  4. Facebook的实时Hadoop系统 (累计阅读 11,495)
  5. 海量数据面试题举例 (累计阅读 11,114)
  6. redis在大数据量下的压测表现 (累计阅读 8,295)
  7. HBase技术介绍 (累计阅读 8,077)
  8. 淘宝数据魔方技术架构解析 (累计阅读 7,956)
  9. HBase随机写以及随机读性能测试 (累计阅读 7,549)
  10. 如何获取hive建表语句 (累计阅读 7,184)