Python异步编程入门:从回调地狱到async/await
关于异步编程
最近在重构一个爬虫项目时,发现同步请求的效率实在太低了。每次发送请求都需要等待响应,大量的时间都浪费在I/O等待上。于是决定深入学习Python的异步编程,从最初的回调地狱到后来的async/await语法,这个过程让我对异步编程有了全新的认识。
一开始接触异步编程时,我被各种概念搞得晕头转向,什么事件循环、协程、Future,感觉像是在学习一门全新的语言。不过通过不断地实践和踩坑,终于摸清了一些门道。
一、异步编程基础概念
1.1 什么是异步编程
异步编程是一种编程范式,允许程序在等待某些操作(如网络请求、文件读写)完成时,可以继续执行其他任务。这与传统的同步编程不同,同步编程中程序会阻塞直到当前操作完成。
在Python中,异步编程主要通过async和await关键字实现,底层依赖于事件循环来调度协程。
1.2 协程与事件循环
协程(Coroutine)是异步编程的核心概念。它是一种特殊的函数,可以在执行过程中暂停并在稍后恢复。在Python中,通过async def定义协程函数。
async def my_coroutine():
print("协程开始执行")
await asyncio.sleep(1) # 模拟异步操作
print("协程执行完成")
return "完成"
事件循环(Event Loop)是异步编程的运行时,它负责调度和执行协程。Python的asyncio模块提供了事件循环的实现。
二、async/await语法详解
2.1 async关键字
async关键字用于定义协程函数。协程函数与普通函数的区别在于:
- 调用协程函数不会立即执行,而是返回一个协程对象
- 协程函数内部可以使用
await关键字来等待其他协程或异步操作
import asyncio
async def fetch_data(url):
"""模拟异步获取数据"""
print(f"开始获取 {url}")
await asyncio.sleep(2) # 模拟网络请求
print(f"完成获取 {url}")
return f"数据来自 {url}"
# 调用协程函数,返回协程对象
coroutine = fetch_data("https://example.com")
print(type(coroutine)) # <class 'coroutine'>
2.2 await关键字
await关键字用于等待协程的完成。它只能在async函数内部使用,表示暂停当前协程的执行,直到被等待的协程完成。
async def main():
result = await fetch_data("https://example.com")
print(result)
# 运行异步主函数
asyncio.run(main())
三、实用异步编程技巧
3.1 并发执行多个任务
在实际应用中,经常需要并发执行多个异步任务。asyncio提供了多种方式来处理并发:
import asyncio
import time
async def fetch_url(url, delay):
print(f"开始获取 {url}")
await asyncio.sleep(delay) # 模拟不同的请求时间
print(f"完成获取 {url}")
return f"结果: {url}"
async def main():
start_time = time.time()
# 使用 asyncio.gather 并发执行多个任务
tasks = [
fetch_url("https://url1.com", 2),
fetch_url("https://url2.com", 1),
fetch_url("https://url3.com", 3)
]
results = await asyncio.gather(*tasks)
end_time = time.time()
print(f"总耗时: {end_time - start_time:.2f}秒")
print(f"结果: {results}")
asyncio.run(main())
3.2 处理异步任务的超时
在实际应用中,需要为异步操作设置超时,避免任务长时间挂起:
async def fetch_with_timeout(url, timeout=5):
try:
# 使用 asyncio.wait_for 设置超时
result = await asyncio.wait_for(
fetch_data(url),
timeout=timeout
)
return result
except asyncio.TimeoutError:
print(f"获取 {url} 超时")
return None
async def main():
result = await fetch_with_timeout("https://slow-website.com", 2)
if result:
print("获取成功")
else:
print("获取失败或超时")
asyncio.run(main())
3.3 异步上下文管理器
对于需要在进入和退出时执行特定操作的场景,可以使用异步上下文管理器:
class AsyncDatabaseConnection:
def __init__(self, db_url):
self.db_url = db_url
self.connection = None
async def __aenter__(self):
print(f"连接到数据库 {self.db_url}")
# 模拟异步连接
await asyncio.sleep(0.5)
self.connection = f"连接对象: {self.db_url}"
return self.connection
async def __aexit__(self, exc_type, exc_val, exc_tb):
print(f"关闭数据库连接 {self.db_url}")
# 模拟异步关闭
await asyncio.sleep(0.2)
self.connection = None
async def main():
async with AsyncDatabaseConnection("postgresql://localhost:5432/mydb") as conn:
print(f"使用连接: {conn}")
await asyncio.sleep(1) # 模拟数据库操作
print("退出上下文")
asyncio.run(main())
四、实战案例:异步爬虫
学了这么多理论,不如来个实战案例。下面是一个简单的异步爬虫,用于并发获取多个网页:
import asyncio
import aiohttp
import time
async def fetch_page(session, url):
"""异步获取网页内容"""
try:
async with session.get(url) as response:
content = await response.text()
print(f"获取到 {url},长度: {len(content)} 字符")
return content
except Exception as e:
print(f"获取 {url} 失败: {e}")
return None
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/3"
]
start_time = time.time()
# 创建会话
async with aiohttp.ClientSession() as session:
# 并发获取所有页面
tasks = [fetch_page(session, url) for url in urls]
results = await asyncio.gather(*tasks)
end_time = time.time()
print(f"所有页面获取完成,总耗时: {end_time - start_time:.2f}秒")
print(f"成功获取页面数: {sum(1 for r in results if r is not None)}")
# 运行主函数
if __name__ == "__main__":
asyncio.run(main())
五、常见陷阱与最佳实践
5.1 阻塞操作
在异步代码中,任何阻塞操作都会阻塞整个事件循环。例如,使用time.sleep()而不是asyncio.sleep():
# 错误的做法 - 会阻塞事件循环
async def bad_example():
print("开始")
time.sleep(2) # 阻塞操作!
print("结束")
# 正确的做法
async def good_example():
print("开始")
await asyncio.sleep(2) # 异步等待
print("结束")
5.2 异常处理
异步代码中的异常处理需要特别注意,特别是当处理多个并发任务时:
async def risky_task(name, should_fail=False):
await asyncio.sleep(1)
if should_fail:
raise ValueError(f"任务 {name} 失败了")
return f"任务 {name} 成功"
async def main():
tasks = [
risky_task("A", False),
risky_task("B", True), # 这个会失败
risky_task("C", False)
]
# 使用 asyncio.gather 并设置 return_exceptions=True
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"任务 {i} 出现异常: {result}")
else:
print(f"任务 {i} 结果: {result}")
asyncio.run(main())
六、总结
这次异步编程的学习让我深刻体会到Python在异步处理方面的强大能力。从最开始的困惑到后来的熟练运用,这个过程虽然充满挑战,但收获颇丰。
异步编程特别适合I/O密集型任务,如网络请求、文件操作等场景。在这些场景下,异步编程可以显著提高程序的性能和响应能力。
不过,异步编程也有其复杂性,需要特别注意异常处理、资源管理等问题。在实际项目中,要根据具体场景选择是否使用异步编程,避免过度设计。
以上是个人学习Python异步编程的心得总结,如有不对请见谅。
默认评论
Halo系统提供的评论