从微信审批表单中拿数据写入MySQL,使用dolphinscheduler定时调度
2023-12-29 11:47:56
背景需求
现在需要将微信审批的数据拿出来,并将数据写入MySQL数据中,且需要使用ds定时调度
思路:
(1) 微信审批表单是通过接口去拿的:
微信审批url
https://developer.work.weixin.qq.com/document/path/91982
(2)通过分析,一次最多能拿31天的数据,即2678400s
(3)代码由三个部分组成
user.py 拿微信审批数据
dao.py 将审批数据写入mysql
model.py 创建mysql数据库并建立连接
代码如下
(1) user.py(拿微信审批数据)
import requests
import json
import datetime
import time
import dao
##token
def get_wechat_token(wechat_id,wechat_corpsecret):
url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=" + wechat_id + "&corpsecret=" + wechat_corpsecret
response = requests.get(url)
res = eval(response.text)
return res['access_token']
##获取指定审批ID详细信息
def approval_info(id,wechat_token):
url = "https://qyapi.weixin.qq.com/cgi-bin/oa/getapprovaldetail?access_token="+wechat_token
request_param = {
"sp_no" : id
}
response = requests.post(url, data=json.dumps(request_param))
info = eval(response.text)['info']
return response.json()
# return id,info
##获取指定模板所有审批中的审批ID
def get_all_approval(duration,wechat_token):
time_now = (round(time.time()))
start_time = time_now - duration
url = "https://qyapi.weixin.qq.com/cgi-bin/oa/getapprovalinfo?access_token="+wechat_token
request_param = {
"starttime" : str(start_time),
"endtime" : str(time_now),
"cursor" : 0 ,
"size" : 100 ,
"filters" : [
{
"key": "template_id",
"value": "3WLJNwUVZrQxLL8jAdpXdWKubnr2C9HYijsFMRGt"
},
{
"key" : "sp_status",
"value" : "1"
}
]}
response = requests.post(url, data=json.dumps(request_param))
return response.json()
# ['202312210327', '202312220036', '202312220183', '202312220205']
if __name__ == '__main__':
wechat_token = get_wechat_token('xxxx', 'xxxx')
all_approval_ids = get_all_approval(2678400,wechat_token)
# print(all_approval_ids)
# all_approval_ids2 = all_approval_ids['sp_no_list']
for approval_id in all_approval_ids['sp_no_list']:
approval_info_data = approval_info(approval_id,wechat_token)
# print(approval_info_data)
# # 处理审批信息,根据需要执行其他操作
dao.add_user_record(approval_info_data)
最多一次性可以拿31天的数据,2678400是31天
2. dao.py(mysql导入语句)
from sqlalchemy.orm import Session
import model
import datetime
session = model.session
def add_user_record(user):
# primary key -> uuid
sp_no = user['info']['sp_no']
sp_name = user['info']['sp_name'].encode('utf-8')
sp_status = user['info']['sp_status']
apply_time = datetime.datetime.fromtimestamp(user['info']['apply_time'])
# 删除已经存在的记录
session.query(model.User).filter(model.User.sp_no == sp_no).delete()
session.commit()
# 添加记录,为了确保有主键的情况下不冲突,前提是进行了删除了主键的记录
obj = model.User(
sp_no=sp_no,
sp_name=sp_name,
sp_status=sp_status,
apply_time=apply_time
)
session.add(obj)
session.commit()
return obj
3.model.py (mysql建表语句)
from sqlalchemy import Column, Boolean, Integer, String, DateTime, BigInteger
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# import config
Base = declarative_base()
# configs = config.ConfigParser()
# if configs is None:
# print('can not load config file')
# exit(0)
# cn = configs.get(key='conn')
# print('success load config file')
sqlalchemy_database_url = f"mysql+pymysql://{'user_name'}:{'passwd'}@{'url'}:{'3306'}/{'database'}?charset={'utf8'}"
# .format(cn['user'], cn['password'], cn['host'], cn['port'], cn['db'], cn['charset'], )
engine = create_engine(sqlalchemy_database_url)
# 建表語句
class User(Base):
__tablename__ = "users"
sp_no = Column(String(500), index=True, primary_key=True)
sp_name = Column(String(500))
sp_status = Column(Integer)
apply_time = Column(DateTime)
# class Meeting(Base):
# __tablename__ = "meetings"
# subject = Column(String(200))
# meeting_id = Column(String(38), index=True, primary_key=True)
# meeting_code = Column(String(18))
# userid = Column(String(50))
# nick_name = Column(String(50))
# start_time = Column(BigInteger())
# end_time = Column(BigInteger())
# participants_num = Column(Integer())
# online_member_num = Column(Integer())
# meeting_duration = Column(Integer())
# user_meeting_duration = Column(Integer())
# meeting_state = Column(Integer())
# meeting_type = Column(Integer())
#
# meeting_start_time = Column(String(50))
# meeting_end_time = Column(String(50))
#
# # 子会议信息
# sub_meeting_id = Column(String(18))
#
# # 参会人信息
# participant_userid = Column(String(50), index=True, primary_key=True)
# participant_uuid = Column(String(50))
# participant_user_name = Column(String(200))
# participant_join_time = Column(BigInteger(), index=True, primary_key=True)
# participant_left_time = Column(BigInteger())
# participant_instanceid = Column(String(50))
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
#
#
#
# class Participant(BASE):
# __tablename__ = "participants"
#
#
4.for.sh 定时调度
```bash
#!/bin/bash
#/opt/module/miniconda3/bin/python3 ~/bin/for.py
/opt/module/miniconda3/bin/python3 /opt/module/python_shell/user.py
将user.py,dao.py,model.py都存放在/opt/module/python_shell/这个目录下
/opt/module/miniconda3/bin/python3指的是我下载的python3路径
在这里会报一个编码错误
在mysql中执行如下代码,这个是由于写入中文编码不通过导致:
```bash
sqlalchemy.exc.DataError: (pymysql.err.DataError) (1366, "Incorrect string value: '\\xE6\\x97\\xA5\\xE5\\xB8\\xB8...' for column 'sp_name' at row 1")
[SQL: INSERT INTO users (sp_no, sp_name, sp_status, apply_time) VALUES (%(sp_no)s, %(sp_name)s, %(sp_status)s, %(apply_time)s)]
[parameters: {'sp_no': '202312220036', 'sp_name': '日常權限申請', 'sp_status': 1, 'apply_time': 1703203899}]
ALTER TABLE users MODIFY COLUMN sp_name VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
文章来源:https://blog.csdn.net/m0_37759590/article/details/135285322
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!