如何使用Python将MySQL表数据迁移到MongoDB集合

译文作者:Shameel Ahmed 2021-07-09 18:26:41云计算MongoDB 本文详细的讲解了如何在需要的地方获取MySQL 数据。

[[410477]]

【51CTO.com快译】介绍

MySQL 是一个 RDBMS 平台,它以规范化的方式以表格格式存储数据,而 MongoDB 是一个 NoSQL 数据库,它以无模式的方式将信息存储为按集合分组的文档。数据的表示方式完全不同,因此将 MySQL 表数据迁移到 MongoDB 集合听起来可能是一项艰巨的任务。但是,Python 凭借其强大的连接性和数据处理能力让这一切变得轻而易举。

在本文中,将详细的讲解如何使用简单的 Python 脚本将 MySQL 表数据迁移到 MongoDB 集合所需的步骤。这些脚本是在 Windows 上使用 Python 3.9.5 开发的。但是,它应该适用于任何平台上的任何 Python 3+ 版本一起使用。

步骤 1:安装所需的模块

第一步是安装连接MySQL 和 MongoDB 数据库实例所需的模块。我们将使用mysql.connector连接到 MySQL 数据库。对于 MongoDB,使用pymongo,这是从 Python 连接到 MongoDB 的推荐模块。

如果所需模块尚未安装,请运行以下PIP命令来安装它们。

    pip安装mysql连接器pip安装pymongo

PIP 是 Python 包或模块的包管理器。

步骤2:从MySQL表中读取数据

第一步是从源 MySQL 表中读取数据,并以可用于将数据加载到目标 MongoDB 数据库中的格式进行准备。MongoDB 是一个 NoSQL 数据库,它将数据存储为 JSON 文档,因此最好以 JSON 格式生成源数据。值得一提的是,Python 具有强大的数据处理能力,可以轻松地将数据转换为 JSON 格式。

    importmysql.connectormysqldb=mysql.connector.connect(host="localhost",database="employees",user="root",password="")mycursor=mysqldb.cursor(dictionary=True)mycursor.execute("SELECT*fromcategories;")myresult=mycursor.fetchall()print(myresult)

当脚本在没有任何错误的情况下完成时,输出的结果如下:

    [{"id":4,"name":"Medicine","description":"<p>Medicine<br></p>","created_at":"","updated_at":""},{"id":6,"name":"Food","description":"<p>Food</p>","created_at":"","updated_at":""},{"id":8,"name":"Groceries","description":"<p>Groceries<br></p>","created_at":"","updated_at":""},{"id":9,"name":"Cakes&Bakes","description":"<p>Cakes&Bakes<br></p>","created_at":d"","updated_at":""}]

请注意,输出的是一个 JSON 数组,因为我们将dictionary=True参数传递给了游标。否则,结果将采用列表格式。现在有了 JSON 格式的源数据,就可以迁移到 MongoDB 集合。

步骤3:写入 MongoDB 集合

获得 JSON 格式的源数据后,下一步是将数据插入到 MongoDB 集合中。集合是一组文档,相当于 RDBMS 中表(或关系)。我可以通过调用insert_many()集合类的方法来实现,该方法返回插入文档的对象 ID 列表。请注意,当作为参数传递空列表时,此方法将引发异常,因此在方法调用之前进行长度检查。

    importpymongomongodb_host="mongodb://localhost:27017/"mongodb_dbname="mymongodb"myclient=pymongo.MongoClient(mongodb_host)mydb=myclient[mongodb_dbname]mycol=mydb["categories"]iflen(myresult)>0:x=mycol.insert_many(myresult)#myresultcomesfrommysqlcursorprint(len(x.inserted_ids))

完成此步骤后,检查一下 MongoDB 实例,以验证数据库和集合是否已创建,文档是否已插入。注意MongoDB 是无模式的,这就意味着不必定义模式来插入文档,模式是动态推断并自动创建的。MongoDB 还可以创建代码中引用的数据库和集合(如果它们还不存在的话)。

