#!/usr/bin/env python # -*- coding: utf-8 -*- __version__ = "0.1" from optparse import OptionParser import sys, os, commands, yaml, re, threading, Queue class Log: """Simple class for logging""" def __init__(self, verbose): self.verbose = verbose def log(self, line): """Logs an especified line""" if self.verbose: sys.stderr.write (" - " + str(line) + "\n") class ImportConfig: """Class to handle config""" def get_config(self): path = os.path.dirname(os.path.abspath(__file__)) documents = open(path + '/pImportConfig.yml', 'r').read() data = yaml.load(documents) return data class ImportFile: """Class to handle SQL files""" def __init__(self, log, mysqluser, mysqlpass, mysqlhost, mysqlport=3306, mysqloptions=""): self.user = mysqluser self.password = mysqlpass self.host = mysqlhost self.port = mysqlport self.options = mysqloptions self.log = log def cli(self, database, file, cli="/usr/bin/mysql"): cmd=cli if self.options != "": cmd = cmd + " " + self.options cmd = cmd + " -h" + self.host + " -P" + self.port + " -u" + self.user + " -p" + self.password + " " + database + " < " + file #print cmd os.system(cmd) return (None, None) def get_files(self, path): """Return all files in path""" files = [] for element in sorted(os.listdir(path)): if os.path.isfile(path + '/' + element) and element.endswith('.sql'): files.append(element) return files class Worker(threading.Thread): def __init__(self, queue, log, cli, event_dict, ): threading.Thread.__init__(self) self.queue = queue self.log = log self.cli = cli self.event_dict = event_dict def run(self): self.log.log("Worker " + self.getName() + " started") while True: try: num, database, file = self.queue.get(True, 1) except Queue.Empty: break self.event_dict[num] = threading.Event() self.event_dict[num].clear() self.log.log(self.getName() + " importing " + database + " " + file) status, output = self.cli.cli(database, file) self.log.log(self.getName() + " import " + database + " " + file) if output: print output self.event_dict[num].set() def main(): usage = "usage: %prog [options]\n Run mysql import in paralel" parser = OptionParser(usage, version=__version__) parser.add_option("-v", "--verbose", action="store_true", dest="verbose", default=False, help="verbose output.") parser.add_option("-t", "--threads", action="store", dest="threads", type="int", default=4, help="Threads used. Default = 4") parser.add_option("-n", "--name", action="store", dest="name", type="string", default=False, help="Name/Type of backup dir") (options, args) = parser.parse_args() log = Log(options.verbose) queue = Queue.Queue() x = 0 print "=== Start import with " + str(options.threads) + " threads ===" importconfig = ImportConfig() config = importconfig.get_config() path = config['dir'] + '/' + options.name cli = ImportFile(log, config['user'], config['pass'], config['host'], config['port'], config['options']) for file in cli.get_files(path): match = re.match('([^\s_]+)\.(.*)\.sql\Z', file) database = match.group(1) queue.put([x, database, path + "/" + file]) x = x + 1 event_dict = {} threads = [] x = 0 for i in range(options.threads): threads.append(Worker(queue, log, cli, event_dict)) threads[x].setDaemon(True) threads[x].start() x = x + 1 # Wait for all threads to finish for thread in threads: thread.join() print "=== End of import ===" if __name__ == "__main__": main()