IT技术博客大学习 共学习 共进步
全部 移动开发 后端 数据库 AI 算法 安全 DevOps 前端 设计 开发者

大量小文件的实时同步方案

懒人运维 2010-05-04 21:22:42 累计浏览 4,591 次
本机暂存

传统的文件同步方案有rsync(单向) 和 unison(双向)等,它们需要扫描所有文件后进行比对,差量传输。如果文件数量达到了百万甚至千万量级,扫描所有文件将非常耗时。而且正在发生变化的往往是其中很少的一部分,这是非常低效的方式。

之前看了Amazon的Dynamo的设计文档,它们每个节点的数据是通过Hash Tree来实现同步,既有通过日志来同步的软实时特点(msyql, bdb等),也可以保证最终数据的一致性(rsync, unison等)。Hash Tree的大体思路是将所有数据存储成树状结构,每个节点的Hash是其所有子节点的Hash的Hash,叶子节点的Hash是其内容的Hash。这样一旦某个节点发生变化,其Hash的变化会迅速传播到根节点。需要同步的系统只需要不断查询跟节点的hash,一旦有变化,顺着树状结构就能够在logN级别的时间找到发生变化的内容,马上同步。

文件系统天然的是树状结构,尽管不是平衡的数。如果文件的修改时间是可靠的,可以表征文件的变化,那就可以用它作为文件的Hash值。另一方面,文件的修改通常是按顺序执行的,后修改的文件比早修改的文件具有更大的修改时间,这样就可以把一个目录内的最大修改时间作为它的修改时间,以实现Hash Tree。这样,一旦某个文件被修改,修改时间的信息就会迅速传播到根目录。

一般的文件系统都不是这样做的,目录的修改时间表示的是目录结构最后发生变化的时间,不包括子目录,否则会不堪重负。因为我们需要自己实现这个功能,利用Linux 2.6内核的新特性inotify获得某个目录内文件发生变化的信息,并把其修改时间传播到它的上级目录(以及再上级目录)。Python 有 pyinotify,watch.py的代码如下:

#!/usr/bin/python
 
from pyinotify import *
import os, os.path
 
flags = IN_CLOSE_WRITE|IN_CREATE|IN_Q_OVERFLOW
dirs = {}
base = '/log/lighttpd/cache/images/icon/u241'
base = 'tmp'
 
class UpdateParentDir(ProcessEvent):
    def process_IN_CLOSE_WRITE(self, event):
        print 'modify', event.pathname
        mtime = os.path.getmtime(event.pathname)
        p = event.path
        while p.startswith(base):
            m = os.path.getmtime(p)
            if m < mtime:
        	    print 'update', p
                os.utime(p, (mtime,mtime))
            elif m > mtime:
                mtime = m
            p = os.path.dirname(p)
 
    process_IN_MODIFY = process_IN_CLOSE_WRITE
 
    def process_IN_Q_OVERFLOW(self, event):
        print 'over flow'
        max_queued_events.value *= 2
 
    def process_default(self, event):
    	pass
 
wm = WatchManager()
notifier = Notifier(wm, UpdateParentDir())
dirs.update(wm.add_watch(base, flags, rec=True, auto_add=True))
 
notifier.loop()

在已经有Hash Tree的时候,同步就比较简单了,不停地获取根目录的修改时间并顺着目录结构往下找即可。需要注意的是,在更新完文件后,需要设置修改时间为原文件的修改时间,目录也是,保证Hash Tree的一致性,否则没法同步。mirror.py的代码如下

#!/usr/bin/python
 
import sys,time,re,urllib
import os,os.path
from os.path import exists, isdir, getmtime
 
src = sys.argv[1]
dst = sys.argv[2]
 
def local_mirror(src, dst):
    if exists(dst) and mtime == getmtime(dst):
        return
    if not isdir(src):
        print 'update:', dst
        open(dst,'wb').write(open(src).read())
    else:
        if not exists(dst):
            os.makedirs(dst)
        for filename in os.listdir(src):
            local_mirror(os.path.join(src,filename), os.path.join(dst,filename))
    os.utime(dst, (mtime,mtime))
 
