公众号:uncle39py

V1

2022/09/27阅读:39主题:默认主题

13.爬虫:scrapy爬取知呼所有问答

一.前言:深度优先&广度优先

一个网站的url结构中,会出现各个层级的子域名,各级域名下的网页中会嵌套着首页/详情页等上下各级的url

那么要如何爬取一个网站的所有url呢?

理论上,从首页开始爬取所有的url,然后每一个url打开,爬取这个url底下的所有url,这样不断循环就可以爬到一个网站所有的内容;

这会出现一个问题,如果网站的链接存在环路,那么就会出现死循环,那么如何跳过这样的死循环呢

很简单:去重;

如果出现相同的url,则跳过即可

深度优先,是通过递归的形式来实现的

广度优先,是通过队列的形式来实现的

scrapy的去重策略:url经过md5等方法编码后,url长度压缩到固定长度,然后保存到set中(内存中),当url数据量非常大的时候,我们可以选择使用bloomfilter方法(后期分布式爬虫的时候介绍)

二.爬取B乎网站所有问答

在上一章节中讲解了模拟登陆知乎,此篇在此基础上继续,着重讲解思路.

parse方法从知乎首页提取所有的url,判断如果是question url则yield出去解析,否则继续爬取此url下的所有url,这样不断循环就能爬取到所有的链接

def parse(self, response):
    """
    提取出html页面中的所有url 并跟踪这些url进行一步爬取
    如果提取的url中格式为 /question/xxx 就下载之后直接进入解析函数
    """

    all_urls = response.css("a::attr(href)").extract()  #爬取所有的链接
    all_urls = [parse.urljoin(response.url, url) for url in all_urls]  #拼接成url
    all_urls = filter(lambda x:True if x.startswith("https"else False, all_urls)  #过滤掉不是https开头的链接
    for url in all_urls:
        match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", url)
        if match_obj:
            #如果提取到question相关的页面则下载后交由提取函数进行提取
            request_url = match_obj.group(1)
            yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question)
        else:
            #如果不是question页面则直接进一步跟踪
            yield scrapy.Request(url, headers=self.headers, callback=self.parse)

parse_question方法用于解析每一个question页,在解析完question后,可以加上上面的代码段,如果有其他url则继续爬取此url下的所有url.此处为了逻辑清晰,不加入代码中

解析answer则是根据分析请求得到的

 #question的第一页answer的请求url
start_answer_url = "https://www.zhihu.com/api/v4/questions/{0}/answers?sort_by=default&include=data%5B%2A%5D.is_normal%2Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccollapsed_counts%2Creviewing_comments_count%2Ccan_comment%2Ccontent%2Ceditable_content%2Cvoteup_count%2Creshipment_settings%2Ccomment_permission%2Cmark_infos%2Ccreated_time%2Cupdated_time%2Crelationship.is_author%2Cvoting%2Cis_thanked%2Cis_nothelp%2Cupvoted_followees%3Bdata%5B%2A%5D.author.is_blocking%2Cis_blocked%2Cis_followed%2Cvoteup_count%2Cmessage_thread_token%2Cbadge%5B%3F%28type%3Dbest_answerer%29%5D.topics&limit={1}&offset={2}"

def parse_question(self, response):
    #处理question页面, 从页面中提取出具体的question item
    if "QuestionHeader-title" in response.text:
        #处理新版本(在新旧版本更替之际会出现版本共存的现象,某些问题以就版本的形式展示)
        match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", response.url)
        if match_obj:
            question_id = int(match_obj.group(2))

        item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
        item_loader.add_css("title""h1.QuestionHeader-title::text")
        item_loader.add_css("content"".QuestionHeader-detail")
        item_loader.add_value("url", response.url)
        item_loader.add_value("zhihu_id", question_id)
        item_loader.add_css("answer_num"".List-headerText span::text")
        item_loader.add_css("comments_num"".QuestionHeader-actions button::text")
        item_loader.add_css("watch_user_num"".NumberBoard-value::text")
        item_loader.add_css("topics"".QuestionHeader-topics .Popover div::text")

        question_item = item_loader.load_item()
    yield scrapy.Request(self.start_answer_url.format(question_id, 200), headers=self.headers, callback=self.parse_answer)  #动态网站分析技术
    yield question_item

解析answer

