python测试工具: 实现数据源自动核对

2023-12-29 22:37:14

测试业务需要:

现有A系统作为下游数据系统,上游系统有A1,A2,A3...

需要将A1,A2,A3...的数据达到某条件后(比如:A1系统销售单提交出库成功)自动触发MQ然后再经过数据清洗落到A系统,并将清洗后数据通过特定规则汇总在A系统报表中

现在需要QA同学验证的功能是:

A系统存储数据清洗后的库表(为切片表)有几十个,且前置系统较多,测试工作量也较多

需要核对清洗后A存库数据是否准确

清洗规则:(1)直接取数 (2)拼接取数 (3)映射取数

直接取数字段在2系统表中字段命名规则一致

so,以下测试工具是针对直接取数规则来开发,以便于测试

代码实现步骤:

(1)将表字段,来源系统表和切片表 数据库链接信息,查询字段 作为变量

将这些信息填入input.xlsx 作为入参

(2)读取表字段,根据来源系统表 数据库链接信息,查询字段

查询来源库表,将查询出字段值存储outfbi.xlsx

? (3)读取表字段,根据切片表 数据库链接信息,查询字段

查询切片库表,将查询出字段值存储outods.xlsx

(4)对比outfbi.xlsx,outods.xlsx的字段值

对比后生成result.xlsx文件,新增列校验结果

核对字段值一致校验结果为Success,否则为Fail

代码如下:

入参文件见附件

DbcheckApi.py
import os
import pymysql
import pandas as pd
from openpyxl import load_workbook
from openpyxl.styles import PatternFill
import datetime
import ast

"""测试数据路径管理"""
SCRIPTS_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
GENERATECASE_DIR = os.path.join(SCRIPTS_DIR, "dbcheck")
inputDATAS_DIR = os.path.join(GENERATECASE_DIR, "inputdata")
outDATAS_DIR = os.path.join(GENERATECASE_DIR, "outdata")