步骤4:把数据放在一起

下面是从 MySQL 中读取表并将其插入到 MongoDB 中集合的完整脚本。

    importmysql.connectorimportpymongodelete_existing_documents=Truemysql_host="localhost"mysql_database="mydatabase"mysql_schema="myschema"mysql_user="myuser"mysql_password="********"mongodb_host="mongodb://localhost:27017/"mongodb_dbname="mymongodb"mysqldb=mysql.connector.connect(host=mysql_host,database=mysql_database,user=mysql_user,password=mysql_password)mycursor=mysqldb.cursor(dictionary=True)mycursor.execute("SELECT*fromcategories;")myresult=mycursor.fetchall()myclient=pymongo.MongoClient(mongodb_host)mydb=myclient[mongodb_dbname]mycol=mydb["categories"]iflen(myresult)>0:x=mycol.insert_many(myresult)#myresultcomesfrommysqlcursorprint(len(x.inserted_ids))

步骤 5:增强脚本以加载 MySQL 架构中的所有表

该脚本从 MySQL 中读取一个表,并将结果加载到 MongoDB 集合中。然后,下一步是遍历源数据库中所有表的列表,并将结果加载到新的 MySQL 集合中。我们可以通过查询information_schema.tables元数据表来实现这一点,该表提供给定模式中的表列表。然后可以遍历结果并调用上面的脚本来迁移每个表的数据。

    #Iteratethroughthelistoftablesintheschematable_list_cursor=mysqldb.cursor()table_list_cursor.execute("SELECTtable_nameFROMinformation_schema.tablesWHEREtable_schema=%sORDERBYtable_name;",(mysql_schema,))tables=table_list_cursor.fetchall()fortableintables:#Executethemigrationscriptfor'table'

也可以通过将迁移逻辑抽象为一个函数来实现这一点。

    #Functionmigrate_tabledefmigrate_table(db,col_name):mycursor=db.cursor(dictionary=True)mycursor.execute("SELECT*FROM"+col_name+";")myresult=mycursor.fetchall()mycol=mydb[col_name]ifdelete_existing_documents:#deletealldocumentsinthecollectionmycol.delete_many({})#insertthedocumentsiflen(myresult)>0:x=mycol.insert_many(myresult)returnlen(x.inserted_ids)else:return0

步骤6:输出脚本进度并使其可读

脚本的进度是通过使用print 语句来传达的。通过颜色编码使输出易于阅读。例如,以将成功语句打印为绿色,将失败语句打印为红色。

    classbcolors:HEADER='\033[95m'OKBLUE='\033[94m'OKCYAN='\033[96m'OKGREEN='\033[92m'WARNING='\033[93m'FAIL='\033[91m'ENDC='\033[0m'BOLD='\033[1m'UNDERLINE='\033[4m'print(f"{bcolors.HEADER}Thisisaheader{bcolors.ENDC}")print(f"{bcolors.OKBLUE}Thisprintsinblue{bcolors.ENDC}")print(f"{bcolors.OKGREEN}Thismessageisgreen{bcolors.ENDC}")

最终结果

源 MySQL 数据库迁移后的目标 MongoDB 数据库在VSCode 中的 Python 脚本和输出

