ChatGLM2-130b 生成 pymysql 动态CURD

2023-12-13 18:48:44
  • ?config.ini
[database]
user = root
password = 1qaz@WSX
host = 192.168.10.212
port = 3306
db = dataset
charset=utf8
  • Database.py
import string

import pymysql
import configparser
import logging

# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')



class Database:
    # 数据库操作类,提供连接数据库、更新、查询、删除、截断、解释查询、创建和删除表等功能。
    def __init__(self):
        # 读取配置文件
        config = configparser.ConfigParser()
        config.read('config.ini')

        # 获取数据库连接参数
        self.host = config.get('database', 'host')
        self.user = config.get('database', 'user')
        self.password = config.get('database', 'password')
        self.db = config.get('database', 'db')

        # 初始化数据库连接
        self.conn = self.get_conn()

    # 获取数据库连接
    def get_conn(self):
        # 异常捕捉:如果获取连接失败,捕获异常并打印错误信息
        try:
            return pymysql.connect(**{
                'host': self.host,
                'user': self.user,
                'password': self.password,
                'db': self.db,
                'charset': 'utf8mb4'
            })
        except pymysql.MySQLError as e:
            logging.debug(f"获取数据库连接失败:{e}")
            return None

    # 更新动态表
    def update_dynamic_table(self, table_name, columns, condition):
        # 执行SQL更新语句
        sql = f"UPDATE %s SET %s WHERE %s" % (
            table_name, ', '.join(['%s=\'%s\'' % (c, str(columns[c])) for c in columns.keys()]), condition)
        # 在执行之前打印 SQL 语句
        logging.debug(sql)
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(sql)
                self.conn.commit()
                logging.debug(f"更新动态表'{table_name}'成功。")
        except pymysql.MySQLError as e:
            logging.debug(f"更新动态表失败:{e}")
        except Exception as e:
            logging.debug(f"更新动态表时发生错误:{e}")

    def query_dynamic_table_page(self, table_name, columns, condition, page, page_size):
        # 计算偏移量
        offset = (page - 1) * page_size

        # 构造 SQL 查询语句,包括分页参数
        sql = f"SELECT {','.join(columns)} FROM {table_name} WHERE {condition} LIMIT {page_size} OFFSET {offset}"

        # 打印 SQL 语句
        logging.debug(sql)

        try:
            with self.conn.cursor() as cursor:
                cursor.execute(sql)
                result = cursor.fetchall()
                logging.debug(f"查询动态表{table_name}成功")

                # 计算总记录数以确定总页数
                total_rows = cursor.rowcount
                total_pages = (total_rows + page_size - 1) // page_size

                # 使用字典推导式合并查询结果
                if result:
                    dict = {columns[i - 1]: row[i - 1] for i in range(len(columns)) for row in result}
                else:
                    dict = {}

                logging.debug(dict)
        except pymysql.MySQLError as e:
            logging.debug(f"查询动态表失败:{e}")
            result = None
        except Exception as e:
            logging.debug(f"查询动态表时发生错误:{e}")

        return result, total_pages

        # 查询动态表

    def query_dynamic_list(self, table_name, columns, condition):
        # 执行SQL查询语句
        sql = "SELECT %s FROM %s WHERE %s limit 10000" % (",".join(columns), table_name, condition)
        # 在执行之前打印 SQL 语句
        logging.debug(sql)
        result_list = []
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(sql)
                result = cursor.fetchall()
                logging.debug(result)
                logging.debug(f"查询动态表{table_name}成功")
                # 使用字典推导式合并
                if len(result) > 0:
                    for i in range(len(result)):
                        result_list.append(result[i][0])
                logging.debug(result_list)
                return
        except pymysql.MySQLError as e:
            logging.debug(f"查询动态表失败:{e}")
            result = None
        except Exception as e:
            logging.debug(f"查询动态表时发生错误:{e}")
        return result_list

    # 查询动态表
    def query_dynamic_table(self, table_name, columns, condition):
        # 执行SQL查询语句
        sql = "SELECT %s FROM %s WHERE %s limit 10000" % (",".join(columns), table_name, condition)
        # 在执行之前打印 SQL 语句
        logging.debug(sql)
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(sql)
                result = cursor.fetchall()
                logging.debug(f"查询动态表{table_name}成功")
                # 使用字典推导式合并
                if len(result) > 0:
                    list = []
                    for row in range(len(result)):
                        dict = {columns[i - 1]: result[row][i - 1] for i in range(len(columns))}
                        list.append(dict)
                else:
                    list = []
                logging.debug(list)
                return list
        except pymysql.MySQLError as e:
            logging.debug(f"查询动态表失败:{e}")
            result = None
        except Exception as e:
            logging.debug(f"查询动态表时发生错误:{e}")
        return list

    # 根据条件删除动态表中的数据
    def delete_dynamic_table_condition(self, table_name, condition):
        # 执行SQL删除语句
        sql = "DELETE FROM %s WHERE %s"
        # 在执行之前打印 SQL 语句
        logging.debug(sql % (table_name, condition))
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(sql % (table_name, condition))
                self.conn.commit()
                logging.debug(f"删除动态表{table_name}中的数据成功")
        except pymysql.MySQLError as e:
            logging.debug(f"删除动态表中的数据失败:{e}")
        except Exception as e:
            logging.debug(f"删除动态表中的数据时发生错误:{e}")

    # 截断动态表
    def truncate_dynamic_table(self, table_name):
        # 执行SQL截断语句
        sql = "TRUNCATE TABLE %s"
        # 在执行之前打印 SQL 语句
        logging.debug(sql % (
            table_name))
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(sql % (table_name))
                self.conn.commit()
                logging.debug(f"截断动态表{table_name}")
        except pymysql.MySQLError as e:
            logging.debug(f"截断动态表失败:{e}")
        except Exception as e:
            logging.debug(f"截断动态表时发生错误:{e}")

    def explain_dynamic_query(self, table_name, columns, condition):
        # 构建 EXPLAIN 查询语句
        sql = "EXPLAIN SELECT %s FROM %s WHERE %s"
        # 在执行之前打印 SQL 语句
        logging.debug(sql % (', '.join(columns), table_name, condition))
        # 执行 EXPLAIN 查询
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(sql % (', '.join(columns), table_name, condition))
                result = cursor.fetchall()
                # 打印查询计划
                for row in result:
                    logging.debug(row)
        except pymysql.MySQLError as e:
            logging.debug(f"分析查询计划失败:{e}")
        except Exception as e:
            logging.debug(f"分析查询计划时发生错误:{e}")

    # 动态创建表
    def create_dynamic_table(self, table_name, columns):
        # 构造列定义的字符串
        column_definitions = ', '.join([f"{column} {columns[column]}" for column in columns.keys()])

        # 构造完整的CREATE TABLE语句,包括AUTO_INCREMENT
        sql = f"CREATE TABLE IF NOT EXISTS {table_name} (id BIGINT AUTO_INCREMENT PRIMARY KEY, {column_definitions})"
        # 在执行之前打印 SQL 语句
        logging.debug(sql)
        # 执行SQL语句
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(sql)
                self.conn.commit()
                logging.debug(f"表'{table_name}'创建成功。")
        except pymysql.MySQLError as e:
            logging.debug(f"创建表失败:{e}")
        except Exception as e:
            logging.debug(f"创建表时发生错误:{e}")

    # 动态删除表
    def drop_table(self, table_name):
        # 构造DROP TABLE语句
        sql = f"DROP TABLE IF EXISTS {table_name}"
        # 在执行之前打印 SQL 语句
        logging.debug(sql)
        # 执行SQL语句
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(sql)
                self.conn.commit()
                logging.debug(f"表'{table_name}'删除成功。")
        except pymysql.MySQLError as e:
            logging.debug(f"删除表失败:{e}")
        except Exception as e:
            logging.debug(f"删除表时发生错误:{e}")

        # 动态插入数据到表

    def upsert_dynamic_table(self, table_name, data, pk='id'):
        pk_val = data[pk]
        if db.query_dynamic_table(table_name, [pk], "%s='%s'" % (pk, pk_val)).__len__() > 0:
            del data[pk]
            db.update_dynamic_table(table_name, data, "%s='%s'" % (pk, pk_val))
        else:
            db.insert_dynamic_table(table_name, data)

    def insert_dynamic_table(self, table_name, data):
        # 构造列和值的列表
        columns = ', '.join(data.keys())
        values = ', '.join([f"'{value}'" for value in data.values()])

        # 构造完整的INSERT INTO语句
        sql = f"INSERT INTO {table_name} ({columns}) VALUES ({values})"
        # 在执行之前打印 SQL 语句
        logging.debug(sql)
        # 执行SQL语句
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(sql)
                self.conn.commit()
                logging.debug(f"数据插入到表'{table_name}'成功。")
                return cursor.lastrowid
        except pymysql.MySQLError as e:
            logging.debug(f"插入数据失败:{e}")
        except Exception as e:
            logging.debug(f"插入数据时发生错误:{e}")


# 使用示例
if __name__ == "__main__":
    db = Database()
    db.create_dynamic_table('my_table', {'column1': 'INT', 'column2': 'VARCHAR(255)'})
    db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
    db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
    db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
    db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
    db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
    db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
    db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
    db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
    db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
    db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
    db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})
    db.insert_dynamic_table('my_table', {'column1': 1, 'column2': 'value2'})

    db.insert_dynamic_table('my_table', {'id': 10, 'column1': 1, 'column2': 'value2'})
    db.update_dynamic_table('my_table', {'column1': 2}, 'id=1')
    db.query_dynamic_table('my_table', ['id', 'column1', 'column2'], 'id<=100')
    page = 1
    db.query_dynamic_table_page('my_table', ['column1', 'column2'], 'id=1', page, 50)
    db.delete_dynamic_table_condition('my_table', 'id=1')
    db.truncate_dynamic_table('my_table')
    db.explain_dynamic_query('my_table', ['column1', 'column2'], 'id=1')
    db.drop_table('my_table')
  • ?输出

文章来源:https://blog.csdn.net/wangzhpwang/article/details/134836908
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。