a

adsa

V1

2023/02/02阅读:15主题:雁栖湖

spark

Part1SparkGraphX基础

11.图计算的基本概念及SparkGraphx基础

  • 图基本概念以及图计算应用

    • 什么是图:
      • 图实际上是由节点或点,边构成的图
      • 图的三要素:节点、边、边上的关系
    • 图计算应用场景:
      • 社交网络数据
      • 人物关系挖掘
      • 网页链接的关系
      • 金融欺诈业务场景
  • SparkGraphX简介

    • 数据的并行化---hadoop或spark的组件
    • 图的并行化----pregel、graphLab等
    • SparkGraphX整合了data-parallel和graph-parallel实现数据和图计算的并行化处理和分析,只需要一套API就可以完成图计算的操作,GraphX允许将数据当成一个图和一个集合的RDD,而不需要数据的移动或复制,GraphX统一了GraphView和TableVIew,适合做Pipeline流水线工作。
  • SparkGraphX图算法

    • PageRank算法根据搜索引擎计算网页排名的算法
    • 最短路径:根据图算法找出最短路径---六度人脉
    • 社群关系挖掘:根据计算图完成社交网络挖掘--统计三角计数
    • 推荐算法:根据图计算给出推荐结果
    • 更多详细的算法原理在后面展开
  • SparkGraphX底层抽象、

    • SparkCore抽象是RDD

    • SparkSQL底层抽象是Dataframe和Dataset

    • SparkStreaming底层抽象是DStream

    • SparkGraphX抽象是RDPG---弹性分布式属性图

      • 弹性----RDD中弹性

      • 分布式----SPark基于分区的分布式

      • 属性图---对于每一个顶点都是对应一个id(sourceid)和targetid,可以设置任何类型

      • abstract class Graph[VDClassTagEDClassTagprotected (extends Serializable {
          val vertices: VertexRDD[VD]
          val edges: EdgeRDD[ED]
          val triplets: RDD[EdgeTriplet[VDED]]

      • GraphX类默认的apply函数

      • /**
           * Construct a graph from a collection of vertices and
           * edges with attributes.  Duplicate vertices are picked arbitrarily and
           * vertices found in the edge collection but not in the input
           * vertices are assigned the default attribute.
           *
           * @tparam VD the vertex attribute type
           * @tparam ED the edge attribute type
           * @param vertices the "set" of vertices and their attributes
           * @param edges the collection of edges in the graph
           * @param defaultVertexAttr the default vertex attribute to use for vertices that are
           *                          mentioned in edges but not in vertices
           * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
           * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
           */
          def apply[VD: ClassTag, ED: ClassTag](
              vertices: RDD[(VertexId, VD)],
              edges: RDD[Edge[ED]],
              defaultVertexAttr: VD = null.asInstanceOf[VD],
              edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
              vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
            GraphImpl(vertices, edges, defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
          }
      • SparkGraphX架构层面

        • 底层数据结构RDPG
          • 顶点+边+三元组+图(底层抽象)
          • 接口抽象(图的实现和图的操作--第二层)
          • 算法应用(pagerank算法等的应用--第三层)
      • SparkGraphX的存储方法:边切割+顶点切割

        • 使用是顶点的切割
        • VertexRDD----RoutingTab----EdgeRDD

22.SparkGraphX基础实战

  • HelloWorld---wordcount

  • 第一步:构建顶点的操作

  • 第二部:构建边的操作

  • 第三步:构建图的操作

  • 第四部:完成图的查询操作

  • package sparkgraphx_part1

    import org.apache.spark.graphx.{EdgeGraphVertexId}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConfSparkContext}

    /**
      * 实现目的:构建社交网络关系图用于关系的简单挖掘
      * 步骤:
      * 1-准备环境
      * 2-创建顶点的集合---3 5 2 7
      * 3-创建的边的集合---3  7 ----5  3----2   5 --- 5  7
      * 4-构建图的部分---关系
      * 5-打印基本属性关系
      */

    object SparkGraphX_helloworld {
      def main(args: Array[String]): Unit = {
        //    * 步骤:
        //    * 1-准备环境
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkGraphX_helloworld")
        val sc = new SparkContext(conf)
        sc.setLogLevel("WARN")
        //    * 2-创建顶点的集合---3 5 2 7
        val users: RDD[(VertexId, (StringString))] = sc.parallelize(Array((3L, ("rxin""student")), (7L, ("jg""postdoc")), (5L, ("franklin""prof")), (2L, ("isnoic""prof"))))
        users.foreach(println(_))
        //    (3,(rxin,student))
        //    (2,(isnoic,prof))
        //    (7,(jg,postdoc))
        //    (5,(franklin,prof))
        //    * 3-创建的边的集合---3  7 ----5  3----2   5 --- 5  7
        // val srcId : org.apache.spark.graphx.VertexId = { /* compiled code */ },
        // val dstId : org.apache.spark.graphx.VertexId = { /* compiled code */ },
        // val attr : ED = { /* compiled code */
        // RDD[Edge[String]]
        val relationship: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "PI")))
        //    * 4-构建图的部分---关系
        val defaultVertex =("Jack","Missing")
        val graph = Graph(users,relationship,defaultVertex)
        //    * 5-打印基本属性关系
        val VerticalscountVertexId = graph.vertices.filter{case (id,(name,pos))=> pos=="postdoc"}.count()
        println("pos==posdoc:"+Verticalscount)
        //计算满足条件的边的个数,条件是边的soucrid到目的id
        val edgecount: VertexId = graph.edges.filter(e => e.srcId > e.dstId).count()
        println("edgecount:"+edgecount)
        sc.stop()
      }
    }

  • 达到了定义图的基本操作以及查询的操作的目的。

