从微信审批表单中拿数据写入MySQL,使用dolphinscheduler定时调度

2023-12-29 11:47:56

背景需求

现在需要将微信审批的数据拿出来,并将数据写入MySQL数据中,且需要使用ds定时调度
思路:
(1) 微信审批表单是通过接口去拿的:
微信审批url
https://developer.work.weixin.qq.com/document/path/91982
(2)通过分析,一次最多能拿31天的数据,即2678400s
(3)代码由三个部分组成
user.py 拿微信审批数据
dao.py 将审批数据写入mysql
model.py 创建mysql数据库并建立连接
代码如下

(1) user.py(拿微信审批数据)

import requests
import json
import datetime
import time
import dao


##token
def get_wechat_token(wechat_id,wechat_corpsecret):
    url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=" + wechat_id + "&corpsecret=" + wechat_corpsecret
    response = requests.get(url)
    res = eval(response.text)
    return res['access_token']

##获取指定审批ID详细信息
def approval_info(id,wechat_token):
    url = "https://qyapi.weixin.qq.com/cgi-bin/oa/getapprovaldetail?access_token="+wechat_token
    request_param = {
        "sp_no" : id
    }
    response = requests.post(url, data=json.dumps(request_param))
    info = eval(response.text)['info']
    return response.json()
    # return id,info

##获取指定模板所有审批中的审批ID
def get_all_approval(duration,wechat_token):

    time_now = (round(time.time()))
    start_time = time_now - duration
    url = "https://qyapi.weixin.qq.com/cgi-bin/oa/getapprovalinfo?access_token="+wechat_token
    request_param = {
        "starttime" : str(start_time),
        "endtime" : str(time_now),
        "cursor" : 0 ,
        "size" : 100 ,
        "filters" : [
            {
                "key": "template_id",
                "value": "3WLJNwUVZrQxLL8jAdpXdWKubnr2C9HYijsFMRGt"
            },
            {
                "key" : "sp_status",
                "value" : "1"
            }
        ]}
    response = requests.post(url, data=json.dumps(request_param))
    return response.json()


# ['202312210327', '202312220036', '202312220183', '202312220205']

if __name__ == '__main__':
    wechat_token = get_wechat_token('xxxx', 'xxxx')
    all_approval_ids = get_all_approval(2678400,wechat_token)
# print(all_approval_ids)
# all_approval_ids2 = all_approval_ids['sp_no_list']
    for approval_id in all_approval_ids['sp_no_list']:
        approval_info_data = approval_info(approval_id,wechat_token)
       # print(approval_info_data)
# # 处理审批信息,根据需要执行其他操作
        dao.add_user_record(approval_info_data)

最多一次性可以拿31天的数据,2678400是31天

2. dao.py(mysql导入语句)

from sqlalchemy.orm import Session
import model
import datetime
session = model.session


def add_user_record(user):
    # primary key -> uuid
    sp_no = user['info']['sp_no']
    sp_name = user['info']['sp_name'].encode('utf-8')
    sp_status = user['info']['sp_status']
    apply_time = datetime.datetime.fromtimestamp(user['info']['apply_time'])

    # 删除已经存在的记录
    session.query(model.User).filter(model.User.sp_no == sp_no).delete()
    session.commit()

    # 添加记录,为了确保有主键的情况下不冲突,前提是进行了删除了主键的记录
    obj = model.User(
        sp_no=sp_no,
        sp_name=sp_name,
        sp_status=sp_status,
        apply_time=apply_time
    )
    session.add(obj)
    session.commit()

    return obj

3.model.py (mysql建表语句)

from sqlalchemy import Column, Boolean, Integer, String, DateTime, BigInteger
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# import config

Base = declarative_base()


# configs = config.ConfigParser()
# if configs is None:
#     print('can not load config file')
#     exit(0)
# cn = configs.get(key='conn')
# print('success load config file')

sqlalchemy_database_url = f"mysql+pymysql://{'user_name'}:{'passwd'}@{'url'}:{'3306'}/{'database'}?charset={'utf8'}"

# .format(cn['user'], cn['password'], cn['host'], cn['port'], cn['db'], cn['charset'], )
engine = create_engine(sqlalchemy_database_url)

# 建表語句
class User(Base):
    __tablename__ = "users"
    sp_no = Column(String(500), index=True, primary_key=True)
    sp_name = Column(String(500))
    sp_status = Column(Integer)
    apply_time = Column(DateTime)


# class Meeting(Base):
#     __tablename__ = "meetings"
#     subject = Column(String(200))
#     meeting_id = Column(String(38), index=True, primary_key=True)
#     meeting_code = Column(String(18))
#     userid = Column(String(50))
#     nick_name = Column(String(50))
#     start_time = Column(BigInteger())
#     end_time = Column(BigInteger())
#     participants_num = Column(Integer())
#     online_member_num = Column(Integer())
#     meeting_duration = Column(Integer())
#     user_meeting_duration = Column(Integer())
#     meeting_state = Column(Integer())
#     meeting_type = Column(Integer())
#
#     meeting_start_time = Column(String(50))
#     meeting_end_time = Column(String(50))
#
#     # 子会议信息
#     sub_meeting_id = Column(String(18))
#
#     # 参会人信息
#     participant_userid = Column(String(50), index=True, primary_key=True)
#     participant_uuid = Column(String(50))
#     participant_user_name = Column(String(200))
#     participant_join_time = Column(BigInteger(), index=True, primary_key=True)
#     participant_left_time = Column(BigInteger())
#     participant_instanceid = Column(String(50))


Base.metadata.create_all(engine)


Session = sessionmaker(bind=engine)
session = Session()



#
#
#
# class Participant(BASE):
#     __tablename__ = "participants"
#
#

4.for.sh 定时调度

```bash
#!/bin/bash
#/opt/module/miniconda3/bin/python3 ~/bin/for.py
/opt/module/miniconda3/bin/python3 /opt/module/python_shell/user.py

将user.py,dao.py,model.py都存放在/opt/module/python_shell/这个目录下
/opt/module/miniconda3/bin/python3指的是我下载的python3路径

在这里会报一个编码错误

在mysql中执行如下代码,这个是由于写入中文编码不通过导致:

```bash
sqlalchemy.exc.DataError: (pymysql.err.DataError) (1366, "Incorrect string value: '\\xE6\\x97\\xA5\\xE5\\xB8\\xB8...' for column 'sp_name' at row 1")
[SQL: INSERT INTO users (sp_no, sp_name, sp_status, apply_time) VALUES (%(sp_no)s, %(sp_name)s, %(sp_status)s, %(apply_time)s)]
[parameters: {'sp_no': '202312220036', 'sp_name': '日常權限申請', 'sp_status': 1, 'apply_time': 1703203899}]
ALTER TABLE users MODIFY COLUMN sp_name VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

文章来源:https://blog.csdn.net/m0_37759590/article/details/135285322
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。