
公众号:uncle39py
V1
2022/09/15阅读:27主题:默认主题
11.爬虫:超级好用scrapy(下)
scrapy第一节
scrapy第二节
十.如何存储数据
一切关于数据的存储,都放在pipeline中处理
1.写到本地文件中
import codecs #处理文件以及编码的库
class JsonWithEncodingPipeline(object):
#自定义json文件的导出
def __init__(self):
self.file = codecs.open('article.json', 'w', encoding="utf-8")
def process_item(self, item, spider): # 方法的名称和参数是固定的
lines = json.dumps(dict(item), ensure_ascii=False) + "\n"
self.file.write(lines)
return item
def spider_closed(self, spider): # 方法的名称和参数是固定的
self.file.close()
# 这段代码是一个自己写文件的过程
2.scrapy提供的Exporter方式

以下以JsonItemExporter为例,其他的ItemExporter也是同样的写法
from scrapy.exporters import JsonItemExporter
class JsonExporterPipleline(object):
#调用scrapy提供的json export导出json文件
def __init__(self):
self.file = open('articleexport.json', 'wb') #open
self.exporter = JsonItemExporter(self.file, encoding="utf-8", ensure_ascii=False) #导出的形式,可以是xml/csv/json/...
self.exporter.start_exporting() #开始导出
def close_spider(self, spider):
self.exporter.finish_exporting() #结束导出
self.file.close() #close
def process_item(self, item, spider):
self.exporter.export_item(item) #传入需要导出的item
return item
3.导出到数据库

本地使用mysql数据库,可以使用pymysql 或者 mysqlclient 驱动
此项目中使用mysqlclient pip install mysqlclient
import MySQLdb
class MysqlPipeline(object):
#采用同步的机制写入mysql
def __init__(self):
self.conn = MySQLdb.connect('192.168.0.106', 'root', 'root', 'article_spider', charset="utf8", use_unicode=True)
self.cursor = self.conn.cursor()
def process_item(self, item, spider):
insert_sql = """
insert into jobbole_article(title, url, create_date, fav_nums)
VALUES (%s, %s, %s, %s
ON DUPLICATE KEY UPDATE fav_nums=VALUES(fav_nums)
"""
params = list()
params.append(item.get("title", ""))
params.append(item.get("url", ""))
params.append(item.get("create_date", ""))
params.append(item.get("fav_nums", ""))
self.cursor.execute(insert_sql, tuple(params))
self.conn.commit()
return item
4.异步方式入库mysql
MySQLdb是同步的库,跟requests一样,同步的库尽量不要用在异步的框架中; 因为CPU解析的速度是很快的,但是如果使用同步的库,就会阻塞住,影响整体效率
import MySQLdb.cursors
from twisted.enterprise import adbapi
class MysqlTwistedPipline(object):
def __init__(self, dbpool):
self.dbpool = dbpool
@classmethod
def from_settings(cls, settings):
dbparms = dict(
host = settings["MYSQL_HOST"],
db = settings["MYSQL_DBNAME"],
user = settings[" "],
passwd = settings["MYSQL_PASSWORD"],
charset='utf8',
cursorclass=MySQLdb.cursors.DictCursor,
use_unicode=True,
)
dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
return cls(dbpool)
def process_item(self, item, spider):
#使用twisted将mysql插入变成异步执行
query = self.dbpool.runInteraction(self.do_insert, item)
query.addErrback(self.handle_error, item, spider) #处理异常
return item
def handle_error(self, failure, item, spider):
#处理异步插入的异常
print (failure)
def do_insert(self, cursor, item):
#执行具体的插入
insert_sql = """
insert into jobbole_article(title, url, create_date, fav_nums)
VALUES (%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE fav_nums=VALUES(fav_nums)
"""
params = list()
params.append(item.get("title", ""))
params.append(item.get("url", ""))
params.append(item.get("create_date", ""))
params.append(item.get("fav_nums", ""))
cursor.execute(insert_sql, tuple(params))
十一 itemloader
itemloader并不是非要学的,但是它可以将解析代码写的更加纯粹,代码分离性好,更容易维护
from scrapy.loader import ItemLoader
def parse_detail(self, response):
match_re = re.match(".*?(\d+)", response.url)
if match_re:
post_id = match_re.group(1)
item_loader = ItemLoader(item=CnblogsArticleItem(), response=response)
item_loader.add_css("title", "#news_title a::text")
item_loader.add_css("create_date", "#news_info .time::text")
item_loader.add_css("content", "#news_content")
item_loader.add_css("tags", ".news_tags a::text")
item_loader.add_value("url", response.url)
if response.meta.get("front_image_url", []):
item_loader.add_value("front_image_url", response.meta.get("front_image_url", []))
#article_item = item_loader.load_item() #可以延迟转换(进入pipeline之前转换)
yield Request(url=parse.urljoin(response.url, "/NewsAjax/GetAjaxNewsInfo?contentId={}".format(post_id)),
meta={"article_item": item_loader, "url": response.url}, callback=self.parse_nums)
通过这种方式传递的item,item里面的值都是list类型,我们需要将值提取出来,因为我们想要的是str
先来看下例子来找灵感:
#items.py 3次演变过程
from scrapy.loader.processors import Compose,TakeFirst,Identity,Join
class CnblogsArticleItem(scrapy.Item):
title = scrapy.Field(
output_processor = TakeFirst()
) # 标题
#这样,title标题就只取list里面的第一个值,
-----------------------------------------------------演变1
#很多字段我们都只需要取第一个字符串,可是就需要每个字段都按上述方法写
#演变:继承ItemLoad类
from scrapy.loader import ItemLoader
class ArticleItemLoader(ItemLoader):
#自定义itemloader
default_output_processor = TakeFirst()
#此处就需要实例化我们自定义的itemloader
item_loader = ArticleItemLoader(item=CnblogsArticleItem(), response=response)
---------------------------------------------------演变2
#针对图片下载,不能使用默认的str,需要单独使用list
#针对create_date,使用itemloader前是需要通过正则匹配的,这个要如何实现呢?
#针对标签,不能只提取第一个字符串,而是需要将列表里面的值拼接起来,如何实现呢
def date_convert(value):
match_re = re.match(".*?(\d+.*)", value)
if match_re:
return match_re.group(1)
else:
return "1970-01-01"
class CnblogsArticleItem(scrapy.Item):
create_date = scrapy.Field(
input_processor = Compose(date_convert)
) # 创建日期
front_image_url = scrapy.Field(
output_processor = Identity()
) # 图片url(scrapy可自动下载)
tags = scrapy.Field(
output_processor = Join(separator=",")
) # 标签
--------------------------------------------------演变3
def parse_nums(self, response):
j_data = json.loads(response.text)
item_loader = response.meta.get("article_item", "")
item_loader.add_value("praise_nums", j_data["DiggCount"])
item_loader.add_value("fav_nums", j_data["TotalView"])
item_loader.add_value("comment_nums", j_data["CommentCount"])
item_loader.add_value("url_object_id", get_md5(response.meta.get("url", "")))
article_item = item_loader.load_item()
yield article_item
作者介绍

公众号:uncle39py
V1