def parse_answer(self, reponse):
    #处理question的answer
    ans_json = json.loads(reponse.text)
    is_end = ans_json["paging"]["is_end"]
    next_url = ans_json["paging"]["next"]

    #提取answer的具体字段
    for answer in ans_json["data"]:
        answer_item = ZhihuAnswerItem()
        answer_item["zhihu_id"] = answer["id"]
        answer_item["url"] = answer["url"]
        answer_item["question_id"] = answer["question"]["id"]
        answer_item["author_id"] = answer["author"]["id"if "id" in answer["author"else None
        answer_item["content"] = answer["content"if "content" in answer else None
        answer_item["parise_num"] = answer["voteup_count"]
        answer_item["comments_num"] = answer["comment_count"]
        answer_item["create_time"] = answer["created_time"]
        answer_item["update_time"] = answer["updated_time"]
        answer_item["crawl_time"] = datetime.datetime.now()

        yield answer_item

    if not is_end:
        yield scrapy.Request(next_url, headers=self.headers, callback=self.parse_answer)

三.将item写入到数据库

此处先回顾一下异步方式入库mysql

我们在此基础上做一些小的改动

class ZhihuQuestionItem(scrapy.Item):
    #知乎的问题 item
    zhihu_id = scrapy.Field()
    topics = scrapy.Field()
    url = scrapy.Field()
    title = scrapy.Field()
    content = scrapy.Field()
    answer_num = scrapy.Field()
    comments_num = scrapy.Field()
    watch_user_num = scrapy.Field()
    click_num = scrapy.Field()
    crawl_time = scrapy.Field()
    ####################### 改动1:我们将构建sql的语句放在item里面######################
    def get_insert_sql(self):
        #插入知乎question表的sql语句
        insert_sql = """
            insert into zhihu_question(zhihu_id, topics, url, title, content, answer_num, comments_num,
              watch_user_num, click_num, crawl_time
              )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE content=VALUES(content), answer_num=VALUES(answer_num), comments_num=VALUES(comments_num),
              watch_user_num=VALUES(watch_user_num), click_num=VALUES(click_num)
        """

      #############改动3:################
        zhihu_id = self["zhihu_id"][0]
        topics = ",".join(self["topics"])
        url = self["url"][0]
        title = "".join(self["title"])
        content = "".join(self["content"])
        answer_num = extract_num("".join(self["answer_num"]))# 统一的工具,用正则取字符串中的数字
        comments_num = extract_num("".join(self["comments_num"]))

        if len(self["watch_user_num"]) == 2:
            watch_user_num = int(self["watch_user_num"][0])
            click_num = int(self["watch_user_num"][1])
        else:
            watch_user_num = int(self["watch_user_num"][0])
            click_num = 0

        crawl_time = datetime.datetime.now().strftime(SQL_DATETIME_FORMAT)

        params = (zhihu_id, topics, url, title, content, answer_num, comments_num,
                  watch_user_num, click_num, crawl_time)

        return insert_sql, params
class MysqlTwistedPipline(object):
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        dbparms = dict(
            host = settings["MYSQL_HOST"],
            db = settings["MYSQL_DBNAME"],
            user = settings[" "],
            passwd = settings["MYSQL_PASSWORD"],
            charset='utf8',
            cursorclass=MySQLdb.cursors.DictCursor,
            use_unicode=True,
        )
        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)

        return cls(dbpool)

    def process_item(self, item, spider):
        #使用twisted将mysql插入变成异步执行
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error, item, spider) #处理异常
        return item

    def handle_error(self, failure, item, spider):
        #处理异步插入的异常
        print (failure)
######################改动2:异步写入mysql,不同item的插入语句就可以统一使用如下方式##################
    def do_insert(self, cursor, item):
        #执行具体的插入
        insert_sql,params = item.get_insert_sql()
        cursor.execute(insert_sql, tuple(params))

知乎回答的入库也是一样的逻辑,这里有一个调试的小技巧

scrapy框架yield一个Request就会去下载一个url,对于异步框架的调试,停在断点处后,还会有其他的线程进来执行.

所以通过如下方式来调试,就会只发一个Request

什么?你说看不太懂?

本节相关的视频讲解呈上,拿好不谢

https://m.tb.cn/h.U2T49m2

分类:

后端

标签:

后端

作者介绍

公众号:uncle39py
V1