def get_info(path):
    f = urllib.urlopen(path)
    mtime = f.headers.get('Last-Modified')
    if mtime:
        mtime = time.mktime(time.strptime(mtime, '%a, %d %b %Y %H:%M:%S %Z'))
    content = f.read()
    f.close()
    return int(mtime), content
 
p = re.compile(r'([\d.]+?) +([\w/]+)')
 
def remote_mirror(src, dst):
    mtime, content = get_info(src)
    if exists(dst) and mtime == int(getmtime(dst)):
        return
    print 'update:', dst, src
    if not src.endswith('/'):
        open(dst,'wb').write(content)
    else:
        if not exists(dst):
            os.makedirs(dst)
        for mt,filename in p.findall(content):
            mt = int(float(mt))
            lpath = dst+filename
            if not exists(lpath) or int(getmtime(lpath)) != mt:
                remote_mirror(src+filename, lpath)
    os.utime(dst, (mtime,mtime))
 
if src.startswith('http://'):
    mirror = remote_mirror
else:
    mirror = local_mirror
 
while True:
    mirror(src, dst)
    time.sleep(1)

如果源文件不在同一台机器上,可以通过NFS等共享过来。或者可以通过支持列目录的HTTP服务器来访问远程目录,mirror.py 已经支持这种访问方式。server.py 是用webpy做的一个简单的只是列目录的文件服务器。由于瓶颈在IO上,它的性能不是关键。server.py的代码如下:

#!/usr/bin/python
 
import os,os.path
import web
import time
 
root = 'tmp'
 
HTTP_HEADER_TIME = '%a, %d %b %Y %H:%M:%S %Z'
 
class FileServer:
    def GET(self, path):
        path = root + path
        if not os.path.exists(path):
            return 404
        mtime = time.localtime(os.path.getmtime(path))
        web.header('Last-Modified', time.strftime(HTTP_HEADER_TIME, mtime))
        if os.path.isdir(path):
            for file in os.listdir(path):
                if file.startswith('.'): continue
                p = os.path.join(path,file)
                m = os.path.getmtime(p)
                if os.path.isdir(p):
                    file += '/'
                print m, file
        else:
            print open(path,'rb').read()
 
urls = (
   "(/.*)", "FileServer",
)
 
if __name__ == '__main__':
    web.run(urls, globals())

为了获得更好性能,以达到更好的实时性,Hash Tree最好是平衡的,比如BTree。如果一个文件发生变化,同步它需要进行的IO操作为N*M,其中N为数的层数,M为每层的文件数目。现在我们N为2,M最大为10000,适当减少它可以获得更好的性能,比如N为4,M为100。在以后创建目录结构时,最好能够考虑这方面的因素。

之前hongqn推荐过一个利用inotify的文件同步方案,同步方式类似于mysql和bdb等,由于过于复杂导致不可靠而没有采用。上面这个方案只用了一百多行Python代码就基本解决问题了,是不是很帅?:-)

转自:http://blog.daviesliu.net/2008/04/24/sync/

同分类推荐文章

  1. 从零重建 macOS 开发机:可复现的环境初始化流程 (2026-06-14 20:36:00)
  2. 百度物理网络监控工具开源第二弹:毫秒级监控工具 baize,让你的网络问题无处遁形 (2026-06-11 08:10:28)
  3. How to Set Up Homebrew Tap for Private CLI Tools: A Complete Guide (2026-05-27 02:13:03)

查看更多 DevOps 文章 →

建议继续学习

  1. Facebook的实时Hadoop系统 (累计阅读 11,487)
  2. rsync同步的艺术 (累计阅读 9,596)
  3. Linux探索:一次删除一百万个文件的最快方法 (累计阅读 6,858)
  4. rsync 的核心算法 (累计阅读 5,603)
  5. 加速scp传输速度 (累计阅读 5,391)
  6. Dropbox差异同步算法rsync及其改进算法原理 (累计阅读 5,223)
  7. rsync自动输入密码实现数据备份 (累计阅读 5,103)
  8. storm入门教程 第一章 前言 (累计阅读 5,107)
  9. 三款面向 Amazon S3 的开源文件同步工具之对比 (累计阅读 4,201)
  10. puppet使用rsync来同步文件教程 (累计阅读 4,209)