a
adsa
V1
2023/02/02阅读:15主题:雁栖湖
spark
Part1SparkGraphX基础
11.图计算的基本概念及SparkGraphx基础
-
图基本概念以及图计算应用
-
什么是图: -
图实际上是由节点或点,边构成的图 -
图的三要素:节点、边、边上的关系
-
-
图计算应用场景: -
社交网络数据 -
人物关系挖掘 -
网页链接的关系 -
金融欺诈业务场景
-
-
-
SparkGraphX简介
-
数据的并行化---hadoop或spark的组件 -
图的并行化----pregel、graphLab等 -
SparkGraphX整合了data-parallel和graph-parallel实现数据和图计算的并行化处理和分析,只需要一套API就可以完成图计算的操作,GraphX允许将数据当成一个图和一个集合的RDD,而不需要数据的移动或复制,GraphX统一了GraphView和TableVIew,适合做Pipeline流水线工作。
-
-
SparkGraphX图算法
-
PageRank算法根据搜索引擎计算网页排名的算法 -
最短路径:根据图算法找出最短路径---六度人脉 -
社群关系挖掘:根据计算图完成社交网络挖掘--统计三角计数 -
推荐算法:根据图计算给出推荐结果 -
更多详细的算法原理在后面展开
-
-
SparkGraphX底层抽象、
-
SparkCore抽象是RDD
-
SparkSQL底层抽象是Dataframe和Dataset
-
SparkStreaming底层抽象是DStream
-
SparkGraphX抽象是RDPG---弹性分布式属性图
-
弹性----RDD中弹性
-
分布式----SPark基于分区的分布式
-
属性图---对于每一个顶点都是对应一个id(sourceid)和targetid,可以设置任何类型
-
abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializable {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
val triplets: RDD[EdgeTriplet[VD, ED]] -
GraphX类默认的apply函数
-
/**
* Construct a graph from a collection of vertices and
* edges with attributes. Duplicate vertices are picked arbitrarily and
* vertices found in the edge collection but not in the input
* vertices are assigned the default attribute.
*
* @tparam VD the vertex attribute type
* @tparam ED the edge attribute type
* @param vertices the "set" of vertices and their attributes
* @param edges the collection of edges in the graph
* @param defaultVertexAttr the default vertex attribute to use for vertices that are
* mentioned in edges but not in vertices
* @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
* @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
*/
def apply[VD: ClassTag, ED: ClassTag](
vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD = null.asInstanceOf[VD],
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
GraphImpl(vertices, edges, defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
} -
SparkGraphX架构层面
-
底层数据结构RDPG -
顶点+边+三元组+图(底层抽象) -
接口抽象(图的实现和图的操作--第二层) -
算法应用(pagerank算法等的应用--第三层)
-
-
-
SparkGraphX的存储方法:边切割+顶点切割
-
使用是顶点的切割 -
VertexRDD----RoutingTab----EdgeRDD
-
-
-
22.SparkGraphX基础实战
-
HelloWorld---wordcount
-
第一步:构建顶点的操作
-
第二部:构建边的操作
-
第三步:构建图的操作
-
第四部:完成图的查询操作
-
package sparkgraphx_part1
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 实现目的:构建社交网络关系图用于关系的简单挖掘
* 步骤:
* 1-准备环境
* 2-创建顶点的集合---3 5 2 7
* 3-创建的边的集合---3 7 ----5 3----2 5 --- 5 7
* 4-构建图的部分---关系
* 5-打印基本属性关系
*/
object SparkGraphX_helloworld {
def main(args: Array[String]): Unit = {
// * 步骤:
// * 1-准备环境
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkGraphX_helloworld")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
// * 2-创建顶点的集合---3 5 2 7
val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jg", "postdoc")), (5L, ("franklin", "prof")), (2L, ("isnoic", "prof"))))
users.foreach(println(_))
// (3,(rxin,student))
// (2,(isnoic,prof))
// (7,(jg,postdoc))
// (5,(franklin,prof))
// * 3-创建的边的集合---3 7 ----5 3----2 5 --- 5 7
// val srcId : org.apache.spark.graphx.VertexId = { /* compiled code */ },
// val dstId : org.apache.spark.graphx.VertexId = { /* compiled code */ },
// val attr : ED = { /* compiled code */
// RDD[Edge[String]]
val relationship: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "PI")))
// * 4-构建图的部分---关系
val defaultVertex =("Jack","Missing")
val graph = Graph(users,relationship,defaultVertex)
// * 5-打印基本属性关系
val Verticalscount: VertexId = graph.vertices.filter{case (id,(name,pos))=> pos=="postdoc"}.count()
println("pos==posdoc:"+Verticalscount)
//计算满足条件的边的个数,条件是边的soucrid到目的id
val edgecount: VertexId = graph.edges.filter(e => e.srcId > e.dstId).count()
println("edgecount:"+edgecount)
sc.stop()
}
} -
达到了定义图的基本操作以及查询的操作的目的。
33.图计算基本操作
-
图的基本数据结构
-
VerticesRDD:在Graphx中顶点的数据的抽象为VertexRDD,它是对RDD(VertexID,VD)的集成和扩展,RDD的类型是Vertexid和VD,其中VD是属性的类型 -
顶点的id进行了哈希分区,每个分区中的顶点的数据存储在一个数组中,顶点的id存放在哈希列表中
-
-
EdgeRDD: -
源id到目的id的属性
-
-
Triplets: -
边的三元组,使用EdgeTriplet表示Triplet,继承自Edge[ED],加入了原顶点的属性和目的顶点的属性。Triplets对Verticies和edges的链接擦偶作使得Triplets具备原定点和目的顶点以及对应属性的集合。
-
-
-
图的类型和图的存储方式简介
-
图的类型:有向图和无向图、有环图和无环图、有标签图和无标签图、平行图和边、二分图、RDF和属性图 -
图的存储方式:顶点的分割和边的分割 -
GraphX的存储方式: -
随机切分点 -
正则随机切分点---在随机顶点的切分的基础上增加了顶点id的排序 -
1D的分区方法中仅仅使用原顶点id,将所有的源顶点的id相同的放在一个分区中 -
2D的分区方法中根据二维划分方法将顶点划分到不同的分区中。
-
-
-
图的基本操作
-
构建图的方法
-
根据边构建图 -
fromEdges方法构建
-
-
根据两个顶点构建图 -
顶点的创建方法首先根据apply方法,具体就是VertextRDD.apply方法 -
顶点的构建根据formEdge构建 -
VertexRDD是RDD定义的顶点的改进版本
-
-
-
操作代码:
-
import org.apache.spark.graphx.impl.EdgeRDDImpl
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 目的:使用SparkGraphX创建顶点和边以及图的操作
* 1-创建顶点
* 2-边的创建
* 3-创建图
*/
object CreateGraph {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("CreateGraph")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
// * 1-1创建顶点
val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jg", "postdoc")), (5L, ("franklin", "prof")), (2L, ("isnoic", "prof"))))
users.foreach(println(_))
// * 1-2创建顶点
// RDD[(VertexId, (String, String))]-----VertexRDD[VD]是RDD[Vertex[VD]]的扩展版本
val user1 = VertexRDD[(String, String)](users)
user1.foreach(println(_))
// (5,(franklin,prof))
// (2,(isnoic,prof))
// (3,(rxin,student))
// (7,(jg,postdoc))
// * 2-1边的创建---使用的是 RDD[Edge[String]]
val relationship: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "PI")))
relationship.foreach(println(_))
// * 2-2边的创建----EdgeRDD[ED]--是RDD[Edge[String]]的优化版本
import org.apache.spark.graphx.EdgeRDD
val relationship2: EdgeRDDImpl[String, Nothing] = EdgeRDD.fromEdges(relationship)
relationship2.foreach(println(_))
// Edge(5,3,advisor)
// Edge(3,7,collab)
// Edge(5,7,PI)
// Edge(2,5,colleague)
//* 3-创建图
//3-1--apply--Graph
println("Graph Create")
val defaultVertex =("Jack","Missing")
val graph = Graph(users,relationship,defaultVertex)
graph.vertices.foreach(println(_))
graph.edges.foreach(println(_))
val graph1: Graph[(String, String), String] = Graph.apply(users,relationship,defaultVertex)
graph1.vertices.foreach(println(_))
graph1.edges.foreach(println(_))
//3-2fromEdgeTuple---rawEdges: RDD[(VertexId, VertexId)]
val relationship3:RDD[(VertexId, VertexId)] = sc.parallelize(Array((3L, 7L), (5L, 3L), (2L, 5L), (5L, 7L)))
relationship3.foreach(println(_))
val graph2: Graph[(String, String), Int] = Graph.fromEdgeTuples(relationship3,defaultVertex)
graph2.edges.foreach(println(_))
graph2.vertices.foreach(println(_))
//3-3formEdges--edges: RDD[Edge[ED]],
val graph3=Graph.fromEdges(relationship,defaultVertex)// edges: RDD[Edge[ED]],
graph3.edges.foreach(println(_))
graph3.vertices.foreach(println(_))
}
} -
社交网络数据的创建部分
-
package sparkgraphx_part1
import org.apache.spark.graphx.impl.EdgeRDDImpl
import org.apache.spark.graphx.{Edge, EdgeRDD, Graph, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/** 目的:通过社交网络数据分析,构建社交网络图
* 1-创建顶点--社交网络数据的顶点
* 2-创建边的操作--社交网络数据的边
* 3-创建图的操作
*/
object SocialNetwordCreateGraph {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SocialNetwordCreateGraph")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
// * 1-创建顶点--社交网络数据的顶点
val myVertices: RDD[(Long, String)] = sc.makeRDD(Array((1L, "Ann"), (2L, "Bill"), (3L, "Charle"), (4L, "Dianle"), (5L, "Went top Gym this morning")))
val myVertices1: RDD[(Long, String)] = sc.parallelize(Array((1L, "Ann"), (2L, "Bill"), (3L, "Charle"), (4L, "Dianle"), (5L, "Went top Gym this morning")))
val myVertices2 = VertexRDD(myVertices1) //id+vd
myVertices2.foreach(println(_))
// * 2-创建边的操作--社交网络数据的边
val myEdge: RDD[Edge[String]] = sc.makeRDD(Array(Edge(1L, 2L, "is_friend"), Edge(2L, 3L, "is_friend"), Edge(3L, 4L, "is_friend"), Edge(4L, 5L, "like-states"), Edge(3L, 5L, "write-states")))
val myEdge1: RDD[Edge[String]] = sc.parallelize(Array(Edge(1L, 2L, "is_friend"), Edge(2L, 3L, "is_friend"), Edge(3L, 4L, "is_friend"), Edge(4L, 5L, "like-states"), Edge(3L, 5L, "write-states")))
val edges: EdgeRDDImpl[String, Nothing] = EdgeRDD.fromEdges(myEdge1)
edges.foreach(println(_))
// * 3-创建图的操作
val myGraph = Graph(myVertices, myEdge)
myGraph.edges.foreach(println(_))
myGraph.vertices.foreach(println(_))
val myEdge2: RDD[(Long, Long)] = sc.parallelize(Array((1L, 2L), (2L, 3L), (3L, 4L), (4L, 5L), (3L, 5L)))
val graph2: Graph[String, Int] = Graph.fromEdgeTuples(myEdge2, "")
graph2.edges.foreach(println(_))
graph2.vertices.foreach(println(_))
val defaultUser=("Jack","missing")
val graph3: Graph[(String, String), String] = Graph.fromEdges(edges,defaultUser)
graph3.edges.foreach(println(_))
graph3.vertices.foreach(println(_))
}
}
-
-
-
图的结构操作
-
图的基本信息--顶点、边、入度、出度
-
val edgesNum: VertexId = graph.numEdges
println("edgeNum is:" + edgesNum)
val verticesNum: VertexId = graph.numVertices
println("verticesNum is:"+verticesNum)
val indegrees: VertexRDD[Int] = graph.inDegrees
println("indegrees is:")
indegrees.foreach(println(_))
val outdegree: VertexRDD[Int] = graph.outDegrees
println("outdegree is:")
outdegree.foreach(println(_))
val degrees: VertexRDD[Int] = graph.degrees
println("degrees:")
degrees.foreach(println(_))
-
-
图的转换操作:mapVertices、mapEdges、mapTriplet
-
mapVertices用于图中每个顶点的数据vd属性进行转换,生成新的顶点,从而生成一张图,新图和原图有相同的结构,不过在转化过程中,没有将顶点的数据传递给领边顶点 -
mapEdges:更新边的属性信息,源码实现中通过f方法生成新的edgerdd,然后在进行初始化操作 -
mapTriplet:用来更新边的属性,其中方法能够进行每条边数据ED及边所连接的两个顶点的数据VD进行计算,生成新的ED2数据。
-
-
图的结构操作:reverse、subgraph、mask、groupGraph
-
reverse图的翻转 -
subgraph求解子图 -
mask求解图的交集---顶点和边实现join操作 -
groupEdges-两个顶点之间的多条边的合并
-
-
图的关联操作
-
joinVertices -
内部的实现也是outerJoinVertices
-
-
outerJoinVertices
-
-
图的聚合操作:mapreduceTriplets、collectNeoghbor
-
aggregateMessages函数底层实现的是mapreduceTriplets函数
* 1.由边三元组生成消息
* 2.向边三元组的顶点发消息
* 3.聚合收到的信息
-
-
图的缓存:cache、unpesistVertices
-
persist -
cache -
unpesist -
checpoint
-
-
总结
-
学会查找API进行图的操作
-
-
44.SParkGraph最短路径(了解)
-
Dijkstra算法:思想:选择初始值点,根据顶点之和是否最小依次判断所有的点得到最短路径 -
pregelAPI的理解
55.SparkGraphX代码实战
-
Spark社交网络数据的分析和图的操作 -
完成图的构建 -
完成图的基本属性分析 -
完成图的结构操作 -
完成图的链接操作
-
作者介绍
a
adsa
V1