Friend Follower Analysis using Apache Spark GraphX’s PageRank algorithm
GraphX is Apache Spark’s API for graphs and graph-parallel computation. It supports graph transformation, exploration, and computation, and lets the same data be viewed both as a graph and as collections. This use case discusses friend-follower analysis using Apache Spark GraphX’s PageRank operator.
PageRank measures the importance of each vertex in a graph: a vertex is considered important when many other vertices point to it, and even more so when those pointing vertices are themselves important.
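Concretely, GraphX iterates an un-normalized recurrence of roughly this form, where alpha is the reset (random-jump) probability, 0.15 by default, and the sum runs over the in-neighbours j of vertex i (vertices with an edge pointing to i):

PR(i) = alpha + (1 - alpha) * sum over in-neighbours j of ( PR(j) / outDegree(j) )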
At a high level, Apache Spark GraphX extends the Spark RDD abstraction by introducing the Resilient Distributed Property Graph: a directed multigraph with properties attached to each vertex and edge. To support graph computation, Apache Spark GraphX exposes a set of fundamental operators (e.g., subgraph, joinVertices, and mapReduceTriplets) as well as an optimized variant of the Pregel API. In addition, Apache Spark GraphX includes a growing collection of graph algorithms and builders to simplify graph analytics tasks.
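To make the property-graph abstraction concrete, the minimal sketch below builds a tiny follower graph by hand from vertex and edge RDDs. The ids and (name, occupation) attributes mirror the sample output later in this post, the "follows" edge label and its direction are illustrative assumptions, and sc is the SparkContext created in step 2 below.

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Vertices carry a (name, occupation) attribute pair
val vertexData: RDD[(VertexId, (String, String))] = sc.parallelize(Seq(
  (1L, ("Robert", "Chef")),
  (2L, ("Sam", "Wrestler"))))
// Edges carry a "follows" label; Edge(src, dst, attr) points from src to dst
val edgeData: RDD[Edge[String]] = sc.parallelize(Seq(Edge(2L, 1L, "follows")))
// Assemble the directed property graph
val tinyGraph: Graph[(String, String), String] = Graph(vertexData, edgeData)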
One of the Apache Spark GraphX operators is PageRank, which is based on Google’s PageRank algorithm.
In GraphX, there are two implementations of the PageRank algorithm. The first uses the standalone Graph interface and runs PageRank for a fixed number of iterations, numIter. The second uses the Pregel interface and runs PageRank until the change in PageRank scores falls below a specified error tolerance, tol.
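As a sketch of how the two variants are invoked (assuming a graph value such as the followerGraph built in step 4 below; the iteration count of 10 is an arbitrary illustration):

// Static variant: run a fixed number of iterations
val staticRanks = followerGraph.staticPageRank(numIter = 10).vertices
// Dynamic (Pregel-based) variant: iterate until scores change by less than tol
val dynamicRanks = followerGraph.pageRank(tol = 0.0001).vertices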
We have users numbered 1 to n, and each user follows other users. Below are the steps to implement PageRank.
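A note on the assumed input format used in steps 3 and 4: users.txt is expected to hold one comma-separated record per line (the user id followed by the attribute fields, here a name and an occupation), and followers.txt is expected to hold one whitespace-separated pair of user ids per line, the plain edge-list format that GraphLoader.edgeListFile reads. Purely as an illustration consistent with the output shown at the end, the files might contain lines such as:

users.txt:
1,Robert,Chef
2,Sam,Wrestler

followers.txt (each line is an assumed "follower followed" pair):
2 1
7 3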
1. Define the configuration of the Spark Context
import org.apache.spark.{SparkConf, SparkContext}
val conf: SparkConf = new SparkConf().setAppName("Graph X Second").setMaster(sparkMasterUrl).setSparkHome(sparkHome).setJars(jars)
2. Create the Spark Context
val sc = new SparkContext(conf)
3. Load the user data and parse it into tuples of user id and attribute list
val users = sc.textFile("/usr/spark_pack/spark_input/users.txt").map(line => line.split(",")).map(parts => (parts.head.toLong, parts.tail))
4. Parse the edge data which is already in userId -> userId format
import org.apache.spark.graphx.GraphLoader
val followerGraph = GraphLoader.edgeListFile(sc, "/usr/spark_pack/spark_input/followers.txt")
5. Attach the user attributes
val graph = followerGraph.outerJoinVertices(users) {
  case (uid, deg, Some(attrList)) => attrList
6. Some users may not have attributes, so we set them as empty
  case (uid, deg, None) => Array.empty[String]
}
7. Restrict the graph to users with a complete attribute list (a name and an occupation)
val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
8. Compute the PageRank
val pagerankGraph = subgraph.pageRank(0.0001)
9. Get the attributes of the top PageRank users
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
  case (uid, attrList, Some(pr)) => (pr, attrList.toList)
  case (uid, attrList, None) => (0.0, attrList.toList)
}
10. Get the top 5 users’ info and print it
println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
Output
(1,(1.4588814096664682,List(Robert, Chef)))
(2,(1.390049198216498,List(Sam, Wrestler)))
(7,(1.2973176314422592,List(Paresh, Actor)))
(3,(0.9993442038507723,List(Ralph, Author)))
(6,(0.7013599933629602,List(Vinay, Doctor)))
Take Away
The PageRank algorithm ranks node importance in networks. GraphX’s PageRank implementations exploit data parallelism over vertices; in particular, the Pregel-based implementation relies on local message passing to update the PageRank scores. Another point to note is that the returned PageRank scores are not normalized, so they do not represent a probability distribution. Moreover, pages that have no incoming links will have a PageRank score of alpha (the reset probability). Nonetheless, the top pages can still be found by sorting the vertices of the returned PageRank graph by their score attribute.
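If a proper probability distribution over users is needed, the returned scores can be normalized after the fact. A minimal sketch, assuming the pagerankGraph computed in step 8 above:

// Normalize the PageRank scores so that they sum to 1
val rankSum = pagerankGraph.vertices.map(_._2).sum()
val normalizedRanks = pagerankGraph.vertices.mapValues(_ / rankSum)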