#!/usr/bin/env python2.4 import sys reload (sys) # enables the setencoding sys.setdefaultencoding('utf-8') import os import socket socket.setdefaulttimeout(3600) from ftplib import FTP import Queue import threading import time import gzip import cPickle #from pyPgSQL import PgSQL import psycopg2 class FsimDBPathException(Exception): pass class FsimDB(threading.Thread): def __init__(self, host, dbq): threading.Thread.__init__(self) self.host = host self.ip = socket.gethostbyname(host) self.dbq = dbq print time.strftime("%H:%M:%S"), "FsimDB: pgsql connect" self.db = psycopg2.connect("dbname=fsim user=fsim host=localhost password=fsim") self.db.set_isolation_level(0) print time.strftime("%H:%M:%S"), "FsimDB: pgsql cursor" self.curs = self.db.cursor() print time.strftime("%H:%M:%S"), "FsimDB: update host and spider" spiderid, hostid = self.updateHostAndSpider(self.host, self.ip) self.spiderid = spiderid self.hostid = hostid #self.findRootId() self.curs.execute("COMMIT;") self.__die = False self.running = True self.curs.execute("COMMIT;") def die(self): self.__die = True def shutdown(self): self.running = False def cursor(self): return self.db.cursor() def getHostId(self, host): self.curs.execute("SELECT id FROM host WHERE host = '%s'" % (host)) retval = self.curs.fetchone() if not retval: return False return int(retval[0]) def insertSpider(self): self.curs.execute("INSERT INTO spider (udate) VALUES (now())") self.curs.execute("SELECT currval('spider_id_seq')") retval = self.curs.fetchone() if not retval: return False return int(retval[0]) def insertHost(self, hostname, ip, spiderid): query = "INSERT INTO host (hostname, host, insert_date, last_spider_id) VALUES" query += " ('%s', '%s', now(), %i)" % (hostname, ip, spiderid) print time.strftime("%H:%M:%S"), "inserting host" self.curs.execute(query) print time.strftime("%H:%M:%S"), "inserting done" def updateHost(self, hostid, spiderid): query = "UPDATE host SET last_spider_id=%(spider_id)s WHERE id=%(host_id)s" self.curs.execute(query, dict(spider_id=spiderid, host_id=hostid)) def createRootDir(self,hostid): query = "INSERT INTO path (host_id, directory) VALUES (%(host_id)s,%(dir_id)s)" self.curs.execute(query, dict(host_id=hostid,dir_id="/")) def updateHostAndSpider(self, hostname, ip): spiderid = self.insertSpider() print 'spiderid:', spiderid hostid = self.getHostId(hostname) if not hostid: print time.strftime("%H:%M:%S"), "insert host" self.insertHost(hostname, ip, spiderid) print time.strftime("%H:%M:%S"), "select host" hostid = self.getHostId(ip) print "got host:",hostid self.createRootDir(hostid) else: print time.strftime("%H:%M:%S"), "update host" self.updateHost(hostid, spiderid) return spiderid, hostid def findRoot(self,path): path2 = "/".join(path) if not path2: path2="/" try: path2 = path2.encode("utf-8","replace"); except: return None; query = "SELECT id FROM path WHERE host_id=%(host_id)s AND directory=%(dir)s" self.curs.execute(query, dict(host_id=self.hostid,dir=path2)) rootid = self.curs.fetchone() if not rootid: raise FsimDBPathException, "Invalid path %s" % path2 return rootid[0] def updateDir(self,rootid,path2,name): path = "/".join(path2) if name: path = path+"/"+name if not path: path = "/"+name try: path = path.encode('utf-8','replace'); except: print "Encode error in directory: ",path; return; #path = path.encode('utf-8','ignore') query = "Select id from path where host_id=%(host_id)s and directory=%(dir)s" self.curs.execute(query, dict(host_id=self.hostid,dir=path)) result = self.curs.fetchone() if result: return query = "INSERT INTO path (host_id, directory,parent_id) VALUES (%(host_id)s,%(dir)s,%(parent_id)s)" #print time.strftime("%H:%M:%S"), "updateDir SQL:",query self.curs.execute(query, dict(host_id=self.hostid,dir=path,parent_id=rootid)) def updateFile(self, rootid, name, size): #print time.strftime("%H:%M:%S"), "updateFile file:", rootid, name, size, "-" try: name = name.encode('utf-8','replace'); except: print "Encode error in name: ",name ; return; #name.encode('utf-8','ignore'); query = "select id from file where path_id=%(path_id)s and name=%(name)s and size=%(size)s"; self.curs.execute(query, dict(path_id=rootid,name=name,size=long(size))); result = self.curs.fetchone(); if result: return; query = "INSERT INTO file (path_id, name, size, insert_date) VALUES (%(path_id)s, %(name)s, %(size)s, now())"; self.curs.execute(query, dict(path_id=int(rootid),name=name,size=long(size))); query = "select id from file where name=%(name)s and size=%(size)s and path_id=%(path_id)s"; self.curs.execute(query, dict(name=name,size=long(size),path_id=int(rootid))); new_file_id = self.curs.fetchone(); # Search for mirrors query = "select id from file where name=%(name)s and size=%(size)s and path_id!=%(path_id)s"; self.curs.execute(query, dict(name=name,size=long(size),path_id=int(rootid))); mirror_id = self.curs.fetchone(); if not mirror_id: # no same file found.. do nothing return; # get the new id if not new_file_id: print "ERROR: no file id found of new file"; # test if we need to add ourself query = "select id from file_mirror where first_file_id=%(first_id)s"; self.curs.execute(query, dict(first_id=int(mirror_id[0])) ); result = self.curs.fetchone(); if not result: print "Adding mirror record"; query = "insert into file_mirror (file_id,first_file_id) values (%(file_id)s,%(first_id)s)"; self.curs.execute(query, dict(file_id=int(mirror_id[0]),first_id=int(mirror_id[0]))); query = "insert into file_mirror (file_id,first_file_id) values (%(file_id)s,%(first_id)s)"; self.curs.execute(query, dict(file_id=int(new_file_id[0]),first_id=int(mirror_id[0]))); return; def process(self, hit): type, path, name, size = hit; #print time.strftime("%H:%M:%S"), "hit:",hit rootid = self.findRoot(path); if type=="DIR": # hit is directory self.updateDir(rootid,path,name); else: # hit is file rootid = self.findRoot(path); if not rootid: return; self.updateFile(rootid, name, size); def runner(self): c=0 while (not self.__die): hit = None try: hit = self.dbq.get(timeout=0.2) except Queue.Empty, e: # queue is empty #print time.strftime("%H:%M:%S"), "FsimDB: Queue empty waiting" if (not self.running or self.__die): # queue is empty and we are done running the crawl break else: self.process(hit) if c<32: c+=1 else: print time.strftime("%H:%M:%S"), "FsimDB:", self.dbq.qsize(), hit c=0 #self.curs.execute("COMMIT;") def run(self): print time.strftime("%H:%M:%S"), "FsimDB: run" try: self.runner() except KeyboardInterrupt, err: print time.strftime("%H:%M:%S"), "KeyboardInterrupt:", str(err) self.dbthread.die() self.curs.execute("COMMIT;") self.db.close() class cpickle_test: def __init__(self, ip): self.ip = ip self.dbq = Queue.Queue() print "loading map" f = ((os.path.isfile(ip) and file(ip)) or gzip.open(ip+".pickle")) self.fmap = cPickle.load(f) self.dbthread = FsimDB(self.ip, self.dbq) def crawl(self): c=0 while (self.fmap): if (not self.dbthread.isAlive()): break path, lines = self.fmap.popitem() print time.strftime("%H:%M:%S"), "FsimCrawler path: %s" % path pre, post = os.path.split(path) pre = pre.split("/") if not pre[0]=="": pre.insert(0, "") # FIXME: dirty hack to ensure all directories exists for i in range(len(pre)): self.dbq.put(["DIR", pre[:i], pre[i], 0]) # insert directory self.dbq.put(["DIR", pre, post, 0]) for line in lines: #print len(pathlines), path, line fullmode, links, owner, group, size, month, day, hour, name = line size = int(size) #isdir = fullmode[0]=='d' isdir = 0 islink = fullmode[0]=='l' #mode = fullmode[1:] if (c<256): c+=1 else: print time.strftime("%H:%M:%S"), "FsimCrawler:", len(self.fmap) c=0 if name=="." or name=="..": continue #if (isdir): # # record directory and scanit # print time.strftime("%H:%M:%S"), "FsimCrawler path: %s/%s" % ("/".join(path), name) # self.dbq.put(["DIR", path, name, size]) if (islink): # ignoring links FIXME later pass else: # is file #print time.strftime("%H:%M:%S"), "found file:", "/".join(path), name self.dbq.put(["FILE", pre+[post], name, size]) def run(self): print time.strftime("%H:%M:%S"), "FsimCrawl: run" # start test self.dbthread.start() try: self.crawl() except KeyboardInterrupt, err: self.dbthread.die() print time.strftime("%H:%M:%S"), "KeyboardInterrupt:", str(err) except Exception, e: self.dbthread.die() print time.strftime("%H:%M:%S"), "Exception:", e raise # done scanning ftp print time.strftime("%H:%M:%S"), "calling dbthread shutdown" self.dbthread.shutdown() while (self.dbthread.isAlive()): print time.strftime("%H:%M:%S"), "waiting on db thread" try: time.sleep(5) except Exception, e: print time.strftime("%H:%M:%S"), "Exception:",e self.dbthread.die() print time.strftime("%H:%M:%S"), "joining dbthread" self.dbthread.join(timeout=5) print time.strftime("%H:%M:%S"), "done" class ftplib_test: def __init__(self, host): self.host = host self.dbq = Queue.Queue() self.dbthread = FsimDB(self.host, self.dbq) print time.strftime("%H:%M:%S"), "FsimCrawl: ftp instance" self.ftp = FTP(host) print time.strftime("%H:%M:%S"), "FsimCrawl: login" self.ftp.login() def list_dir(self, path, dir): assert dir newpath = list(path)+list([dir]) lines = [] fulldir = '/'.join(newpath) self.ftp.cwd(fulldir) #self.ftp.retrlines("LIST %s" % fulldir, lines.append) self.ftp.retrlines("LIST", lines.append) if newpath==['/']: newpath=[""] return (newpath, lines) def crawl(self): pathlines = [] toscan = [] pathlines.append(self.list_dir([], "/")) c = 0 fc = 0 while (pathlines): if not self.dbthread.isAlive(): print time.strftime("%H:%M:%S"), "dbthread is dead ! exiting now!" break path, lines = pathlines.pop(0) for line in lines: #print len(pathlines), path, line try: fullmode, links, owner, group, size, rest = line.split(None, 5) except ValueError, e: if "total" in line: continue print time.strftime("%H:%M:%S"), "FsimCrawl: ftp error:", e print time.strftime("%H:%M:%S"), " in line:", line continue try: size = int(size) except ValueError, e: # trying to include badly behaving ftp servers # that only have one field for ownership rest = size+" "+rest size = int(group) isdir = fullmode[0]=='d' islink = fullmode[0]=='l' mode = fullmode[1:] date = rest[:12] name = rest[13:] if (c<256): c+=1 else: print time.strftime("%H:%M:%S"), "FsimCrawler:", len(pathlines) c=0 if (fc<2048): fc+=1 else: # make sure ftp doesn't timeout print time.strftime("%H:%M:%S"), "ftp anti-timeout" fc = 0 self.ftp.cwd("/") if name=="." or name=="..": continue if (isdir): self.dbq.put(["DIR", path, name, size]) print time.strftime("%H:%M:%S"), "FsimCrawler scan: %s/%s" % ("/".join(path), name) try: toscan.append(self.list_dir(path, name)) except: print "Dir List Error"; elif (islink): # ignoring links FIXME later pass else: # is file self.dbq.put(["FILE", path, name, size]) #endfor for todo in toscan: #print time.strftime("%H:%M:%S"), "FsimCrawler scan: %s/%s" % ("/".join(path), todo) #pathlines.append(self.list_dir(path, todo)) pathlines.append(todo) toscan = [] def run(self): print time.strftime("%H:%M:%S"), "FsimCrawl: run" # start test self.dbthread.start() try: self.crawl() except KeyboardInterrupt, err: self.dbthread.die() print time.strftime("%H:%M:%S"), "KeyboardInterrupt:", str(err) except Exception, e: self.dbthread.die() print time.strftime("%H:%M:%S"), "Exception:", e raise # done scanning ftp print time.strftime("%H:%M:%S"), "calling dbthread shutdown" self.dbthread.shutdown() while (self.dbthread.isAlive()): print time.strftime("%H:%M:%S"), "waiting on db thread" try: time.sleep(5) except Exception, e: print time.strftime("%H:%M:%S"), "Exception:",e self.dbthread.die() print time.strftime("%H:%M:%S"), "joining dbthread" self.dbthread.join(timeout=5) print time.strftime("%H:%M:%S"), "done" if __name__=="__main__": # feed ip's only at this time if (len(sys.argv)!=3): print "usage: %s [crawl][file] ip_address" % sys.argv[0] sys.exit() if sys.argv[1]=="crawl": ftplib_test(str(sys.argv[2])).run() elif sys.argv[1]=="file": cpickle_test(str(sys.argv[2])).run() else: print "invalid command line."