技术头条 - 一个快速在微博传播文章的方式     搜索本站
您现在的位置首页 --> 源码分析 --> HBase Block Cache实现机制分析

HBase Block Cache实现机制分析

浏览:1742次  出处信息

   本文结合HBase 0.94.1版本源码,对HBase的Block Cache实现机制进行分析,总结学习其Cache设计的核心思想。

1. 概述

   HBase上Regionserver的内存分为两个部分,一部分作为Memstore,主要用来写;另外一部分作为BlockCache,主要用于读。

  • 写请求会先写入Memstore,Regionserver会给每个region提供一个Memstore,当Memstore满64MB以后,会启动 flush刷新到磁盘。当Memstore的总大小超过限制时(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),会强行启动flush进程,从最大的Memstore开始flush直到低于限制。

  • 读请求先到Memstore中查数据,查不到就到BlockCache中查,再查不到就会到磁盘上读,并把读的结果放入BlockCache。由于BlockCache采用的是LRU策略,因此BlockCache达到上限(heapsize * hfile.block.cache.size * 0.85)后,会启动淘汰机制,淘汰掉最老的一批数据。

  •    一个Regionserver上有一个BlockCache和N个Memstore,它们的大小之和不能大于等于heapsize * 0.8,否则HBase不能正常启动。

       默认配置下,BlockCache为0.2,而Memstore为0.4。在注重读响应时间的应用场景下,可以将 BlockCache设置大些,Memstore设置小些,以加大缓存的命中率。

    HBase RegionServer包含三个级别的Block优先级队列:

  • Single:如果一个Block第一次被访问,则放在这一优先级队列中;

  • Multi:如果一个Block被多次访问,则从Single队列移到Multi队列中;

  • InMemory:如果一个Block是inMemory的,则放到这个队列中。

  • 以上将Cache分级思想的好处在于:

  • 首先,通过inMemory类型Cache,可以有选择地将in-memory的column families放到RegionServer内存中,例如Meta元数据信息;

  • 通过区分Single和Multi类型Cache,可以防止由于Scan操作带来的Cache频繁颠簸,将最少使用的Block加入到淘汰算法中。

  • 默认配置下,对于整个BlockCache的内存,又按照以下百分比分配给Single、Multi、InMemory使用:0.25、0.50和0.25。

    注意,其中InMemory队列用于保存HBase Meta表元数据信息,因此如果将数据量很大的用户表设置为InMemory的话,可能会导致Meta表缓存失效,进而对整个集群的性能产生影响。

    2. 源码分析

    下面是对HBase 0.94.1中相关源码(org.apache.hadoop.hbase.io.hfile.LruBlockCache)的分析过程。

    2.1加入Block Cache

     /** Concurrent map (the cache) */

     private final ConcurrentHashMap<BlockCacheKey,CachedBlock> map;

     /**

      * Cache the block with the specified name and buffer.

      * <p>

      * It is assumed this will NEVER be called on an already cached block.  If

      * that is done, an exception will be thrown.

      * @param cacheKey block’s cache key

      * @param buf block buffer

      * @param inMemory if block is in-memory

      */

     public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {

       CachedBlock cb = map.get(cacheKey);

       if(cb != null) {

         throw new RuntimeException(“Cached an already cached block”);

       }

       cb = new CachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);

       long newSize = updateSizeMetrics(cb, false);

       map.put(cacheKey, cb);

       elements.incrementAndGet();

       if(newSize > acceptableSize() && !evictionInProgress) {

         runEviction();

       }

     }

     /**

      * Cache the block with the specified name and buffer.

      * <p>

      * It is assumed this will NEVER be called on an already cached block.  If

      * that is done, it is assumed that you are reinserting the same exact

      * block due to a race condition and will update the buffer but not modify

      * the size of the cache.

      * @param cacheKey block’s cache key

      * @param buf block buffer

      */

     public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {

       cacheBlock(cacheKey, buf, false);

     }

    1)  这里假设不会对同一个已经被缓存的BlockCacheKey重复放入cache操作;

    2)  根据inMemory标志创建不同类别的CachedBlock对象:若inMemory为true则创建BlockPriority.MEMORY类型,否则创建BlockPriority.SINGLE;注意,这里只有这两种类型的Cache,因为BlockPriority.MULTI在Cache Block被重复访问时才进行创建,见CachedBlock的access方法代码:

     /**

      * Block has been accessed.  Update its local access time.

      */

     public void access(long accessTime) {

       this.accessTime = accessTime;

       if(this.priority == BlockPriority.SINGLE) {

         this.priority = BlockPriority.MULTI;

       }

     }

    3)  将BlockCacheKey和创建的CachedBlock对象加入到全局的ConcurrentHashMap map中,同时做一些更新计数操作;

    4)  最后判断如果加入后的Block Size大于设定的临界值且当前没有淘汰线程运行,则调用runEviction()方法启动LRU淘汰过程:

     /** Eviction thread */

     private final EvictionThread evictionThread;

     /**

      * Multi-threaded call to run the eviction process.

      */

     private void runEviction() {

       if(evictionThread == null) {

         evict();

       } else {

         evictionThread.evict();

       }

     }

    其中,EvictionThread线程即是LRU淘汰的具体实现线程。下面将给出详细分析。

    2.2淘汰Block Cache

    EvictionThread线程主要用于与主线程的同步,从而完成Block Cache的LRU淘汰过程。

     /*

      * Eviction thread.  Sits in waiting state until an eviction is triggered

      * when the cache size grows above the acceptable level.<p>

      *

      * Thread is triggered into action by {@link LruBlockCache#runEviction()}

      */

     private static class EvictionThread extends HasThread {

       private WeakReference<LruBlockCache> cache;

       private boolean go = true;

       public EvictionThread(LruBlockCache cache) {

         super(Thread.currentThread().getName() + “.LruBlockCache.EvictionThread”);

         setDaemon(true);

         this.cache = new WeakReference<LruBlockCache>(cache);

       }

       @Override

       public void run() {

         while (this.go) {

           synchronized(this) {

             try {

               this.wait();

             } catch(InterruptedException e) {}

           }

           LruBlockCache cache = this.cache.get();

           if(cache == null) break;

           cache.evict();

         }

       }

       public void evict() {

         synchronized(this) {

           this.notify(); // FindBugs NN_NAKED_NOTIFY

         }

       }

       void shutdown() {

         this.go = false;

         interrupt();

       }

     }

    EvictionThread线程启动后,调用wait被阻塞住,直到EvictionThread线程的evict方法被主线程调用时执行notify(见上面的代码分析过程,通过主线程的runEviction方法触发调用),开始执行LruBlockCache的evict方法进行真正的淘汰过程,代码如下:

     /**

      * Eviction method.

      */

     void evict() {

       // Ensure only one eviction at a time

       if(!evictionLock.tryLock()) return;

       try {

         evictionInProgress = true;

         long currentSize = this.size.get();

         long bytesToFree = currentSize - minSize();

         if (LOG.isDebugEnabled()) {

           LOG.debug(“Block cache LRU eviction started; Attempting to free ” +

             StringUtils.byteDesc(bytesToFree) + ” of total=” +

             StringUtils.byteDesc(currentSize));

         }

         if(bytesToFree <= 0) return;

         // Instantiate priority buckets

         BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize,

             singleSize());

         BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize,

             multiSize());

         BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize,

             memorySize());

         // Scan entire map putting into appropriate buckets

         for(CachedBlock cachedBlock : map.values()) {

           switch(cachedBlock.getPriority()) {

             case SINGLE: {

               bucketSingle.add(cachedBlock);

               break;

             }

             case MULTI: {

               bucketMulti.add(cachedBlock);

               break;

             }

             case MEMORY: {

               bucketMemory.add(cachedBlock);

               break;

             }

           }

         }

         PriorityQueue<BlockBucket> bucketQueue =

           new PriorityQueue<BlockBucket>(3);

         bucketQueue.add(bucketSingle);

         bucketQueue.add(bucketMulti);

         bucketQueue.add(bucketMemory);

         int remainingBuckets = 3;

         long bytesFreed = 0;

         BlockBucket bucket;

         while((bucket = bucketQueue.poll()) != null) {

           long overflow = bucket.overflow();

           if(overflow > 0) {

             long bucketBytesToFree = Math.min(overflow,

               (bytesToFree - bytesFreed) / remainingBuckets);

             bytesFreed += bucket.free(bucketBytesToFree);

           }

           remainingBuckets-;

         }

         if (LOG.isDebugEnabled()) {

           long single = bucketSingle.totalSize();

           long multi = bucketMulti.totalSize();

           long memory = bucketMemory.totalSize();

           LOG.debug(“Block cache LRU eviction completed; ” +

             “freed=” + StringUtils.byteDesc(bytesFreed) + “, ” +

             “total=” + StringUtils.byteDesc(this.size.get()) + “, ” +

             “single=” + StringUtils.byteDesc(single) + “, ” +

             “multi=” + StringUtils.byteDesc(multi) + “, ” +

             “memory=” + StringUtils.byteDesc(memory));

         }

       } finally {

         stats.evict();

         evictionInProgress = false;

         evictionLock.unlock();

       }

     }

    1)首先获取锁,保证同一时刻只有一个淘汰线程运行;

    2)计算得到当前Block Cache总大小currentSize及需要被淘汰释放掉的大小bytesToFree,如果bytesToFree小于等于0则不进行后续操作;

    3) 初始化创建三个BlockBucket队列,分别用于存放Single、Multi和InMemory类Block Cache,其中每个BlockBucket维护了一个CachedBlockQueue,按LRU淘汰算法维护该BlockBucket中的所有CachedBlock对象;

    4) 遍历记录所有Block Cache的全局ConcurrentHashMap,加入到相应的BlockBucket队列中;

    5) 将以上三个BlockBucket队列加入到一个优先级队列中,按照各个BlockBucket超出bucketSize的大小顺序排序(见BlockBucket的compareTo方法);

    6) 遍历优先级队列,对于每个BlockBucket,通过Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets)计算出需要释放的空间大小,这样做可以保证尽可能平均地从三个BlockBucket中释放指定的空间;具体实现过程详见BlockBucket的free方法,从其CachedBlockQueue中取出即将被淘汰掉的CachedBlock对象:

       public long free(long toFree) {

         CachedBlock cb;

         long freedBytes = 0;

         while ((cb = queue.pollLast()) != null) {

           freedBytes += evictBlock(cb);

           if (freedBytes >= toFree) {

             return freedBytes;

           }

         }

         return freedBytes;

       }

    7) 进一步调用了LruBlockCache的evictBlock方法,从全局ConcurrentHashMap中移除该CachedBlock对象,同时更新相关计数:

     protected long evictBlock(CachedBlock block) {

       map.remove(block.getCacheKey());

       updateSizeMetrics(block, true);

       elements.decrementAndGet();

       stats.evicted();

       return block.heapSize();

     }

    8) 释放锁,完成善后工作。

    3. 总结

    以上关于Block Cache的实现机制,核心思想是将Cache分级,这样的好处是避免Cache之间相互影响,尤其是对HBase来说像Meta表这样的Cache应该保证高优先级。

建议继续学习:

  1. HBase集群出现NotServingRegionException问题的排查及解决方法    (阅读:16090)
  2. HFile存储格式    (阅读:14410)
  3. hbase运维    (阅读:13603)
  4. hbase介绍    (阅读:10836)
  5. HBase技术介绍    (阅读:6577)
  6. HBase随机写以及随机读性能测试    (阅读:6408)
  7. HBase性能优化方法总结    (阅读:5726)
  8. HBase二级索引与Join    (阅读:5629)
  9. HBase Thrift 接口使用注意事项    (阅读:5278)
  10. Cassandra和HBase主要设计思路对比    (阅读:4070)
QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习
© 2009 - 2024 by blogread.cn 微博:@IT技术博客大学习

京ICP备15002552号-1