From d612ec2b64244e8a41c406ae4a232cbe566cad13 Mon Sep 17 00:00:00 2001 From: Akumatic Date: Wed, 3 May 2023 11:49:08 +0200 Subject: [PATCH] Add python scripts to update charset of database, tables and columns --- README.md | 31 +++- convert.py | 58 ++++++ convert/__init__.py | 0 convert/utf8mb4converter.py | 345 ++++++++++++++++++++++++++++++++++++ 4 files changed, 433 insertions(+), 1 deletion(-) create mode 100644 convert.py create mode 100644 convert/__init__.py create mode 100644 convert/utf8mb4converter.py diff --git a/README.md b/README.md index a1521e5..de29d3b 100644 --- a/README.md +++ b/README.md @@ -1 +1,30 @@ -# db-UTF8-MB3-to-MB4 \ No newline at end of file +# db-UTF8-MB3-to-MB4 + +MySQL has used utf8 as an alias for utf8mb3 in previous versions. Since only three bytes are used per character, not the entire Unicode character set is covered. Also, utf8mb3 has been deprecated since MySQL Version 8.0 and will be removed in future versions. + +To convert an existing database, the character sets of the following components must be changed: + +- the database +- all tables +- all columns with a character set + +This script converts the character set of a given database - by default to **utf8mb4** with collation **utf8mb4_unicode_520_ci**. + +If an error occurs during the conversion of a table or column, an output with the corresponding SQL command is issued and the program continues. + +## Usage + +``` +python convert.py [-h] [-v] -H HOST -P PORT -u USER -p PASSWORD -d DATABASE +``` +Required arguments: +- `-H/--host HOST` +- `-P/--port PORT` +- `-u/--user USER` +- `-p/--password PASSWORD` +- `-d/--database DATABASE` + +Optional arguments: +- `-h/--help` +- `-v/--verbose` + diff --git a/convert.py b/convert.py new file mode 100644 index 0000000..fec51cd --- /dev/null +++ b/convert.py @@ -0,0 +1,58 @@ +# SPDX-License-Identifier: MIT +# Copyright (c) 2023 Akumatic + +import logging +import argparse +from convert.utf8mb4converter import UTF8MB4Converter + +def main ( + args: argparse.Namespace + ) -> None: + """ + Main program sequence. Establishes a connection to the database, converts + the default charset and collation of the database itself, all tables and + all text fields to UTF8MB4 if its not in UTF8MB4 yet. + + Params: + - args (argparse.Namespace) + - Contains arguments passed to the program + """ + + db: UTF8MB4Converter = UTF8MB4Converter ( + user = args.user, + password = args.password, + host = args.host, + port = args.port, + db = args.database + ) + db.convert_charset_db() + db.convert_charset_all_tables() + db.convert_charset_all_columns_all_tables() + +def parse_args ( + ) -> argparse.Namespace: + """ + Parses the arguments passed to the program. + + Returns: + - An argparse namespace containing the parsed arguments + """ + + argparser: argparse.ArgumentParser = argparse.ArgumentParser() + args_opt: argparse._ArgumentGroup = argparser.add_argument_group("Optional Arguments") + args_req: argparse._ArgumentGroup = argparser.add_argument_group("Required Arguments") + + args_opt.add_argument("-v", "--verbose", action="store_true") + + args_req.add_argument("-H", "--host", required=True) + args_req.add_argument("-P", "--port", required=True, type=int) + args_req.add_argument("-u", "--user", required=True) + args_req.add_argument("-p", "--password", required=True) + args_req.add_argument("-d", "--database", required=True) + + return argparser.parse_args() + +if __name__ == "__main__": + args = parse_args() + logging.basicConfig(level=(logging.DEBUG if args.verbose else logging.INFO)) + main(args) \ No newline at end of file diff --git a/convert/__init__.py b/convert/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/convert/utf8mb4converter.py b/convert/utf8mb4converter.py new file mode 100644 index 0000000..695b58f --- /dev/null +++ b/convert/utf8mb4converter.py @@ -0,0 +1,345 @@ +# SPDX-License-Identifier: MIT +# Copyright (c) 2023 Akumatic + +import mariadb +import logging +import sys + +DEFAULT_CHARSET = "utf8mb4" +DEFAULT_COLLATION = "utf8mb4_unicode_520_ci" + +class UTF8MB4Converter: + """ + Class to establish a database connection and execute SQL queries to change the default character set and collation. + + Attributes: + - db (str) + - Stores database name of database connection + - host (str) + - Stores host of database connection + - port (int) + - Stores port of database connection + - logger (logging.Logger) + - Logger object for this file + - _connection (mariadb.Connection) + - database connection + - cur (mariadb.Cursor) + - database cursor + """ + + def __init__( + self, + user: str, + password: str, + host: str, + port:int, + db: str + ) -> None: + """ + Constructor of UTF8MB4Converter object. Establishes a connection to the given database and stores the cursor. + Exits the program if connection fails. + """ + + self.db: str = db + self.host: str = host + self.port: int = port + self.logger: logging.Logger = logging.getLogger(__name__) + self.logger.info(f"Connecting to {db}@{host}:{port} as {user}") + + try: + self._connection: mariadb.Connection = mariadb.connect( + user = user, + password = password, + host = host, + port = port, + database = db + ) + self.logger.info("Connection established") + except mariadb.Error as e: + self.logger.fatal(f"Connection failed: {e}") + sys.exit() + + self.cur: mariadb.Cursor = self._connection.cursor() + + def __del__( + self + ) -> None: + """ + Destructor of UTF8MB4Converter object. Closes the established connection. + """ + + self.logger.info(f"Connection to {self.db}@{self.host}:{self.port} closed") + self._connection.close() + + def get_tables ( + self + ) -> list: + """ + Fetches all tables of the given database. + + Returns: + - List of strings containing names of all tables + """ + + query = f"SHOW TABLES FROM {self.db}" + self.cur.execute(query) + return [table[0] for table in self.cur.fetchall()] + + def get_charset_db ( + self + ) -> dict: + """ + Fetches the character set and collation of the database. + + Returns: + - A dict with the keys charset and collation, containing the collation + and character set of the stored database. + """ + + query = " ".join(( + "SELECT default_character_set_name, default_collation_name", + f"FROM information_schema.SCHEMATA WHERE schema_name = '{self.db}'" + )) + self.cur.execute(query) + return dict(zip(("charset", "collation"), self.cur.fetchone())) + + def get_charset_table ( + self, + table: str + ) -> dict: + """ + Fetches the character set and collation of a given table + + Parameters: + - table (str) + - the table whose character set is to be changed + + Returns: + - A dict with the keys "charset" and "collation", containing the + collation and character set of the given table. + """ + + query = " ".join(( + "SELECT CCSA.character_set_name, table_collation", + "FROM information_schema.TABLES AS T", + "JOIN information_schema.COLLATION_CHARACTER_SET_APPLICABILITY AS CCSA", + "WHERE T.table_collation = CCSA.collation_name", + f"AND table_schema = '{self.db}' AND table_name = '{table}'" + )) + self.cur.execute(query) + return dict(zip(("charset", "collation"), self.cur.fetchone())) + + def get_columns_of_table ( + self, + table: str + ) -> list: + """ + Fetches information about the columns of a given table. + + Parameters: + - table (str) + - the tables whose columns are to be retrieved + + Returns: + - A list of dicts, containing the column information. Each dict has + the keys "name" (Column Name), "type" (Data Type), "ctype" (Column Type), + "charset", "collation", "nullable" and "dvalue" (Column Default) + """ + + query = " ".join(( + "SELECT COLUMN_NAME AS name, DATA_TYPE AS type, COLUMN_TYPE AS ctype,", + "CHARACTER_SET_NAME AS charset, COLLATION_NAME as collation,", + "IS_NULLABLE AS nullable, COLUMN_DEFAULT AS dvalue", + "FROM information_schema.COLUMNS", + f"WHERE table_schema = '{self.db}' AND table_name = '{table}'" + )) + self.cur.execute(query) + lables = ["name", "type", "ctype", "charset", "collation", "nullable", "dvalue"] + return [dict(zip(lables, col)) for col in self.cur.fetchall()] + + def convert_charset_db ( + self, + charset: str = DEFAULT_CHARSET, + collation: str = DEFAULT_COLLATION + ) -> None: + """ + Alters the charset and collation of the database + + Parameters: + - charset (str) + - target character set + - default value: utf8mb4 + - collation (str) + - target collation + - default value: utf8mb4_unicode_520_ci + """ + + self.logger.debug(f"Start converting character set of database {self.db} to {charset}") + db_info = self.get_charset_db() + + if db_info["charset"] == charset: + self.logger.debug(f"Database {self.db} already has character set {charset}") + return + + query = f"ALTER DATABASE {self.db} CHARACTER SET = {charset} COLLATE = {collation}" + try: + self.cur.execute(query) + self.logger.debug(f"Character set of database {self.db} successfully converted to {charset}") + except mariadb.Error as e: + self.logger.error("\n".join(( + f"Failed to convert character set of database {self.db}: {e}", + f"-> Query causing the problem: {query}" + ))) + + def convert_charset_single_table ( + self, + table: str, + charset: str = DEFAULT_CHARSET, + collation: str = DEFAULT_COLLATION + ) -> None: + """ + Alters the charset and collation of a single table + + Parameters: + - table (str) + - the table to be altered + - charset (str) + - target character set + - default value: utf8mb4 + - collation (str) + - target collation + - default value: utf8mb4_unicode_520_ci + """ + + self.logger.debug(f"Start converting character set of table {table} to {charset}") + table_info = self.get_charset_table(table) + + if table_info["charset"] == charset: + self.logger.debug(f"Table {table} already has character set {charset}") + return + + query = f"ALTER TABLE {table} CONVERT TO CHARACTER SET {charset} COLLATE {collation}" + try: + self.cur.execute(query) + self.logger.debug(f"Character set of table {table} successfully converted to {charset}") + except mariadb.Error as e: + self.logger.error("\n".join(( + f"Failed to convert character set of table {table}: {e}", + f"-> Query causing the problem: {query}" + ))) + + def convert_charset_all_tables ( + self, + charset: str = DEFAULT_CHARSET, + collation: str = DEFAULT_COLLATION + ) -> None: + """ + Alters the charset and collation of all tables of the database. + + Parameters: + - charset (str) + - target character set + - default value: utf8mb4 + - collation (str) + - target collation + - default value: utf8mb4_unicode_520_ci + """ + + tables = self.get_tables() + for table in tables: + self.convert_charset_single_table(table, charset, collation) + + def convert_charset_single_column ( + self, + column: dict, + table: str, + charset: str = DEFAULT_CHARSET, + collation: str = DEFAULT_COLLATION + ) -> None: + """ + Alters the charset and collation of a single column. + + Parameters: + - column (dict) + - a dict containig information about the column to be altered + - table (str) + - the table housing the column + - charset (str) + - target character set + - default value: utf8mb4 + - collation (str) + - target collation + - default value: utf8mb4_unicode_520_ci + """ + + col = column["name"] + self.logger.debug(f"Start converting character set of column {col}(@{table}) to {charset}") + + if not column["charset"]: + self.logger.debug(f"Column {col}(@{table}) has no set default character") + return + if not column["type"] in ["char", "varchar", "text", "longtext"]: + self.logger.debug(f"Column {col}(@{table}) does contain data of type {column['type']}") + return + if column["charset"] == charset: + self.logger.debug(f"Column {col}(@{table}) already has character set {charset}") + return + + query = " ".join(( + f"ALTER TABLE {table} CHANGE {col} {col}", + f"{column['ctype']} CHARACTER SET {charset} COLLATE {collation}", + "NULL" if column["nullable"] == "YES" else f"NOT NULL DEFAULT {column['dvalue']}" + )) + try: + self.cur.execute(query) + self.logger.debug(f"Character set of column {col}(@{table}) successfully converted to {charset}") + except mariadb.Error as e: + self.logger.error("\n".join(( + f"Failed to convert character set of column {col}(@{table}): {e}", + f"-> Query causing the problem: {query}" + ))) + + def convert_charset_all_columns_single_table ( + self, + table: str, + charset: str = DEFAULT_CHARSET, + collation: str = DEFAULT_COLLATION + ) -> None: + """ + Alters the charset and collation of all columns of a table. + + Parameters: + - table (str) + - the table containing the columns + - charset (str) + - target character set + - default value: utf8mb4 + - collation (str) + - target collation + - default value: utf8mb4_unicode_520_ci + """ + + columns = self.get_columns_of_table(table) + for column in columns: + self.convert_charset_single_column(column, table, charset, collation) + + def convert_charset_all_columns_all_tables ( + self, + charset: str = DEFAULT_CHARSET, + collation: str = DEFAULT_COLLATION + ) -> None: + """ + Alters the charset and collation of all columns of all tables of a database. + + Parameters: + - charset (str) + - target character set + - default value: utf8mb4 + - collation (str) + - target collation + - default value: utf8mb4_unicode_520_ci + """ + + tables = self.get_tables() + for table in tables: + self.convert_charset_all_columns_single_table(table, charset, collation) \ No newline at end of file