nkjnm
Dependencies: MAX44000 nexpaq_mdk
Fork of LED_Demo by
Diff: mbd_os/tools/test_mysql.py
- Revision:
- 1:55a6170b404f
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mbd_os/tools/test_mysql.py Sat Sep 17 16:32:05 2016 +0000 @@ -0,0 +1,271 @@ +""" +mbed SDK +Copyright (c) 2011-2014 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Author: Przemyslaw Wirkus <Przemyslaw.Wirkus@arm.com> +""" + +import re +import MySQLdb as mdb + +# Imports from TEST API +from tools.test_db import BaseDBAccess + + +class MySQLDBAccess(BaseDBAccess): + """ Wrapper for MySQL DB access for common test suite interface + """ + def __init__(self): + BaseDBAccess.__init__(self) + self.DB_TYPE = 'mysql' + + def detect_database(self, verbose=False): + """ detect database and return VERION data structure or string (verbose=True) + """ + query = 'SHOW VARIABLES LIKE "%version%"' + rows = self.select_all(query) + if verbose: + result = [] + for row in rows: + result.append("\t%s: %s"% (row['Variable_name'], row['Value'])) + result = "\n".join(result) + else: + result = rows + return result + + def parse_db_connection_string(self, str): + """ Parsing SQL DB connection string. String should contain: + - DB Name, user name, password, URL (DB host), name + Function should return tuple with parsed (host, user, passwd, db) or None if error + E.g. connection string: 'mysql://username:password@127.0.0.1/db_name' + """ + result = BaseDBAccess().parse_db_connection_string(str) + if result is not None: + (db_type, username, password, host, db_name) = result + if db_type != 'mysql': + result = None + return result + + def is_connected(self): + """ Returns True if we are connected to database + """ + return self.db_object is not None + + def connect(self, host, user, passwd, db): + """ Connects to DB and returns DB object + """ + try: + self.db_object = mdb.connect(host=host, user=user, passwd=passwd, db=db) + # Let's remember connection credentials + self.db_type = self.DB_TYPE + self.host = host + self.user = user + self.passwd = passwd + self.db = db + except mdb.Error, e: + print "Error %d: %s"% (e.args[0], e.args[1]) + self.db_object = None + self.db_type = None + self.host = None + self.user = None + self.passwd = None + self.db = None + + def connect_url(self, db_url): + """ Connects to database using db_url (database url parsing), + store host, username, password, db_name + """ + result = self.parse_db_connection_string(db_url) + if result is not None: + (db_type, username, password, host, db_name) = result + if db_type == self.DB_TYPE: + self.connect(host, username, password, db_name) + + def reconnect(self): + """ Reconnects to DB and returns DB object using stored host name, + database name and credentials (user name and password) + """ + self.connect(self.host, self.user, self.passwd, self.db) + + def disconnect(self): + """ Close DB connection + """ + if self.db_object: + self.db_object.close() + self.db_object = None + self.db_type = None + + def escape_string(self, str): + """ Escapes string so it can be put in SQL query between quotes + """ + con = self.db_object + result = con.escape_string(str) + return result if result else '' + + def select_all(self, query): + """ Execute SELECT query and get all results + """ + con = self.db_object + cur = con.cursor(mdb.cursors.DictCursor) + cur.execute(query) + rows = cur.fetchall() + return rows + + def insert(self, query, commit=True): + """ Execute INSERT query, define if you want to commit + """ + con = self.db_object + cur = con.cursor() + cur.execute(query) + if commit: + con.commit() + return cur.lastrowid + + def get_next_build_id(self, name, desc='', location='', type=None, status=None): + """ Insert new build_id (DB unique build like ID number to send all test results) + """ + if status is None: + status = self.BUILD_ID_STATUS_STARTED + + if type is None: + type = self.BUILD_ID_TYPE_TEST + + query = """INSERT INTO `%s` (%s_name, %s_desc, %s_location, %s_type_fk, %s_status_fk) + VALUES ('%s', '%s', '%s', %d, %d)"""% (self.TABLE_BUILD_ID, + self.TABLE_BUILD_ID, + self.TABLE_BUILD_ID, + self.TABLE_BUILD_ID, + self.TABLE_BUILD_ID, + self.TABLE_BUILD_ID, + self.escape_string(name), + self.escape_string(desc), + self.escape_string(location), + type, + status) + index = self.insert(query) # Provide inserted record PK + return index + + def get_table_entry_pk(self, table, column, value, update_db=True): + """ Checks for entries in tables with two columns (<TABLE_NAME>_pk, <column>) + If update_db is True updates table entry if value in specified column doesn't exist + """ + # TODO: table buffering + result = None + table_pk = '%s_pk'% table + query = """SELECT `%s` + FROM `%s` + WHERE `%s`='%s'"""% (table_pk, + table, + column, + self.escape_string(value)) + rows = self.select_all(query) + if len(rows) == 1: + result = rows[0][table_pk] + elif len(rows) == 0 and update_db: + # Update DB with new value + result = self.update_table_entry(table, column, value) + return result + + def update_table_entry(self, table, column, value): + """ Updates table entry if value in specified column doesn't exist + Locks table to perform atomic read + update + """ + result = None + con = self.db_object + cur = con.cursor() + cur.execute("LOCK TABLES `%s` WRITE"% table) + table_pk = '%s_pk'% table + query = """SELECT `%s` + FROM `%s` + WHERE `%s`='%s'"""% (table_pk, + table, + column, + self.escape_string(value)) + cur.execute(query) + rows = cur.fetchall() + if len(rows) == 0: + query = """INSERT INTO `%s` (%s) + VALUES ('%s')"""% (table, + column, + self.escape_string(value)) + cur.execute(query) + result = cur.lastrowid + con.commit() + cur.execute("UNLOCK TABLES") + return result + + def update_build_id_info(self, build_id, **kw): + """ Update additional data inside build_id table + Examples: + db.update_build_id_info(build_id, _status_fk=self.BUILD_ID_STATUS_COMPLETED, _shuffle_seed=0.0123456789): + """ + if len(kw): + con = self.db_object + cur = con.cursor() + # Prepare UPDATE query + # ["`mtest_build_id_pk`=[value-1]", "`mtest_build_id_name`=[value-2]", "`mtest_build_id_desc`=[value-3]"] + set_list = [] + for col_sufix in kw: + assign_str = "`%s%s`='%s'"% (self.TABLE_BUILD_ID, col_sufix, self.escape_string(str(kw[col_sufix]))) + set_list.append(assign_str) + set_str = ', '.join(set_list) + query = """UPDATE `%s` + SET %s + WHERE `mtest_build_id_pk`=%d"""% (self.TABLE_BUILD_ID, + set_str, + build_id) + cur.execute(query) + con.commit() + + def insert_test_entry(self, build_id, target, toolchain, test_type, test_id, test_result, test_output, test_time, test_timeout, test_loop, test_extra=''): + """ Inserts test result entry to database. All checks regarding existing + toolchain names in DB are performed. + If some data is missing DB will be updated + """ + # Get all table FK and if entry is new try to insert new value + target_fk = self.get_table_entry_pk(self.TABLE_TARGET, self.TABLE_TARGET + '_name', target) + toolchain_fk = self.get_table_entry_pk(self.TABLE_TOOLCHAIN, self.TABLE_TOOLCHAIN + '_name', toolchain) + test_type_fk = self.get_table_entry_pk(self.TABLE_TEST_TYPE, self.TABLE_TEST_TYPE + '_name', test_type) + test_id_fk = self.get_table_entry_pk(self.TABLE_TEST_ID, self.TABLE_TEST_ID + '_name', test_id) + test_result_fk = self.get_table_entry_pk(self.TABLE_TEST_RESULT, self.TABLE_TEST_RESULT + '_name', test_result) + + con = self.db_object + cur = con.cursor() + + query = """ INSERT INTO `%s` (`mtest_build_id_fk`, + `mtest_target_fk`, + `mtest_toolchain_fk`, + `mtest_test_type_fk`, + `mtest_test_id_fk`, + `mtest_test_result_fk`, + `mtest_test_output`, + `mtest_test_time`, + `mtest_test_timeout`, + `mtest_test_loop_no`, + `mtest_test_result_extra`) + VALUES (%d, %d, %d, %d, %d, %d, '%s', %.2f, %.2f, %d, '%s')"""% (self.TABLE_TEST_ENTRY, + build_id, + target_fk, + toolchain_fk, + test_type_fk, + test_id_fk, + test_result_fk, + self.escape_string(test_output), + test_time, + test_timeout, + test_loop, + self.escape_string(test_extra)) + cur.execute(query) + con.commit()