databases/scripts/pImport.py

124 lines
4.0 KiB
Python
Raw Permalink Normal View History

2017-03-29 14:14:18 +02:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
__version__ = "0.1"
from optparse import OptionParser
import sys, os, commands, yaml, re, threading, Queue
class Log:
"""Simple class for logging"""
def __init__(self, verbose):
self.verbose = verbose
def log(self, line):
"""Logs an especified line"""
if self.verbose:
sys.stderr.write (" - " + str(line) + "\n")
class ImportConfig:
"""Class to handle config"""
def get_config(self):
path = os.path.dirname(os.path.abspath(__file__))
documents = open(path + '/pImportConfig.yml', 'r').read()
data = yaml.load(documents)
return data
class ImportFile:
"""Class to handle SQL files"""
def __init__(self, log, mysqluser, mysqlpass, mysqlhost, mysqlport=3306, mysqloptions=""):
self.user = mysqluser
self.password = mysqlpass
self.host = mysqlhost
self.port = mysqlport
self.options = mysqloptions
self.log = log
def cli(self, database, file, cli="/usr/bin/mysql"):
cmd=cli
if self.options != "":
cmd = cmd + " " + self.options
cmd = cmd + " -h" + self.host + " -P" + self.port + " -u" + self.user + " -p" + self.password + " " + database + " < " + file
#print cmd
os.system(cmd)
return (None, None)
def get_files(self, path):
"""Return all files in path"""
files = []
for element in sorted(os.listdir(path)):
if os.path.isfile(path + '/' + element) and element.endswith('.sql'):
files.append(element)
return files
class Worker(threading.Thread):
def __init__(self, queue, log, cli, event_dict, ):
threading.Thread.__init__(self)
self.queue = queue
self.log = log
self.cli = cli
self.event_dict = event_dict
def run(self):
self.log.log("Worker " + self.getName() + " started")
while True:
try:
num, database, file = self.queue.get(True, 1)
except Queue.Empty:
break
self.event_dict[num] = threading.Event()
self.event_dict[num].clear()
self.log.log(self.getName() + " importing " + database + " " + file)
status, output = self.cli.cli(database, file)
self.log.log(self.getName() + " import " + database + " " + file)
if output:
print output
self.event_dict[num].set()
def main():
usage = "usage: %prog [options]\n Run mysql import in paralel"
parser = OptionParser(usage, version=__version__)
parser.add_option("-v", "--verbose", action="store_true", dest="verbose", default=False, help="verbose output.")
parser.add_option("-t", "--threads", action="store", dest="threads", type="int", default=4, help="Threads used. Default = 4")
parser.add_option("-n", "--name", action="store", dest="name", type="string", default=False, help="Name/Type of backup dir")
(options, args) = parser.parse_args()
log = Log(options.verbose)
queue = Queue.Queue()
x = 0
print "=== Start import with " + str(options.threads) + " threads ==="
importconfig = ImportConfig()
config = importconfig.get_config()
path = config['dir'] + '/' + options.name
cli = ImportFile(log, config['user'], config['pass'], config['host'], config['port'], config['options'])
for file in cli.get_files(path):
match = re.match('([^\s_]+)\.(.*)\.sql\Z', file)
database = match.group(1)
queue.put([x, database, path + "/" + file])
x = x + 1
event_dict = {}
threads = []
x = 0
for i in range(options.threads):
threads.append(Worker(queue, log, cli, event_dict))
threads[x].setDaemon(True)
threads[x].start()
x = x + 1
# Wait for all threads to finish
for thread in threads:
thread.join()
print "=== End of import ==="
if __name__ == "__main__":
main()