Python多进程爬虫实战:高效抓取网易云课堂课程数据并存储到MySQL
在当今大数据时代,网络爬虫已成为获取互联网信息的重要手段。今天,我将分享一个实用的Python多进程爬虫项目,它能高效地从网易云课堂抓取Python相关课程数据,并使用连接池技术将数据存储到MySQL数据库中。
项目概述
这个爬虫项目主要实现以下功能:
多进程并发抓取,大幅提升爬取效率自动翻页获取全部课程数据数据去重,避免重复存储结构化存储到MySQL数据库
核心技术栈
import requests
import time
import pymysql
from multiprocessing import Pool
我们使用了以下关键库:
:发送HTTP请求
requests:连接MySQL数据库
pymysql:实现多进程并发
multiprocessing.Pool
代码详解
1. 数据库连接配置
conn = pymysql.connect(host='localhost',
port=3306,
user='root',
passwd='root',
db='spider',
charset='utf8')
cur = conn.cursor()
扩展建议:在实际项目中,建议将数据库配置提取到外部配置文件或环境变量中,提高安全性和可维护性。
2. 核心爬取函数
def get_json(index):
url = "https://study.163.com/p/search/studycourse.json"
payload = {
"activityId": 0,
"keyword": "python",
"orderType": 5,
"pageIndex": index,
"pageSize": 50,
"priceType": -1,
"qualityType": 0,
"relativeOffset": 0,
"searchTimeType": -1,
}
headers = {
"accept": "application/json",
"host": "study.163.com",
"content-type": "application/json",
"origin": "https://study.163.com",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36"
}
try:
response = requests.post(url,json=payload,headers=headers)
content_json = response.json()
if content_json and content_json["code"] == 0:
return content_json
return None
except Exception as e:
print('出错了')
print(e)
return None
这个函数负责向网易云课堂API发送POST请求,获取课程数据的JSON格式响应。
扩展功能:可以添加重试机制,当请求失败时自动重试:
def get_json_with_retry(index, retry_count=3):
for i in range(retry_count):
result = get_json(index)
if result is not None:
return result
print(f"第{i+1}次重试...")
time.sleep(2) # 等待2秒后重试
return None
3. 数据解析与处理
def get_content(content_json):
if "result" in content_json:
return content_json["result"]["list"]
def check_course_exit(course_id):
sql = f'select course_id from course where course_id = {course_id}'
cur.execute(sql)
course = cur.fetchone()
if course:
return True
else:
return False
函数用于检查课程是否已存在数据库中,避免数据重复。
check_course_exit
4. 数据存储功能
def save_to_course(course_data):
sql_course = """insert into course
values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
"""
cur.executemany(sql_course, course_data)
def save_mysql(content):
course_data = []
for item in content:
if not check_course_exit(item['courseId']):
course_value = (item['courseId'],item['productName'],item['provider'],item['score'],item['learnerCount'],
item['lessonCount'],item['lectorName'],item['originalPrice'], item['discountPrice'],
item['discountRate'],item['imgUrl'], item['bigImgUrl'],
item['description'],)
course_data.append(course_value)
if course_data: # 只有当有数据时才执行插入
save_to_course(course_data)
扩展功能:添加数据验证,确保插入数据的完整性:
def validate_course_data(course_data):
"""验证课程数据是否完整"""
required_fields = ['courseId', 'productName']
for item in course_data:
for field in required_fields:
if field not in item or not item[field]:
return False
return True
5. 多进程主控逻辑
def main(index):
content_json = get_json(index)
if content_json: # 添加空值检查
content = get_content(content_json)
if content: # 添加空值检查
save_mysql(content)
if __name__ == '__main__':
print('开始执行')
start = time.time()
# 获取总页数
first_page_data = get_json(1)
if first_page_data and 'result' in first_page_data:
totlePageCount = first_page_data['result']["query"]["totlePageCount"]
else:
print("无法获取总页数,程序退出")
exit(1)
# 使用多进程
pool = Pool()
index = ([x for x in range(1,totlePageCount+1)])
pool.map(main, index)
pool.close()
pool.join()
# 数据库操作
cur.close()
conn.commit()
conn.close()
end = time.time()
print(f'程序执行时间是{end-start}秒。')
性能优化与扩展
1. 数据库连接池
在多进程环境下,每个进程应该有自己的数据库连接,而不是共享全局连接:
def get_db_connection():
return pymysql.connect(host='localhost',
port=3306,
user='root',
passwd='root',
db='spider',
charset='utf8')
def main(index):
# 每个进程创建自己的数据库连接
conn = get_db_connection()
cur = conn.cursor()
# ... 其余代码 ...
# 提交并关闭连接
conn.commit()
cur.close()
conn.close()
2. 添加日志记录
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('spider.log'),
logging.StreamHandler()
]
)
def main(index):
logging.info(f"开始处理第{index}页")
# ... 其余代码 ...
logging.info(f"第{index}页处理完成")
3. 错误处理与监控
def main(index):
try:
content_json = get_json(index)
if content_json:
content = get_content(content_json)
if content:
save_mysql(content)
except Exception as e:
logging.error(f"处理第{index}页时发生错误: {str(e)}")
数据库表结构
为确保代码正常运行,需要提前创建相应的数据库表:
CREATE TABLE course (
course_id BIGINT PRIMARY KEY,
product_name VARCHAR(255),
provider VARCHAR(100),
score DECIMAL(3,2),
learner_count INT,
lesson_count INT,
lector_name VARCHAR(100),
original_price DECIMAL(10,2),
discount_price DECIMAL(10,2),
discount_rate DECIMAL(5,2),
img_url VARCHAR(500),
big_img_url VARCHAR(500),
description TEXT
);
总结
通过这个项目,我们实现了一个高效的多进程网络爬虫,具有以下特点:
高效并发:利用多进程技术大幅提升数据抓取速度数据完整:通过数据验证和去重机制保证数据质量健壮性:添加了异常处理和重试机制可扩展:代码结构清晰,易于添加新功能
这个爬虫框架不仅适用于网易云课堂,经过适当修改也可以应用于其他类似的数据抓取场景。希望这个项目能为你的爬虫开发提供有价值的参考!
注意:在实际使用爬虫时,请遵守网站的robots.txt协议,合理控制请求频率,避免对目标网站造成过大压力。
© 版权声明
文章版权归作者所有,未经允许请勿转载。
相关文章
暂无评论...
