IT技术博客大学习 共学习 共进步

HBase Block Cache实现机制分析

量子恒道官方博客 2012-11-27 13:37:30 浏览 2,485 次

   本文结合HBase 0.94.1版本源码,对HBase的Block Cache实现机制进行分析,总结学习其Cache设计的核心思想。

1. 概述

   HBase上Regionserver的内存分为两个部分,一部分作为Memstore,主要用来写;另外一部分作为BlockCache,主要用于读。

  • 写请求会先写入Memstore,Regionserver会给每个region提供一个Memstore,当Memstore满64MB以后,会启动 flush刷新到磁盘。当Memstore的总大小超过限制时(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),会强行启动flush进程,从最大的Memstore开始flush直到低于限制。

  • 读请求先到Memstore中查数据,查不到就到BlockCache中查,再查不到就会到磁盘上读,并把读的结果放入BlockCache。由于BlockCache采用的是LRU策略,因此BlockCache达到上限(heapsize * hfile.block.cache.size * 0.85)后,会启动淘汰机制,淘汰掉最老的一批数据。

  •    一个Regionserver上有一个BlockCache和N个Memstore,它们的大小之和不能大于等于heapsize * 0.8,否则HBase不能正常启动。

       默认配置下,BlockCache为0.2,而Memstore为0.4。在注重读响应时间的应用场景下,可以将 BlockCache设置大些,Memstore设置小些,以加大缓存的命中率。

    HBase RegionServer包含三个级别的Block优先级队列:

  • Single:如果一个Block第一次被访问,则放在这一优先级队列中;

  • Multi:如果一个Block被多次访问,则从Single队列移到Multi队列中;

  • InMemory:如果一个Block是inMemory的,则放到这个队列中。

  • 以上将Cache分级思想的好处在于:

  • 首先,通过inMemory类型Cache,可以有选择地将in-memory的column families放到RegionServer内存中,例如Meta元数据信息;

  • 通过区分Single和Multi类型Cache,可以防止由于Scan操作带来的Cache频繁颠簸,将最少使用的Block加入到淘汰算法中。

  • 默认配置下,对于整个BlockCache的内存,又按照以下百分比分配给Single、Multi、InMemory使用:0.25、0.50和0.25。

    注意,其中InMemory队列用于保存HBase Meta表元数据信息,因此如果将数据量很大的用户表设置为InMemory的话,可能会导致Meta表缓存失效,进而对整个集群的性能产生影响。

    2. 源码分析

    下面是对HBase 0.94.1中相关源码(org.apache.hadoop.hbase.io.hfile.LruBlockCache)的分析过程。

    2.1加入Block Cache

     /** Concurrent map (the cache) */

     private final ConcurrentHashMap<BlockCacheKey,CachedBlock> map;

     /**

      * Cache the block with the specified name and buffer.

      * <p>

      * It is assumed this will NEVER be called on an already cached block.  If

      * that is done, an exception will be thrown.

      * @param cacheKey block’s cache key

      * @param buf block buffer

      * @param inMemory if block is in-memory

      */

     public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {

       CachedBlock cb = map.get(cacheKey);

       if(cb != null) {

         throw new RuntimeException(“Cached an already cached block”);

       }

       cb = new CachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);

       long newSize = updateSizeMetrics(cb, false);

       map.put(cacheKey, cb);

       elements.incrementAndGet();

       if(newSize > acceptableSize() && !evictionInProgress) {

         runEviction();

       }

     }

     /**

      * Cache the block with the specified name and buffer.

      * <p>

      * It is assumed this will NEVER be called on an already cached block.  If

      * that is done, it is assumed that you are reinserting the same exact

      * block due to a race condition and will update the buffer but not modify

      * the size of the cache.

      * @param cacheKey block’s cache key

      * @param buf block buffer

      */

     public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {

       cacheBlock(cacheKey, buf, false);

     }

    1)  这里假设不会对同一个已经被缓存的BlockCacheKey重复放入cache操作;

    2)  根据inMemory标志创建不同类别的CachedBlock对象:若inMemory为true则创建BlockPriority.MEMORY类型,否则创建BlockPriority.SINGLE;注意,这里只有这两种类型的Cache,因为BlockPriority.MULTI在Cache Block被重复访问时才进行创建,见CachedBlock的access方法代码:

     /**

      * Block has been accessed.  Update its local access time.

      */

     public void access(long accessTime) {

       this.accessTime = accessTime;

       if(this.priority == BlockPriority.SINGLE) {

         this.priority = BlockPriority.MULTI;

       }

     }

    3)  将BlockCacheKey和创建的CachedBlock对象加入到全局的ConcurrentHashMap map中,同时做一些更新计数操作;

    4)  最后判断如果加入后的Block Size大于设定的临界值且当前没有淘汰线程运行,则调用runEviction()方法启动LRU淘汰过程:

     /** Eviction thread */

     private final EvictionThread evictionThread;

     /**

      * Multi-threaded call to run the eviction process.

      */

     private void runEviction() {

       if(evictionThread == null) {

         evict();

       } else {

         evictionThread.evict();

       }

     }

    其中,EvictionThread线程即是LRU淘汰的具体实现线程。下面将给出详细分析。

    2.2淘汰Block Cache

    EvictionThread线程主要用于与主线程的同步,从而完成Block Cache的LRU淘汰过程。

     /*

      * Eviction thread.  Sits in waiting state until an eviction is triggered

      * when the cache size grows above the acceptable level.<p>

      *

      * Thread is triggered into action by {@link LruBlockCache#runEviction()}

      */

     private static class EvictionThread extends HasThread {

       private WeakReference<LruBlockCache> cache;

       private boolean go = true;

       public EvictionThread(LruBlockCache cache) {

         super(Thread.currentThread().getName() + “.LruBlockCache.EvictionThread”);

         setDaemon(true);

         this.cache = new WeakReference<LruBlockCache>(cache);

       }

       @Override

       public void run() {

         while (this.go) {

           synchronized(this) {

             try {

               this.wait();

             } catch(InterruptedException e) {}

           }

           LruBlockCache cache = this.cache.get();

           if(cache == null) break;

           cache.evict();

         }

       }

       public void evict() {

         synchronized(this) {

           this.notify(); // FindBugs NN_NAKED_NOTIFY

         }

       }

       void shutdown() {

         this.go = false;

         interrupt();

       }

     }

    EvictionThread线程启动后,调用wait被阻塞住,直到EvictionThread线程的evict方法被主线程调用时执行notify(见上面的代码分析过程,通过主线程的runEviction方法触发调用),开始执行LruBlockCache的evict方法进行真正的淘汰过程,代码如下:

     /**

      * Eviction method.

      */

     void evict() {

       // Ensure only one eviction at a time

       if(!evictionLock.tryLock()) return;

       try {

         evictionInProgress = true;

         long currentSize = this.size.get();

         long bytesToFree = currentSize - minSize();

         if (LOG.isDebugEnabled()) {

           LOG.debug(“Block cache LRU eviction started; Attempting to free ” +

             StringUtils.byteDesc(bytesToFree) + ” of total=” +

             StringUtils.byteDesc(currentSize));

         }

         if(bytesToFree <= 0) return;

         // Instantiate priority buckets

         BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize,

             singleSize());

         BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize,

             multiSize());

         BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize,

             memorySize());

         // Scan entire map putting into appropriate buckets

         for(CachedBlock cachedBlock : map.values()) {

           switch(cachedBlock.getPriority()) {

             case SINGLE: {

               bucketSingle.add(cachedBlock);

               break;

             }

             case MULTI: {

               bucketMulti.add(cachedBlock);

               break;

             }

             case MEMORY: {

               bucketMemory.add(cachedBlock);

               break;

             }

           }

         }

         PriorityQueue<BlockBucket> bucketQueue =

           new PriorityQueue<BlockBucket>(3);

         bucketQueue.add(bucketSingle);

         bucketQueue.add(bucketMulti);

         bucketQueue.add(bucketMemory);

         int remainingBuckets = 3;

         long bytesFreed = 0;

         BlockBucket bucket;

         while((bucket = bucketQueue.poll()) != null) {

           long overflow = bucket.overflow();

           if(overflow > 0) {

             long bucketBytesToFree = Math.min(overflow,

               (bytesToFree - bytesFreed) / remainingBuckets);

             bytesFreed += bucket.free(bucketBytesToFree);

           }

           remainingBuckets-;

         }

         if (LOG.isDebugEnabled()) {

           long single = bucketSingle.totalSize();

           long multi = bucketMulti.totalSize();

           long memory = bucketMemory.totalSize();

           LOG.debug(“Block cache LRU eviction completed; ” +

             “freed=” + StringUtils.byteDesc(bytesFreed) + “, ” +

             “total=” + StringUtils.byteDesc(this.size.get()) + “, ” +

             “single=” + StringUtils.byteDesc(single) + “, ” +

             “multi=” + StringUtils.byteDesc(multi) + “, ” +

             “memory=” + StringUtils.byteDesc(memory));

         }

       } finally {

         stats.evict();

         evictionInProgress = false;

         evictionLock.unlock();

       }

     }

    1)首先获取锁,保证同一时刻只有一个淘汰线程运行;

    2)计算得到当前Block Cache总大小currentSize及需要被淘汰释放掉的大小bytesToFree,如果bytesToFree小于等于0则不进行后续操作;

    3) 初始化创建三个BlockBucket队列,分别用于存放Single、Multi和InMemory类Block Cache,其中每个BlockBucket维护了一个CachedBlockQueue,按LRU淘汰算法维护该BlockBucket中的所有CachedBlock对象;

    4) 遍历记录所有Block Cache的全局ConcurrentHashMap,加入到相应的BlockBucket队列中;

    5) 将以上三个BlockBucket队列加入到一个优先级队列中,按照各个BlockBucket超出bucketSize的大小顺序排序(见BlockBucket的compareTo方法);

    6) 遍历优先级队列,对于每个BlockBucket,通过Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets)计算出需要释放的空间大小,这样做可以保证尽可能平均地从三个BlockBucket中释放指定的空间;具体实现过程详见BlockBucket的free方法,从其CachedBlockQueue中取出即将被淘汰掉的CachedBlock对象:

       public long free(long toFree) {

         CachedBlock cb;

         long freedBytes = 0;

         while ((cb = queue.pollLast()) != null) {

           freedBytes += evictBlock(cb);

           if (freedBytes >= toFree) {

             return freedBytes;

           }

         }

         return freedBytes;

       }

    7) 进一步调用了LruBlockCache的evictBlock方法,从全局ConcurrentHashMap中移除该CachedBlock对象,同时更新相关计数:

     protected long evictBlock(CachedBlock block) {

       map.remove(block.getCacheKey());

       updateSizeMetrics(block, true);

       elements.decrementAndGet();

       stats.evicted();

       return block.heapSize();

     }

    8) 释放锁,完成善后工作。

    3. 总结

    以上关于Block Cache的实现机制,核心思想是将Cache分级,这样的好处是避免Cache之间相互影响,尤其是对HBase来说像Meta表这样的Cache应该保证高优先级。

建议继续学习

  1. HBase集群出现NotServingRegionException问题的排查及解决方法 (阅读 17,124)
  2. HFile存储格式 (阅读 15,822)
  3. hbase运维 (阅读 14,784)
  4. hbase介绍 (阅读 12,225)
  5. HBase技术介绍 (阅读 7,943)
  6. HBase随机写以及随机读性能测试 (阅读 7,425)
  7. HBase性能优化方法总结 (阅读 6,946)
  8. HBase二级索引与Join (阅读 6,862)
  9. HBase Thrift 接口使用注意事项 (阅读 6,603)
  10. Cassandra和HBase主要设计思路对比 (阅读 4,946)