Python3,压箱底的代码片段,提升工作效率稳稳的。

2024-01-02 11:04:26

1、引言

小屌丝:鱼哥,这年底了,得不得分享一点压箱底的东西啊
小鱼:… 压箱底的东西,还能分享出来?
小屌丝:唉,为了我能学习更多的知识, 你可以分享分享。
小鱼:确定要压箱底的?
小屌丝:必须的。
小鱼:好吧,既然你这么想要,那就给你。
在这里插入图片描述

小屌丝:… 你这…
小鱼:这就是我压箱底的知识啊。
小屌丝:没行到,鱼哥你…
在这里插入图片描述

小鱼:算了算了, 我还是跟你分享一点点点点的压箱底的代码吧
小屌丝:这还差不多。

2、代码实例

2.1 操作存储服务

2.1.1 Redis操作

# -*- coding:utf-8 -*-
# @Time   : 2023-12-19
# @Author : Carl_DJ

'''
实现功能:
	对redis链接,写入数据等操作
'''

import reids
import requests

#链接redis
def redis_con():
    pool = reids.ConnectPool(
        host = 'host',
        port = 8809,
        decode_responses = True
    )
    rd = redis.Redis(connnect_pool = pool)
    return rd

if __name__ == '__main__':
    #写入redis操作
    rd = redis_con()
    rd.set('redis_testDemo1','redis_testDemo2')

2.1.2 MongoDB操作

# -*- coding:utf-8 -*-
# @Time   : 2023-12-19
# @Author : Carl_DJ

'''
实现功能:
	对MongoDB 链接,写入数据等操作
'''

from pymongo import MongoClient

#链接MongoDB
conn = MongoClient("mongodb://%s:%s@ipaddress:14339/mydb" % ('username', 'password'))
db = conn.mydb
mongo_collection = db.mydata

# 批量写入MongoDB操作
res = requests.get(url=url,params=query).json()
commentList = res['data']['commentList']
mongo_collection.insert_many(commentList)

2.1.3 MySQL操作

# -*- coding:utf-8 -*-
# @Time   : 2023-12-19
# @Author : Carl_DJ

'''
实现功能:
	对MySQL链接,查询等等操作
'''


import MySQLdb

#链接MySQL数据库
db = MySQLdb.connect("localhost","username","password","DBname",charset = 'utf8')

# 创建游标
cursor = db.cursor()

#执行sql语句
cursor.execute("SELCT * FROM DBname ")

# 获取一条数据
data_one = cursor.fetchone()

# 获取多条数据
data_all = cursor.fetchall()

#关闭数据库链接
db.close()


2.2 异步操作

# -*- coding:utf-8 -*-
# @Time   : 2023-12-19
# @Author : Carl_DJ

'''
实现功能:
	异步编程
'''

import aiofiles

async def get_html(session,url):
    try:
        async with session.get(url,timeout = 10) as res:
            if not res.status // 100 == 2:
                print(f'状态码为:{res.status}')
                print(f"无法爬取{url},报错了 ")
            else:
                res.encoding  = 'utf-8'
                text = await  res.text()
                return  text
    except Exception as e:
        print(f'错误信息:{e}')
        await  get_html(session,url)

async  def download(title_list,content_list):
    async  with aiofiles.open('{}.txt'.format(title_list[0]),'a',encoding='utf-8') as f:
        await  f.write('{}'.format(str(content_list)))

2.3 多线程

# -*- coding:utf-8 -*-
# @Time   : 2023-12-19
# @Author : Carl_DJ

'''
实现功能:
	多线程
'''

import threading
from datetime import *

#自定义全局变量需要的线程数,20
thread_num = 20
#自定义全局变量每个线程需要循环的数量,10
one_work_num = 10
#自定义函数test()
def test():
    #编写测试代码
    now = datetime.now()
    print("打印时间戳:",now)
    #设置死循环
    #x =0
    #while (x ==1):
    #    print("打印时间戳:",now)
def working():
    # 引用全局变量
    global one_work_num
    #嵌套执行指定循环次数的 test()函数
    for i in range(0,one_work_num):
        test()
#自定义thr()函数,来执行多线程
def thr():
    #自定义一个空的数组,用来存放线程组
    threads = []
    #设置循环次数为thread_num
    for i in  range(thread_num):
        #将working()函数放入到线程中
        t =threading.Thread(target=working,name="T" + str(i))
        #设置守护线程
        t.setDaemon(True)
        threads.append(t)
    #启动线程并执行
    for t in threads:
        t.start()
    #设置阻塞线程
    for t in  threads:
        t.join(2)
if __name__ == "__main__":
    thr()

3、总结

看到这里,今天的分享差不多就结束了。
其实,在平时工作中,只要善于总结,总会有更高效的代码片段的收集。

关于自动化脚本,可以参照小鱼的这些博文:

也可以关注小鱼的Python专栏:

我是小鱼

  • CSDN 博客专家
  • 阿里云 专家博主
  • 51CTO博客专家
  • 51认证讲师等
  • 认证金牌面试官
  • 职场面试培训、职业规划师
  • 多个国内主流技术社区的认证专家博主
  • 多款主流产品(阿里云等)测评一、二等奖获得者

关注小鱼,带你学习更多更专业更前言的Python相关技术。

文章来源:https://blog.csdn.net/wuyoudeyuer/article/details/135333994
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。