ChatGLM2-130b 生成 pymysql 动态CURD
2023-12-13 18:48:44
- ?config.ini
[database]
user = root
password = 1qaz@WSX
host = 192.168.10.212
port = 3306
db = dataset
charset=utf8
- Database.py
import string
import pymysql
import configparser
import logging
# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
class Database:
# 数据库操作类,提供连接数据库、更新、查询、删除、截断、解释查询、创建和删除表等功能。
def __init__(self):
# 读取配置文件
config = configparser.ConfigParser()
config.read('config.ini')
# 获取数据库连接参数
self.host = config.get('database', 'host')
self.user = config.get('database', 'user')
self.password = config.get('database', 'password')
self.db = config.get('database', 'db')
# 初始化数据库连接
self.conn = self.get_conn()
# 获取数据库连接
def get_conn(self):
# 异常捕捉:如果获取连接失败,捕获异常并打印错误信息
try:
return pymysql.connect(**{
'host': self.host,
'user': self.user,
'password': self.password,
'db': self.db,
'charset': 'utf8mb4'
})
except pymysql.MySQLError as e:
logging.debug(f"获取数据库连接失败:{e}")
return None
# 更新动态表
def update_dynamic_table(self, table_name, columns, condition):
# 执行SQL更新语句
sql = f"UPDATE %s SET %s WHERE %s" % (
table_name, ', '.join(['%s=\'%s\'' % (c, str(columns[c])) for c in columns.keys()]), condition)
# 在执行之前打印 SQL 语句
logging.debug(sql)
try:
with self.conn.cursor() as cursor:
cursor.execute(sql)
self.conn.commit()
logging.debug(f"更新动态表'{table_name}'成功。")
except pymysql.MySQLError as e:
logging.debug(f"更新动态表失败:{e}")
except Exception as e:
logging.debug(f"更新动态表时发生错误:{e}")
def query_dynamic_table_page(self, table_name, columns, condition, page, page_size):
# 计算偏移量
offset = (page - 1) * page_size
# 构造 SQL 查询语句,包括分页参数
sql = f"SELECT {','.join(columns)} FROM {table_name} WHERE {condition} LIMIT {page_size} OFFSET {offset}"
# 打印 SQL 语句
logging.debug(sql)
try:
with self.conn.cursor() as cursor:
cursor.execute(sql)
result = cursor.fetchall()
logging.debug(f"查询动态表{table_name}成功")
# 计算总记录数以确定总页数
total_rows = cursor.rowcount
total_pages = (total_rows + page_size - 1) // page_size
# 使用字典推导式合并查询结果
if result:
dict = {columns[i - 1]: row[i - 1] for i in range(len(columns)) for row in result}
else:
dict = {}
logging.debug(dict)
except pymysql.MySQLError as e:
logging.debug(f"查询动态表失败:{e}")
result = None
except Exception as e:
logging.debug(f"查询动态表时发生错误:{e}")
return result, total_pages
# 查询动态表
def query_dynamic_list(self, table_name, columns, condition):
# 执行SQL查询语句
sql = "SELECT %s FROM %s WHERE %s limit 10000" % (",".join(columns), table_name, condition)
# 在执行之前打印 SQL 语句
logging.debug(sql)
result_list = []
try:
with self.conn.cursor() as cursor:
cursor.execute(sql)
result = cursor.fetchall()
logging.debug(result)
logging.debug(f"查询动态表{table_name}成功")
# 使用字典推导式合并
if len(result) > 0:
for i in range(len(result)):
result_list.append(result[i][0])
logging.debug(result_list)
return
except pymysql.MySQLError as e:
logging.debug(f"查询动态表失败:{e}")
result = None
except Exception as e:
logging.debug(f"查询动态表时发生错误:{e}")
return result_list
# 查询动态表
def query_dynamic_table(self, table_name, columns, condition):
# 执行SQL查询语句
sql = "SELECT %s FROM %s WHERE %s limit 10000" % (",".join(columns), table_name, condition)
# 在执行之前打印 SQL 语句
logging.debug(sql)
try:
with self.conn.cursor() as cursor:
cursor.execute(sql)
result = cursor.fetchall()
logging.debug(f"查询动态表{table_name}成功")
# 使用字典推导式合并
if len(result) > 0:
list = []
for row in range(len(result)):
dict = {columns[i - 1]: result[row][i - 1] for i in range(len(columns))}
list.append(dict)
else:
list = []
logging.debug(list)
return list
except pymysql.MySQLError as e:
logging.debug(f"查询动态表失败:{e}")
result = None
except Exception as e:
logging.debug(f"查询动态表时发生错误:{e}")
return list
# 根据条件删除动态表中的数据
def delete_dynamic_table_condition(self, table_name, condition):
# 执行SQL删除语句
sql = "DELETE FROM %s WHERE %s"
# 在执行之前打印 SQL 语句
logging.debug(sql % (table_name, condition))
try:
with self.conn.cursor() as cursor:
cursor.execute(sql % (table_name, condition))
self.conn.commit()
logging.debug(f"删除动态表{table_name}中的数据成功")
except pymysql.MySQLError as e:
logging.debug(f"删除动态表中的数据失败:{e}")
except Exception as e:
logging.debug(f"删除动态表中的数据时发生错误:{e}")
# 截断动态表
def truncate_dynamic_table(self, table_name):
# 执行SQL截断语句
sql = "TRUNCATE TABLE %s"
# 在执行之前打印 SQL 语句
logging.debug(sql % (
table_name))
try:
with self.conn.cursor() as cursor:
cursor.execute(sql % (table_name))
self.conn.commit()
logging.debug(f"截断动态表{table_name}")
except pymysql.MySQLError as e:
logging.debug(f"截断动态表失败:{e}")
except Exception as e:
logging.debug(f"截断动态表时发生错误:{e}")
def explain_dynamic_query(self, table_name, columns, condition):
# 构建 EXPLAIN 查询语句
sql = "EXPLAIN SELECT %s FROM %s WHERE %s"
# 在执行之前打印 SQL 语句
logging.debug(sql % (', '.join(columns), table_name, condition))
# 执行 EXPLAIN 查询
try:
with self.conn.cursor() as cursor:
cursor.execute(sql % (', '.join(columns), table_name, condition))
result = cursor.fetchall()
# 打印查询计划
for row in result:
logging.debug(row)
except pymysql.MySQLError as e:
logging.debug(f"分析查询计划失败:{e}")
except Exception as e:
logging.debug(f"分析查询计划时发生错误:{e}")
# 动态创建表
def create_dynamic_table(self, table_name, columns):
# 构造列定义的字符串
column_definitions = ', '.join([f"{column} {columns[column]}" for column in columns.keys()])
# 构造完整的CREATE TABLE语句,包括AUTO_INCREMENT
sql = f"CREATE TABLE IF NOT EXISTS {table_name} (id BIGINT AUTO_INCREMENT PRIMARY KEY, {column_definitions})"
# 在执行之前打印 SQL 语句
logging.debug(sql)
# 执行SQL语句
try:
with self.conn.cursor() as cursor:
cursor.execute(sql)
self.conn.commit()
logging.debug(f"表'{table_name}'创建成功。")
except pymysql.MySQLError as e:
logging.debug(f"创建表失败:{e}")
except Exception as e:
logging.debug(f"创建表时发生错误:{e}")
# 动态删除表
def drop_table(self, table_name):
# 构造DROP TABLE语句
sql = f"DROP TABLE IF EXISTS {table_name}"
# 在执行之前打印 SQL 语句
logging.debug(sql)
# 执行SQL语句
try:
with self.conn.cursor() as cursor:
cursor.execute(sql)
self.conn.commit()
logging.debug(f"表'{table_name}'删除成功。")
except pymysql.MySQLError as e:
logging.debug(f"删除表失败:{e}")
except Exception as e:
logging.debug(f"删除表时发生错误:{e}")
# 动态插入数据到表
def upsert_dynamic_table(self, table_name, data, pk='id'):
pk_val = data[pk]
if db.query_dynamic_table(table_name, [pk], "%s='%s'" % (pk, pk_val)).__len__() > 0:
del data[pk]
db.update_dynamic_table(table_name, data, "%s='%s'" % (pk, pk_val))
else:
db.insert_dynamic_table(table_name, data)
def insert_dynamic_table(self, table_name, data):
# 构造列和值的列表
columns = ', '.join(data.keys())
values = ', '.join([f"'{value}'" for value in data.values()])
# 构造完整的INSERT INTO语句
sql = f"INSERT INTO {table_name} ({columns}) VALUES ({values})"
# 在执行之前打印 SQL 语句
logging.debug(sql)
# 执行SQL语句
try:
with self.conn.cursor() as cursor:
cursor.execute(sql)
self.conn.commit()
logging.debug(f"数据插入到表'{table_name}'成功。")
return cursor.lastrowid
except pymysql.MySQLError as e:
logging.debug(f"插入数据失败:{e}")
except Exception as e:
logging.debug(f"插入数据时发生错误:{e}")
# 使用示例
if __name__ == "__main__":
db = Database()
db.create_dynamic_table('my_table', {'column1': 'INT', 'column2': 'VARCHAR(255)'})
db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
db.insert_dynamic_table('my_table', {'id': 10, 'column1': 1, 'column2': 'value2'})
db.update_dynamic_table('my_table', {'column1': 2}, 'id=1')
db.query_dynamic_table('my_table', ['id', 'column1', 'column2'], 'id<=100')
page = 1
db.query_dynamic_table_page('my_table', ['column1', 'column2'], 'id=1', page, 50)
db.delete_dynamic_table_condition('my_table', 'id=1')
db.truncate_dynamic_table('my_table')
db.explain_dynamic_query('my_table', ['column1', 'column2'], 'id=1')
db.drop_table('my_table')
- ?输出
文章来源:https://blog.csdn.net/wangzhpwang/article/details/134836908
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!