
13. Crawlers: Crawling All of Zhihu's Q&A with Scrapy
1. Preface: Depth-First & Breadth-First
A website's URL structure contains subdomains at several levels, and the pages under each domain embed URLs pointing up and down the hierarchy, such as the home page and detail pages.

So how do you crawl every URL on a site?
In theory, you start from the home page and extract all of its URLs, then open each of those URLs and extract all the URLs underneath it; repeating this loop eventually reaches all of the site's content.
This raises a problem: if the site's links form cycles, the crawl falls into an infinite loop. How do we avoid that?
Very simply: de-duplicate.
If a URL has already been seen, skip it.
Depth-first crawling is implemented with recursion.

Breadth-first crawling is implemented with a queue (see the sketch below).
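To make the two strategies concrete, here is a minimal sketch of both traversals over a made-up link graph (the links dict and the example.com URLs are illustrative assumptions, not part of the original article); both rely on a seen set to break cycles:

from collections import deque

# A made-up link graph: "URLs found on a page", with a cycle / -> /a -> /
links = {
    "https://example.com/":  ["https://example.com/a", "https://example.com/b"],
    "https://example.com/a": ["https://example.com/", "https://example.com/c"],
    "https://example.com/b": ["https://example.com/c"],
    "https://example.com/c": [],
}

def crawl_dfs(url, seen=None):
    # Depth-first: recursion; the seen set is what breaks the cycle
    seen = seen if seen is not None else set()
    if url in seen:
        return
    seen.add(url)
    print("DFS visiting", url)
    for next_url in links.get(url, []):
        crawl_dfs(next_url, seen)

def crawl_bfs(start_url):
    # Breadth-first: a FIFO queue plus the same de-duplication set
    seen, queue = {start_url}, deque([start_url])
    while queue:
        url = queue.popleft()
        print("BFS visiting", url)
        for next_url in links.get(url, []):
            if next_url not in seen:
                seen.add(next_url)
                queue.append(next_url)

crawl_dfs("https://example.com/")
crawl_bfs("https://example.com/")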

Scrapy's de-duplication strategy: each URL is encoded with MD5 or a similar hash, compressing it to a fixed length, and the fingerprint is stored in a set in memory. When the volume of URLs becomes very large, we can switch to a Bloom filter instead (covered later with distributed crawling).
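The idea behind that fingerprinting can be sketched as follows (a simplified illustration; Scrapy's built-in dupefilter actually fingerprints the whole request rather than just the URL, and the class below is hypothetical):

import hashlib

class UrlDupeFilter:
    """Keep fixed-length fingerprints of seen URLs in an in-memory set."""

    def __init__(self):
        self.fingerprints = set()

    def seen(self, url):
        fp = hashlib.md5(url.encode("utf-8")).hexdigest()  # 32-character digest regardless of URL length
        if fp in self.fingerprints:
            return True
        self.fingerprints.add(fp)
        return False

dupe = UrlDupeFilter()
print(dupe.seen("https://www.zhihu.com/question/123"))  # False: first time seen
print(dupe.seen("https://www.zhihu.com/question/123"))  # True: duplicate, would be skipped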
2. Crawling All of Zhihu's Q&A
The previous chapter covered simulating a login to Zhihu; this one builds on it and focuses on the overall approach.
The parse method extracts every URL from the Zhihu home page. If a URL is a question URL it is yielded for parsing; otherwise the spider keeps crawling all the URLs under it, and this loop eventually reaches every link.
def parse(self, response):
    """
    Extract every URL from the HTML page and follow each one for further crawling.
    If an extracted URL has the form /question/xxx, download it and hand it
    straight to the question parser.
    """
    all_urls = response.css("a::attr(href)").extract()  # all links on the page
    all_urls = [parse.urljoin(response.url, url) for url in all_urls]  # join relative paths into absolute URLs
    all_urls = filter(lambda x: x.startswith("https"), all_urls)  # drop anything that does not start with https
    for url in all_urls:
        match_obj = re.match(r"(.*zhihu.com/question/(\d+))(/|$).*", url)
        if match_obj:
            # A question page: download it and pass it to the extraction callback
            request_url = match_obj.group(1)
            yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question)
        else:
            # Not a question page: keep following it
            yield scrapy.Request(url, headers=self.headers, callback=self.parse)
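For reference, the methods in this chapter are assumed to sit on a spider class roughly like the one below (a minimal sketch: the class name, headers, and import paths are assumptions carried over from the login chapter, not the original source file):

import re
import json
import datetime

import scrapy
from urllib import parse
from scrapy.loader import ItemLoader

# Assumed project-level imports; the real module paths depend on your project layout
# from ArticleSpider.items import ZhihuQuestionItem, ZhihuAnswerItem
# from ArticleSpider.utils.common import extract_num
# from ArticleSpider.settings import SQL_DATETIME_FORMAT

class ZhihuSpider(scrapy.Spider):
    name = "zhihu"
    allowed_domains = ["www.zhihu.com"]
    start_urls = ["https://www.zhihu.com/"]

    headers = {
        "HOST": "www.zhihu.com",
        "Referer": "https://www.zhihu.com",
        "User-Agent": "Mozilla/5.0",  # use a real browser UA string here
    }

    # start_answer_url is the API template shown below;
    # parse, parse_question and parse_answer are the methods listed in this article.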
The parse_question method parses each question page. After a question has been parsed, you could append the code block above so that any other URLs on that page keep being followed; it is left out here to keep the logic clear.
Parsing the answers is based on analysing the requests the page actually makes:
# Request URL for the first page of a question's answers
start_answer_url = "https://www.zhihu.com/api/v4/questions/{0}/answers?sort_by=default&include=data%5B%2A%5D.is_normal%2Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccollapsed_counts%2Creviewing_comments_count%2Ccan_comment%2Ccontent%2Ceditable_content%2Cvoteup_count%2Creshipment_settings%2Ccomment_permission%2Cmark_infos%2Ccreated_time%2Cupdated_time%2Crelationship.is_author%2Cvoting%2Cis_thanked%2Cis_nothelp%2Cupvoted_followees%3Bdata%5B%2A%5D.author.is_blocking%2Cis_blocked%2Cis_followed%2Cvoteup_count%2Cmessage_thread_token%2Cbadge%5B%3F%28type%3Dbest_answerer%29%5D.topics&limit={1}&offset={2}"
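As a quick illustration of how the placeholders plug in (the question id 19550225 is an arbitrary example and the query string is abbreviated here, not taken from the article):

# Abbreviated copy of the template above, just to show the .format() call
answer_url_template = "https://www.zhihu.com/api/v4/questions/{0}/answers?include=...&limit={1}&offset={2}"
print(answer_url_template.format(19550225, 20, 0))
# -> https://www.zhihu.com/api/v4/questions/19550225/answers?include=...&limit=20&offset=0

# The JSON that comes back supplies the fields consumed in parse_answer below:
#   paging.is_end  - True on the last page of answers
#   paging.next    - ready-made URL of the next page
#   data           - list of answer objects (id, url, content, voteup_count, comment_count, ...)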
def parse_question(self, response):
    # Process the question page and extract a concrete question item from it
    if "QuestionHeader-title" in response.text:
        # New page layout (while old and new layouts coexist, some questions are still served with the old one)
        match_obj = re.match(r"(.*zhihu.com/question/(\d+))(/|$).*", response.url)
        if match_obj:
            question_id = int(match_obj.group(2))

            item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
            item_loader.add_css("title", "h1.QuestionHeader-title::text")
            item_loader.add_css("content", ".QuestionHeader-detail")
            item_loader.add_value("url", response.url)
            item_loader.add_value("zhihu_id", question_id)
            item_loader.add_css("answer_num", ".List-headerText span::text")
            item_loader.add_css("comments_num", ".QuestionHeader-actions button::text")
            item_loader.add_css("watch_user_num", ".NumberBoard-value::text")
            item_loader.add_css("topics", ".QuestionHeader-topics .Popover div::text")
            question_item = item_loader.load_item()

            # Request the first page of answers through the API analysed above
            yield scrapy.Request(self.start_answer_url.format(question_id, 20, 0),
                                 headers=self.headers, callback=self.parse_answer)
            yield question_item
Parsing the answers:
def parse_answer(self, response):
    # Process one page of answers for a question
    ans_json = json.loads(response.text)
    is_end = ans_json["paging"]["is_end"]
    next_url = ans_json["paging"]["next"]

    # Extract the concrete answer fields
    for answer in ans_json["data"]:
        answer_item = ZhihuAnswerItem()
        answer_item["zhihu_id"] = answer["id"]
        answer_item["url"] = answer["url"]
        answer_item["question_id"] = answer["question"]["id"]
        answer_item["author_id"] = answer["author"]["id"] if "id" in answer["author"] else None
        answer_item["content"] = answer["content"] if "content" in answer else None
        answer_item["parise_num"] = answer["voteup_count"]
        answer_item["comments_num"] = answer["comment_count"]
        answer_item["create_time"] = answer["created_time"]
        answer_item["update_time"] = answer["updated_time"]
        answer_item["crawl_time"] = datetime.datetime.now()
        yield answer_item

    # Keep paging until the API reports the last page
    if not is_end:
        yield scrapy.Request(next_url, headers=self.headers, callback=self.parse_answer)
3. Writing Items to the Database
We make a few small changes on top of the pipeline code we already have.
class ZhihuQuestionItem(scrapy.Item):
    # Zhihu question item
    zhihu_id = scrapy.Field()
    topics = scrapy.Field()
    url = scrapy.Field()
    title = scrapy.Field()
    content = scrapy.Field()
    answer_num = scrapy.Field()
    comments_num = scrapy.Field()
    watch_user_num = scrapy.Field()
    click_num = scrapy.Field()
    crawl_time = scrapy.Field()

    ############## Change 1: build the SQL statement inside the item itself ##############
    def get_insert_sql(self):
        # Insert statement for the zhihu_question table
        insert_sql = """
            insert into zhihu_question(zhihu_id, topics, url, title, content, answer_num, comments_num,
                watch_user_num, click_num, crawl_time)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE content=VALUES(content), answer_num=VALUES(answer_num), comments_num=VALUES(comments_num),
                watch_user_num=VALUES(watch_user_num), click_num=VALUES(click_num)
        """

        ############## Change 3: flatten the ItemLoader output lists into scalar parameters ##############
        zhihu_id = self["zhihu_id"][0]
        topics = ",".join(self["topics"])
        url = self["url"][0]
        title = "".join(self["title"])
        content = "".join(self["content"])
        answer_num = extract_num("".join(self["answer_num"]))  # shared helper: pull the number out of a string with a regex
        comments_num = extract_num("".join(self["comments_num"]))
        if len(self["watch_user_num"]) == 2:
            watch_user_num = int(self["watch_user_num"][0])
            click_num = int(self["watch_user_num"][1])
        else:
            watch_user_num = int(self["watch_user_num"][0])
            click_num = 0
        crawl_time = datetime.datetime.now().strftime(SQL_DATETIME_FORMAT)

        params = (zhihu_id, topics, url, title, content, answer_num, comments_num,
                  watch_user_num, click_num, crawl_time)
        return insert_sql, params
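The answer item follows exactly the same pattern. The sketch below is an assumption reconstructed from the fields filled in parse_answer above (the zhihu_answer table and its columns are assumed, and the same datetime / SQL_DATETIME_FORMAT helpers are reused):

class ZhihuAnswerItem(scrapy.Item):
    # Zhihu answer item; fields mirror what parse_answer fills in
    zhihu_id = scrapy.Field()
    url = scrapy.Field()
    question_id = scrapy.Field()
    author_id = scrapy.Field()
    content = scrapy.Field()
    parise_num = scrapy.Field()
    comments_num = scrapy.Field()
    create_time = scrapy.Field()
    update_time = scrapy.Field()
    crawl_time = scrapy.Field()

    def get_insert_sql(self):
        # Insert statement for an assumed zhihu_answer table
        insert_sql = """
            insert into zhihu_answer(zhihu_id, url, question_id, author_id, content, parise_num,
                comments_num, create_time, update_time, crawl_time)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE content=VALUES(content), comments_num=VALUES(comments_num),
                parise_num=VALUES(parise_num), update_time=VALUES(update_time)
        """
        # The API returns created_time/updated_time as Unix timestamps, so convert before inserting
        create_time = datetime.datetime.fromtimestamp(self["create_time"]).strftime(SQL_DATETIME_FORMAT)
        update_time = datetime.datetime.fromtimestamp(self["update_time"]).strftime(SQL_DATETIME_FORMAT)
        params = (self["zhihu_id"], self["url"], self["question_id"], self["author_id"],
                  self["content"], self["parise_num"], self["comments_num"],
                  create_time, update_time, self["crawl_time"].strftime(SQL_DATETIME_FORMAT))
        return insert_sql, params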
class MysqlTwistedPipline(object):
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        dbparms = dict(
            host = settings["MYSQL_HOST"],
            db = settings["MYSQL_DBNAME"],
            user = settings["MYSQL_USER"],
            passwd = settings["MYSQL_PASSWORD"],
            charset='utf8',
            cursorclass=MySQLdb.cursors.DictCursor,
            use_unicode=True,
        )
        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
        return cls(dbpool)

    def process_item(self, item, spider):
        # Use twisted to turn the MySQL insert into an asynchronous call
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error, item, spider)  # handle exceptions
        return item

    def handle_error(self, failure, item, spider):
        # Handle exceptions raised by the asynchronous insert
        print(failure)

    ############## Change 2: with asynchronous MySQL writes, inserts for different items share one code path ##############
    def do_insert(self, cursor, item):
        # Run the actual insert
        insert_sql, params = item.get_insert_sql()
        cursor.execute(insert_sql, tuple(params))
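For the pipeline to be picked up, settings.py is assumed to contain entries along these lines (the module path ArticleSpider.pipelines and the concrete values are illustrative, not from the article):

ITEM_PIPELINES = {
    "ArticleSpider.pipelines.MysqlTwistedPipline": 1,
}

MYSQL_HOST = "127.0.0.1"
MYSQL_DBNAME = "article_spider"
MYSQL_USER = "root"
MYSQL_PASSWORD = "root"

SQL_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"  # shared timestamp format used by get_insert_sql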
Persisting the Zhihu answers follows exactly the same logic. One small debugging tip here:
When Scrapy yields a Request it goes off and downloads that URL, and because the framework is asynchronous, other callbacks keep arriving and executing even while you are stopped at a breakpoint.
So debug in the following way, which makes the spider issue only a single Request:
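The original screenshot for this step is not available here; one common way to achieve it (an assumption, sketched rather than taken from the article) is to temporarily short-circuit parse so that exactly one hard-coded question request is issued:

def parse(self, response):
    # Debug only: yield a single fixed question URL (the id is an arbitrary example)
    # and return immediately, instead of extracting and following every link.
    debug_url = "https://www.zhihu.com/question/19550225"
    yield scrapy.Request(debug_url, headers=self.headers, callback=self.parse_question)
    return
    # ... the normal link-extraction logic stays below, unreachable while debugging ...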

What, still not entirely clear?
The video walkthrough for this section is right here, you're welcome:
https://m.tb.cn/h.U2T49m2