#!/usr/bin/python import os,sys,time,re,threading,MySQLdb now=time.strftime('%Y%m%d') log_02='/tmp/tongbu_stats_%s.csv' % (now) dict_cmd={ 'test1':'/usr/bin/rsync -tzrlp --stats /tmp/test1 /tmp/test2010', 'test2':'/usr/bin/rsync -tzrlp --stats /tmp/test1 /tmp/test2010', #'test3':'/usr/bin/rsync -tzrlp --stats /tmp/test1 /tmp/test2010', } def write_title(log_stat): file_log=open(log_stat,'a') file_log.write('start_time,end_time,module,Number of files,Number of files transferred,Total file size(bytes),Total transferred file size(bytes)\n') file_log.close() def write_notes(cmd,log_stat,nam): fle=open(log_stat,'a') # db_conn=MySQLdb.connect(host='127.0.0.1',user='root',passwd='',db='test') # db_curs=db_conn.cursor() try: s_time=time.strftime('%H:%M:%S') cmd_put=os.popen(cmd) list_tmp=[] re_p=re.compile('(Number of files:|Number of files transferred:|Total file size:|Total transferred file size:) (\d+)?') for each_line1 in cmd_put: for each_line2 in re_p.finditer(each_line1): if each_line2 is not None: list_tmp.append(each_line2.group(2)) else: pass e_time=time.strftime('%H:%M:%S') cmd_stat='%s,%s,%s,%s,%s,%s,%s,%s' % (s_time,e_time,nam,list_tmp[0],list_tmp[1],list_tmp[2],list_tmp[3],os.linesep) cmd_put.close() fle.write(cmd_stat) db_curs.execute("INSERT INTO bak_stat \ (date_ymd,s_time,e_time,mod_name,files,files_traned,filesize,filesize_traned,a_error) VALUES('%s','%s','%s','%s','%s','%s','%s','%s','')" \ % (now,s_time,e_time,nam,list_tmp[0],list_tmp[1],list_tmp[2],list_tmp[3]) ) except: cmd_stat='%s,%s,%s,%s,%s,%s' % (s_time,e_time,nam,'error','error','error','error'+os.linesep) fle.write(cmd_stat) db_curs.execute("INSERT INTO bak_stat (date_ymd,s_time,e_time,mod_name,a_error) VALUES('%s','%s','%s','%s','error')" % (now,s_time,e_time,nam) ) fle.close() # db_curs.close() # db_conn.close() if __name__ == '__main__': write_title(log_02) db_conn=MySQLdb.connect(host='127.0.0.1',user='root',passwd='',db='test') db_curs=db_conn.cursor() threads = {} for i in dict_cmd: t=threading.Thread(target=write_notes,args=(dict_cmd,log_02,i)) # print 'exec',i threads=t # print threads.items() for i in dict_cmd: threads.start() time.sleep(2) for i in dict_cmd: threads.join() db_curs.close() db_conn.close()
部分代码解释:
#!/usr/bin/python import os,sys,time,re,threading,MySQLdb now=time.strftime('%Y%m%d') log_02='/tmp/tongbu_stats_%s.csv' % (now) dict_cmd={ 'test1':'/usr/bin/rsync -tzrlp --stats /tmp/test1 /tmp/test2010', 'test2':'/usr/bin/rsync -tzrlp --stats /tmp/test1 /tmp/test2010', #'test3':'/usr/bin/rsync -tzrlp --stats /tmp/test1 /tmp/test2010', } #执行命令也用字典的形式,“键”相当于模块名,用于标识 def write_title(log_stat): file_log=open(log_stat,'a') file_log.write('start_time,end_time,module,Number of files,Number of files transferred,Total file size(bytes),Total transferred file size(bytes)\n') file_log.close() def write_notes(cmd,log_stat,nam): fle=open(log_stat,'a') # db_conn=MySQLdb.connect(host='127.0.0.1',user='root',passwd='',db='test') # db_curs=db_conn.cursor() try: s_time=time.strftime('%H:%M:%S') cmd_put=os.popen(cmd) list_tmp=[] re_p=re.compile('(Number of files:|Number of files transferred:|Total file size:|Total transferred file size:) (\d+)?') #cmd,log_stat,nam为形参,注意区分下面的实参,i #(\d+)?表示,\d+表示任意数字,至少有一个, #特殊符号*,+,?它们可以用于匹配字符串模式出现一次、多次、或未出现的情况。 #星号或称星号操作符匹配它左边那个正则表达式出现零次或零次以上的情况 #加号(+)操作符匹配它左边那个正则表达式模式至少出现一次的情况(它也被称为正闭包操作符) #问号操作符( ? )匹配它左边那个正则表达式模式出现零次或一次的情况 for each_line1 in cmd_put: for each_line2 in re_p.finditer(each_line1): if each_line2 is not None: list_tmp.append(each_line2.group(2)) else: pass #用了两次循环,第一次把命令结果迭代赋给each_line1变量,第二次,调用正则表达式,把匹配值赋给each_line2变量 #finditer(pattern,string[, flags])在字符串中,查找匹配项,返回迭代器;对于每个匹配,该迭代器返回一个匹配对象。 #避免程序抛出错误,增加一个判断。 e_time=time.strftime('%H:%M:%S') cmd_stat='%s,%s,%s,%s,%s,%s,%s,%s' % (s_time,e_time,nam,list_tmp[0],list_tmp[1],list_tmp[2],list_tmp[3],os.linesep) cmd_put.close() fle.write(cmd_stat) db_curs.execute("INSERT INTO bak_stat \ (date_ymd,s_time,e_time,mod_name,files,files_traned,filesize,filesize_traned,a_error) VALUES('%s','%s','%s','%s','%s','%s','%s','%s','')" \ % (now,s_time,e_time,nam,list_tmp[0],list_tmp[1],list_tmp[2],list_tmp[3]) ) except: cmd_stat='%s,%s,%s,%s,%s,%s' % (s_time,e_time,nam,'error','error','error','error'+os.linesep) fle.write(cmd_stat) db_curs.execute("INSERT INTO bak_stat (date_ymd,s_time,e_time,mod_name,a_error) VALUES('%s','%s','%s','%s','error')" % (now,s_time,e_time,nam) ) #try...except,异常处理 fle.close() # db_curs.close() # db_conn.close() if __name__ == '__main__': write_title(log_02) db_conn=MySQLdb.connect(host='127.0.0.1',user='root',passwd='',db='test') db_curs=db_conn.cursor() #避免MySQL连接的wait_timeout超时,统计数据无法记录到数据库中 threads = {} for i in dict_cmd: t=threading.Thread(target=write_notes,args=(dict_cmd[i],log_02,i)) # print 'exec',i threads[i]=t # print threads.items() for i in dict_cmd: threads[i].start() time.sleep(2) for i in dict_cmd: threads[i].join() db_curs.close() db_conn.close() #创建一个Thread 的实例,传给它一个函数target,参数args #threading是多线程 #三个循环,将线程都存在threads{}字典中;依次开始;程序挂起,直到线程结束。
另外给大家放上我写的一个简单的perl程序来控制rsync多线程同步
#!/usr/bin/perl my $path = "/data"; #本地目录 my $ip="172.16.xxx.xxx"; #远程目录 my $maxchild=5; #同时并发的个数 open FILE,"ls $path|"; while() { chomp; my $filename = $_; my $i = 1; while($i<=1){ my $un = `ps -ef |grep rsync|grep -v grep |grep avl|wc -l`; $i =$i+1; if( $un < $maxchild){ system("rsync -avl --size-only $path/$_ $ip:$path &") ; }else{ sleep 5; $i = 1; } } }