class DbcheckApi():

 def __init__(self,data):
    self.inputexcel=data
    workbook = load_workbook(filename=self.inputexcel)
    sheet = workbook['数据源']
    # 读取来源表-连接信息
    sourcedb_connection_info = ast.literal_eval(sheet['B3'].value)
    odsdb_connection_info = ast.literal_eval(sheet['B4'].value)
    source_db = sheet['C3'].value.strip()
    ods_db = sheet['C4'].value.strip()
    source_queryby = sheet['D3'].value.strip()
    ods_queryby = sheet['D4'].value.strip()
    print(sourcedb_connection_info)
    print(odsdb_connection_info)
    print(source_db)
    print(ods_db)
    print(source_queryby)
    print(ods_queryby)

    self.sourcedb = sourcedb_connection_info
    self.odsdb = odsdb_connection_info
    self.source_db = source_db
    self.ods_db = ods_db
    self.source_queryby = source_queryby
    self.ods_queryby = ods_queryby

 def source_select_db(self):
    host = self.sourcedb.get('host')
    port = self.sourcedb.get('port')
    user = self.sourcedb.get('user')
    passwd = self.sourcedb.get('passwd')
    db = self.sourcedb.get('db')
    if not host or not port or not user or not passwd or not db:
        error_msg = "连接信息不完整"
        return {"code": -1, "msg": error_msg, "data": ""}

    cnnfbi = pymysql.connect(
        host=host,
        port=port,
        user=user,
        passwd=passwd,
        db=db
    )

    cursor = cnnfbi.cursor()
    try:
        # 读取Excel文件
        df = pd.read_excel(self.inputexcel, sheet_name='Sheet1')
        # 获取第1列,从第2行开始读取的字段名
        fields = df.iloc[1:, 0].tolist()
        print(fields)

        # 构建查询SQL语句
        sql = "SELECT {} FROM {} WHERE {}".format(', '.join(fields), self.source_db, self.source_queryby)
        print(sql)
        # 执行查询语句
        cursor.execute(sql)
    except pymysql.err.OperationalError as e:
        error_msg = str(e)
        if "Unknown column" in error_msg:
            column_name = error_msg.split("'")[1]
            msg={"code": -1, "msg": f"列字段 {column_name} 在 "+self.source_db+" 表结构中不存在,请检查!", "data": ""}
            print(msg)
            return {"code": -1, "msg": f"列字段 {column_name} 在 "+self.source_db+" 表结构中不存在,请检查!", "data": ""}
        else:
            return {"code": -1, "msg": error_msg, "data": ""}
            print(error_msg)

    # 获取查询结果
    result = cursor.fetchall()
    # 关闭游标和连接
    cursor.close()
    cnnfbi.close()
    # 检查查询结果是否为空
    if not result:
        return {"code": -1, "msg": f"查询无数据,请检查sql: {sql}", "data": ""}
    else:
     # 将结果转换为DataFrame对象
     df = pd.DataFrame(result, columns=fields)
     odskey=self.source_db+'表-字段'
     odsvalue=self.source_db+'表-字段值'
     # 创建新的DataFrame对象,将字段和对应值放在两列
     df_new = pd.DataFrame({odskey: fields, odsvalue: df.iloc[0].values})
     outexcel = os.path.join(outDATAS_DIR,  'outputfbi.xlsx')
     # 导出结果到Excel文件
     df_new.to_excel(outexcel, index=False)


 def ods_select_db(self):
    host = self.odsdb.get('host')
    port = self.odsdb.get('port')
    user = self.odsdb.get('user')
    passwd = self.odsdb.get('passwd')
    db = self.odsdb.get('db')

    if not host or not port or not user or not passwd or not db:
        raise ValueError("连接信息不完整")
    cnnfbi = pymysql.connect(
        host=host,
        port=port,
        user=user,
        passwd=passwd,
        db=db
    )
    cursor = cnnfbi.cursor()
    try:
        # 读取Excel文件
        df = pd.read_excel(self.inputexcel, sheet_name='Sheet1')
        # 获取第1列,从第2行开始读取的字段名
        fields = df.iloc[1:, 0].tolist()
        print(fields)

        # 构建查询SQL语句
        sql = "SELECT {} FROM {} WHERE {}".format(', '.join(fields), self.ods_db, self.ods_queryby)
        print(sql)
        # 执行查询语句
        cursor.execute(sql)
    except pymysql.err.OperationalError as e:
        error_msg = str(e)
        if "Unknown column" in error_msg:
            column_name = error_msg.split("'")[1]
            return {"code": -1, "msg": f"列 {column_name} 不存在"+self.ods_db+" 表结构中,请检查!", "data": ""}
        else:
            return {"code": -1, "msg": error_msg, "data": ""}


    # 获取查询结果
    result = cursor.fetchall()
    # 关闭游标和连接
    cursor.close()
    cnnfbi.close()

    # 将结果转换为DataFrame对象
    df = pd.DataFrame(result, columns=fields)
    # 创建新的DataFrame对象,将字段和对应值放在两列
    odskey=self.ods_db+'表-字段'
    odsvalue=self.ods_db+'表-字段值'

    df_new = pd.DataFrame({odskey: fields, odsvalue: df.iloc[0].values})
    # 导出结果到Excel文件
    outexcel = os.path.join(outDATAS_DIR,  'outputfms.xlsx')

    df_new.to_excel(outexcel, index=False)


 def check_order(self):
    self.source_select_db()
    self.ods_select_db()
    outputfbi = os.path.join(outDATAS_DIR,  'outputfbi.xlsx')
    outputfms = os.path.join(outDATAS_DIR,  'outputfms.xlsx')
    df_a = pd.read_excel(outputfbi)
    df_b = pd.read_excel(outputfms)
    # 创建新的DataFrame对象用于存储C表的数据
    df_c = pd.DataFrame()
    # 将A表的列写入C表
    for col in df_a.columns:
        df_c[col] = df_a[col]
    # 将B表的列���入C表
    for col in df_b.columns:
        df_c[col] = df_b[col]

    odsvalue=self.ods_db+'表-字段值'
    fbivalue=self.source_db+'表-字段值'
    # 比对A2和B2列的值,如果不一致,则在第5列写入"校验失败"
    df_c['校验结果'] = ''
    for i in range(len(df_c)):
        if pd.notnull(df_c.at[i, fbivalue]) and pd.notnull(df_c.at[i, odsvalue]):
            fbivalue_rounded = df_c.at[i, fbivalue]
            odsvalue_rounded = df_c.at[i, odsvalue]
            if isinstance(fbivalue_rounded, (int, float)):
                fbivalue_rounded = round(fbivalue_rounded, 3)
            elif isinstance(fbivalue_rounded, datetime.datetime):
                fbivalue_rounded = round(fbivalue_rounded.timestamp(), 3)
            else:
                try:
                    fbivalue_rounded = round(float(fbivalue_rounded), 3)
                except ValueError:
                    pass
            if isinstance(odsvalue_rounded, (int, float)):
                odsvalue_rounded = round(odsvalue_rounded, 3)
            elif isinstance(odsvalue_rounded, datetime.datetime):
                odsvalue_rounded = round(odsvalue_rounded.timestamp(), 3)
            else:
                try:
                    odsvalue_rounded = round(float(odsvalue_rounded), 3)
                except ValueError:
                    pass
            if fbivalue_rounded != odsvalue_rounded:
                df_c.at[i, '校验结果'] = 'Fail'
            else:
                df_c.at[i, '校验结果'] = 'Success'
    # 将结果写入到C.xlsx文件
    df_c.to_excel('checkhead_result.xlsx', index=False)
    # 打开C.xlsx文件并设置背景色
    book = load_workbook('checkhead_result.xlsx')
    writer = pd.ExcelWriter('checkhead_result.xlsx', engine='openpyxl')
    writer.book = book

    # 获取C.xlsx的工作表
    sheet_name = 'Sheet1'
    ws = writer.book[sheet_name]
    # 设置背景色为红色
    red_fill = PatternFill(start_color='FFFF0000', end_color='FFFF0000', fill_type='solid')
    # 遍历校验结果列,将不一致的单元格设置为红色背景
    for row in ws.iter_rows(min_row=2, min_col=len(df_c.columns), max_row=len(df_c), max_col=len(df_c.columns)):
        for cell in row:
            if cell.value == 'Fail':
                cell.fill = red_fill
    # 保存Excel文件
    writer.save()
    writer.close()

if __name__ == '__main__':
    inputexcel = os.path.join(inputDATAS_DIR,  'input.xlsx')
    DbcheckApi(inputexcel).check_order()

文章来源:https://blog.csdn.net/gzl0524/article/details/135293255
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。