33.图计算基本操作

  • 图的基本数据结构

    • VerticesRDD:在Graphx中顶点的数据的抽象为VertexRDD,它是对RDD(VertexID,VD)的集成和扩展,RDD的类型是Vertexid和VD,其中VD是属性的类型
      • 顶点的id进行了哈希分区,每个分区中的顶点的数据存储在一个数组中,顶点的id存放在哈希列表中
    • EdgeRDD:
      • 源id到目的id的属性
    • Triplets:
      • 边的三元组,使用EdgeTriplet表示Triplet,继承自Edge[ED],加入了原顶点的属性和目的顶点的属性。Triplets对Verticies和edges的链接擦偶作使得Triplets具备原定点和目的顶点以及对应属性的集合。
  • 图的类型和图的存储方式简介

    • 图的类型:有向图和无向图、有环图和无环图、有标签图和无标签图、平行图和边、二分图、RDF和属性图
    • 图的存储方式:顶点的分割和边的分割
    • GraphX的存储方式:
      • 随机切分点
      • 正则随机切分点---在随机顶点的切分的基础上增加了顶点id的排序
      • 1D的分区方法中仅仅使用原顶点id,将所有的源顶点的id相同的放在一个分区中
      • 2D的分区方法中根据二维划分方法将顶点划分到不同的分区中。
  • 图的基本操作

    • 构建图的方法

      • 根据边构建图
        • fromEdges方法构建
      • 根据两个顶点构建图
        • 顶点的创建方法首先根据apply方法,具体就是VertextRDD.apply方法
        • 顶点的构建根据formEdge构建
        • VertexRDD是RDD定义的顶点的改进版本
    • 操作代码:

      • import org.apache.spark.graphx.impl.EdgeRDDImpl
        import org.apache.spark.graphx.{EdgeGraphVertexIdVertexRDD}
        import org.apache.spark.rdd.RDD
        import org.apache.spark.{SparkConfSparkContext}

        /**
          * 目的:使用SparkGraphX创建顶点和边以及图的操作
          * 1-创建顶点
          * 2-边的创建
          * 3-创建图
          */

        object CreateGraph {
          def main(args: Array[String]): Unit = {
            val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("CreateGraph")
            val sc = new SparkContext(conf)
            sc.setLogLevel("WARN")
            //  * 1-1创建顶点
            val users: RDD[(VertexId, (StringString))] = sc.parallelize(Array((3L, ("rxin""student")), (7L, ("jg""postdoc")), (5L, ("franklin""prof")), (2L, ("isnoic""prof"))))
            users.foreach(println(_))
            //  * 1-2创建顶点
            // RDD[(VertexId, (String, String))]-----VertexRDD[VD]是RDD[Vertex[VD]]的扩展版本
            val user1 = VertexRDD[(StringString)](users)
            user1.foreach(println(_))
            //    (5,(franklin,prof))
            //    (2,(isnoic,prof))
            //    (3,(rxin,student))
            //    (7,(jg,postdoc))
            // * 2-1边的创建---使用的是 RDD[Edge[String]]
            val relationship: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "PI")))
            relationship.foreach(println(_))
            // * 2-2边的创建----EdgeRDD[ED]--是RDD[Edge[String]]的优化版本
            import org.apache.spark.graphx.EdgeRDD
            val relationship2: EdgeRDDImpl[StringNothing] = EdgeRDD.fromEdges(relationship)
            relationship2.foreach(println(_))
            //    Edge(5,3,advisor)
            //    Edge(3,7,collab)
            //    Edge(5,7,PI)
            //    Edge(2,5,colleague)
            //* 3-创建图
            //3-1--apply--Graph
            println("Graph Create")
            val defaultVertex =("Jack","Missing")
            val graph = Graph(users,relationship,defaultVertex)
            graph.vertices.foreach(println(_))
            graph.edges.foreach(println(_))
            val graph1: Graph[(StringString), String] = Graph.apply(users,relationship,defaultVertex)
            graph1.vertices.foreach(println(_))
            graph1.edges.foreach(println(_))
            //3-2fromEdgeTuple---rawEdges: RDD[(VertexId, VertexId)]
            val relationship3:RDD[(VertexIdVertexId)] = sc.parallelize(Array((3L, 7L), (5L, 3L), (2L, 5L), (5L, 7L)))
            relationship3.foreach(println(_))
            val graph2: Graph[(StringString), Int] = Graph.fromEdgeTuples(relationship3,defaultVertex)
            graph2.edges.foreach(println(_))
            graph2.vertices.foreach(println(_))
            //3-3formEdges--edges: RDD[Edge[ED]],
            val graph3=Graph.fromEdges(relationship,defaultVertex)// edges: RDD[Edge[ED]],
            graph3.edges.foreach(println(_))
            graph3.vertices.foreach(println(_))
          }
        }

      • 社交网络数据的创建部分

      • package sparkgraphx_part1

        import org.apache.spark.graphx.impl.EdgeRDDImpl
        import org.apache.spark.graphx.{EdgeEdgeRDDGraphVertexRDD}
        import org.apache.spark.rdd.RDD
        import org.apache.spark.{SparkConfSparkContext}

        /** 目的:通过社交网络数据分析,构建社交网络图
          * 1-创建顶点--社交网络数据的顶点
          * 2-创建边的操作--社交网络数据的边
          * 3-创建图的操作
          */

        object SocialNetwordCreateGraph {
          def main(args: Array[String]): Unit = {
            val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SocialNetwordCreateGraph")
            val sc = new SparkContext(conf)
            sc.setLogLevel("ERROR")
            //    * 1-创建顶点--社交网络数据的顶点
            val myVertices: RDD[(LongString)] = sc.makeRDD(Array((1L, "Ann"), (2L, "Bill"), (3L, "Charle"), (4L, "Dianle"), (5L, "Went top Gym this morning")))
            val myVertices1: RDD[(LongString)] = sc.parallelize(Array((1L, "Ann"), (2L, "Bill"), (3L, "Charle"), (4L, "Dianle"), (5L, "Went top Gym this morning")))
            val myVertices2 = VertexRDD(myVertices1) //id+vd
            myVertices2.foreach(println(_))
            //    * 2-创建边的操作--社交网络数据的边
            val myEdge: RDD[Edge[String]] = sc.makeRDD(Array(Edge(1L, 2L, "is_friend"), Edge(2L, 3L, "is_friend"), Edge(3L, 4L, "is_friend"), Edge(4L, 5L, "like-states"), Edge(3L, 5L, "write-states")))
            val myEdge1: RDD[Edge[String]] = sc.parallelize(Array(Edge(1L, 2L, "is_friend"), Edge(2L, 3L, "is_friend"), Edge(3L, 4L, "is_friend"), Edge(4L, 5L, "like-states"), Edge(3L, 5L, "write-states")))
            val edges: EdgeRDDImpl[StringNothing] = EdgeRDD.fromEdges(myEdge1)
            edges.foreach(println(_))
            //    * 3-创建图的操作
            val myGraph = Graph(myVertices, myEdge)
            myGraph.edges.foreach(println(_))
            myGraph.vertices.foreach(println(_))


            val myEdge2: RDD[(LongLong)] = sc.parallelize(Array((1L, 2L), (2L, 3L), (3L, 4L), (4L, 5L), (3L, 5L)))
            val graph2: Graph[StringInt] = Graph.fromEdgeTuples(myEdge2, "")
            graph2.edges.foreach(println(_))
            graph2.vertices.foreach(println(_))

            val defaultUser=("Jack","missing")
            val graph3: Graph[(StringString), String] = Graph.fromEdges(edges,defaultUser)
            graph3.edges.foreach(println(_))
            graph3.vertices.foreach(println(_))
          }
        }

  • 图的结构操作

    • 图的基本信息--顶点、边、入度、出度

      • val edgesNum: VertexId = graph.numEdges
         println("edgeNum is:" + edgesNum)
         val verticesNum: VertexId = graph.numVertices
         println("verticesNum is:"+verticesNum)
         val indegrees: VertexRDD[Int] = graph.inDegrees
         println("indegrees is:")
         indegrees.foreach(println(_))
         val outdegree: VertexRDD[Int] = graph.outDegrees
         println("outdegree is:")
         outdegree.foreach(println(_))
         val degrees: VertexRDD[Int] = graph.degrees
         println("degrees:")
         degrees.foreach(println(_))
    • 图的转换操作:mapVertices、mapEdges、mapTriplet

      • mapVertices用于图中每个顶点的数据vd属性进行转换,生成新的顶点,从而生成一张图,新图和原图有相同的结构,不过在转化过程中,没有将顶点的数据传递给领边顶点
      • mapEdges:更新边的属性信息,源码实现中通过f方法生成新的edgerdd,然后在进行初始化操作
      • mapTriplet:用来更新边的属性,其中方法能够进行每条边数据ED及边所连接的两个顶点的数据VD进行计算,生成新的ED2数据。
    • 图的结构操作:reverse、subgraph、mask、groupGraph

      • reverse图的翻转
      • subgraph求解子图
      • mask求解图的交集---顶点和边实现join操作
      • groupEdges-两个顶点之间的多条边的合并
    • 图的关联操作

      • joinVertices
        • 内部的实现也是outerJoinVertices
      • outerJoinVertices
    • 图的聚合操作:mapreduceTriplets、collectNeoghbor

      • aggregateMessages函数底层实现的是mapreduceTriplets函数
        * 1.由边三元组生成消息
        * 2.向边三元组的顶点发消息
        * 3.聚合收到的信息
    • 图的缓存:cache、unpesistVertices

      • persist
      • cache
      • unpesist
      • checpoint
    • 总结

      • 学会查找API进行图的操作

44.SParkGraph最短路径(了解)

  • Dijkstra算法:思想:选择初始值点,根据顶点之和是否最小依次判断所有的点得到最短路径
  • pregelAPI的理解

55.SparkGraphX代码实战

  • Spark社交网络数据的分析和图的操作
    • 完成图的构建
    • 完成图的基本属性分析
    • 完成图的结构操作
    • 完成图的链接操作

分类:

后端

标签:

后端

作者介绍

a
adsa
V1