完整的脚本

    importmysql.connectorimportpymongoimportdatetimeclassbcolors:HEADER='\033[95m'OKBLUE='\033[94m'OKCYAN='\033[96m'OKGREEN='\033[92m'WARNING='\033[93m'FAIL='\033[91m'ENDC='\033[0m'BOLD='\033[1m'UNDERLINE='\033[4m'begin_time=datetime.datetime.now()print(f"{bcolors.HEADER}Scriptstartedat:{begin_time}{bcolors.ENDC}")delete_existing_documents=True;mysql_host="localhost"mysql_database="mydatabase"mysql_schema="myschhema"mysql_user="root"mysql_password=""mongodb_host="mongodb://localhost:27017/"mongodb_dbname="mymongodb"print(f"{bcolors.HEADER}Initializingdatabaseconnections...{bcolors.ENDC}")print("")#MySQLconnectionprint(f"{bcolors.HEADER}ConnectingtoMySQLserver...{bcolors.ENDC}")mysqldb=mysql.connector.connect(host=mysql_host,database=mysql_database,user=mysql_user,password=mysql_password)print(f"{bcolors.HEADER}ConnectiontoMySQLServersucceeded.{bcolors.ENDC}")#MongoDBconnectionprint(f"{bcolors.HEADER}ConnectingtoMongoDBserver...{bcolors.ENDC}")myclient=pymongo.MongoClient(mongodb_host)mydb=myclient[mongodb_dbname]print(f"{bcolors.HEADER}ConnectiontoMongoDBServersucceeded.{bcolors.ENDC}")print(f"{bcolors.HEADER}Databaseconnectionsinitializedsuccessfully.{bcolors.ENDC}")#Startmigrationprint(f"{bcolors.HEADER}Migrationstarted...{bcolors.ENDC}")dblist=myclient.list_database_names()ifmongodb_dbnameindblist:print(f"{bcolors.OKBLUE}Thedatabaseexists.{bcolors.ENDC}")else:print(f"{bcolors.WARNING}Thedatabasedoesnotexist,itisbeingcreated.{bcolors.ENDC}")#Functionmigrate_tabledefmigrate_table(db,col_name):mycursor=db.cursor(dictionary=True)mycursor.execute("SELECT*FROM"+col_name+";")myresult=mycursor.fetchall()mycol=mydb[col_name]ifdelete_existing_documents:#deletealldocumentsinthecollectionmycol.delete_many({})#insertthedocumentsiflen(myresult)>0:x=mycol.insert_many(myresult)returnlen(x.inserted_ids)else:return0#Iteratethroughthelistoftablesintheschematable_list_cursor=mysqldb.cursor()table_list_cursor.execute("SELECTtable_nameFROMinformation_schema.tablesWHEREtable_schema=%sORDERBYtable_nameLIMIT15;",(mysql_schema,))tables=table_list_cursor.fetchall()total_count=len(tables)success_count=0fail_count=0fortableintables:try:print(f"{bcolors.OKCYAN}Processingtable:{table[0]}...{bcolors.ENDC}")inserted_count=migrate_table(mysqldb,table[0])print(f"{bcolors.OKGREEN}Processingtable:{table[0]}completed.{inserted_count}documentsinserted.{bcolors.ENDC}")success_count+=1exceptExceptionase:print(f"{bcolors.FAIL}{e}{bcolors.ENDC}")fail_count+=1print("")print("Migrationcompleted.")print(f"{bcolors.OKGREEN}{success_count}of{total_count}tablesmigratedsuccessfully.{bcolors.ENDC}")iffail_count>0:print(f"{bcolors.FAIL}Migrationof{fail_count}tablesfailed.Seeerrorsabove.{bcolors.ENDC}")end_time=datetime.datetime.now()print(f"{bcolors.HEADER}Scriptcompletedat:{end_time}{bcolors.ENDC}")print(f"{bcolors.HEADER}Totalexecutiontime:{end_time-begin_time}{bcolors.ENDC}")

警告

该脚本适用于中小型 MySQL 数据库,它有几百个表,每个表有几千行。对于具有数百万行的大型数据库,性能可能会受到影响。在开始实际迁移之前,请在表列表查询和实际表select查询上使用 LIMIT 关键字对有限行进行检测。

下载

带点击此处从 GitHub 下载整个脚本: https://github.com/zshameel/MySQL2MongoDB

【51CTO译稿,合作站点转载请注明原文译者和出处为51CTO.com】