-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgraphx_commands.txt
216 lines (140 loc) · 7.1 KB
/
graphx_commands.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
// Scala
import org.apache.spark.graphx._
/** Vertex attribute of the toy social graph: a person's name and age. */
case class User(name: String, age: Int)
// Vertex list: (vertex id, User) pairs for ids 1 through 10.
val users = List(
  1L -> User("Alex", 26),
  2L -> User("Bill", 42),
  3L -> User("Carol", 18),
  4L -> User("Dave", 16),
  5L -> User("Eve", 45),
  6L -> User("Farell", 30),
  7L -> User("Garry", 32),
  8L -> User("Harry", 36),
  9L -> User("Ivan", 28),
  10L -> User("Jill", 48)
)
val usersRDD = sc.parallelize(users)
// Directed edges (source id, destination id, attribute); every edge carries the Int attribute 1.
// The last edge points at vertex 11, which has no entry in `users`.
val follows = List(
  Edge(1L, 2L, 1),
  Edge(2L, 3L, 1),
  Edge(3L, 1L, 1),
  Edge(3L, 4L, 1),
  Edge(3L, 5L, 1),
  Edge(4L, 5L, 1),
  Edge(6L, 5L, 1),
  Edge(7L, 6L, 1),
  Edge(6L, 8L, 1),
  Edge(7L, 8L, 1),
  Edge(7L, 9L, 1),
  Edge(9L, 8L, 1),
  Edge(8L, 10L, 1),
  Edge(10L, 9L, 1),
  Edge(1L, 11L, 1)
)
val followsRDD = sc.parallelize(follows)
// Attribute passed to Graph(...) as the default for vertices that are mentioned
// by an edge but are missing from usersRDD.
val defaultUser = User("NA", 0)
val socialGraph = Graph(usersRDD, followsRDD, defaultUser)
// Overall size of the social graph.
val numEdges: Long = socialGraph.numEdges
val numVertices: Long = socialGraph.numVertices
// Per-vertex degree tables; collect() brings each one back to the driver as an array.
val inDegrees: VertexRDD[Int] = socialGraph.inDegrees
inDegrees.collect()
val outDegrees: VertexRDD[Int] = socialGraph.outDegrees
outDegrees.collect()
val degrees: VertexRDD[Int] = socialGraph.degrees
degrees.collect()
// The three views of the social graph: vertices, edges, and triplets
// (an edge together with the attributes of both of its endpoints).
// The edge view and the sentence RDD use names that do not collide with the `follows`
// edge list defined earlier in this file or the flight `edges` defined later, so the
// script also compiles when it is evaluated as a single unit (e.g. in :paste mode).
val vertices = socialGraph.vertices
val socialEdges = socialGraph.edges
val triplets = socialGraph.triplets
triplets.take(3)
// Render every edge as a "<source name> follows <destination name>" sentence.
val followStatements = triplets.map { t => s"${t.srcAttr.name} follows ${t.dstAttr.name}" }
followStatements.take(5)
// mapVertices: a new graph in which every user's age is increased by one.
val updatedAges = socialGraph.mapVertices { (_, user) => user.copy(age = user.age + 1) }
// Compare a few vertices before and after the transformation.
socialGraph.vertices.take(5)
updatedAges.vertices.take(5)
// mapTriplets: edge attribute becomes 2 when the source user is 30 or older, otherwise 1.
val weightedGraph = socialGraph.mapTriplets { triplet =>
  val sourceAge = triplet.srcAttr.age
  if (sourceAge < 30) 1 else 2
}
// Compare the edge attributes before and after the transformation.
socialGraph.edges.take(10)
weightedGraph.edges.take(10)
// reverse: the same graph with every edge direction flipped.
val reverseGraph = socialGraph.reverse
reverseGraph.triplets.map { t => s"${t.srcAttr.name} follows ${t.dstAttr.name}" }.take(10)
// subgraph: keep only the edges whose attribute is greater than 1; the vertex
// predicate accepts every vertex.
val subgraph = weightedGraph.subgraph(
  epred = triplet => triplet.attr > 1,
  vpred = (_, _) => true
)
subgraph.edges.take(10)
// joinVertices: replace the age of each vertex that has an entry in correctAges (ids 3 and 4).
val correctAges = sc.parallelize(List(3L -> 28, 4L -> 26))
val correctedGraph = socialGraph.joinVertices(correctAges) { (_, user, correctAge) =>
  user.copy(age = correctAge)
}
correctedGraph.vertices.collect()
/** Vertex attribute that carries a city name in addition to name and age. */
case class UserWithCity(name: String, age: Int, city: String)
val userCities = sc.parallelize(List(
  1L -> "Boston",
  3L -> "New York",
  5L -> "London",
  7L -> "Bombay",
  9L -> "Tokyo",
  10L -> "Palo Alto"
))
// outerJoinVertices: every vertex becomes a UserWithCity; vertices without an entry
// in userCities receive the city "NA".
val socialGraphWithCity = socialGraph.outerJoinVertices(userCities) { (_, user, cityOpt) =>
  UserWithCity(user.name, user.age, cityOpt.getOrElse("NA"))
}
socialGraphWithCity.vertices.take(5)
//Analyzing real data
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.IntParam
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
/**
 * One record of the flight data set. The fields are declared in the same order as
 * the comma-separated columns that parseFlight reads (columns 0 to 16).
 * NOTE(review): the meaning of the abbreviated field names (e.g. dofM / dofW as
 * day of month / day of week) is inferred from the names only - confirm against the data set.
 */
case class Flight(
  dofM: String,
  dofW: String,
  carrier: String,
  tailnum: String,
  flnum: Int,
  org_id: Long,
  origin: String,
  dest_id: Long,
  dest: String,
  crsdeptime: Double,
  deptime: Double,
  depdelaymins: Double,
  crsarrtime: Double,
  arrtime: Double,
  arrdelay: Double,
  crselapsedtime: Double,
  dist: Int
)
/**
 * Parses one comma-separated line into a Flight.
 *
 * Columns are read by position; numeric columns are converted with
 * toInt / toLong / toDouble, so a line with too few columns or a non-numeric
 * value in a numeric column makes this function throw.
 *
 * @param str one line of the CSV file
 * @return the Flight built from the first 17 columns of the line
 */
def parseFlight(str: String): Flight = {
  val cols = str.split(",")
  Flight(
    dofM = cols(0),
    dofW = cols(1),
    carrier = cols(2),
    tailnum = cols(3),
    flnum = cols(4).toInt,
    org_id = cols(5).toLong,
    origin = cols(6),
    dest_id = cols(7).toLong,
    dest = cols(8),
    crsdeptime = cols(9).toDouble,
    deptime = cols(10).toDouble,
    depdelaymins = cols(11).toDouble,
    crsarrtime = cols(12).toDouble,
    arrtime = cols(13).toDouble,
    arrdelay = cols(14).toDouble,
    crselapsedtime = cols(15).toDouble,
    dist = cols(16).toInt
  )
}
// Shell step (run in a terminal, not inside spark-shell): hdfs dfs -put flightdata.csv /user
// Read the CSV file as an RDD of raw text lines.
val textRDD = sc.textFile("/user/flightdata.csv")
// Turn every line into a Flight and cache the result.
val flightsRDD = textRDD.map(line => parseFlight(line)).cache()
// Distinct (airport id, airport name) pairs, taken from the origin side of each flight.
val airports = flightsRDD.map(f => f.org_id -> f.origin).distinct()
airports.take(1)
// Name used as the default vertex attribute when the route graph is built.
val nowhere = "nowhere"
// Driver-side lookup table from airport id to airport name, used when printing results.
val airportMap = airports.collect().toMap
// Distinct routes: ((origin id, destination id), distance).
val routes = flightsRDD.map(f => ((f.org_id, f.dest_id), f.dist)).distinct()
routes.take(2)
// One graph edge per route: source id, destination id, and the distance as edge attribute.
val edges = routes.map { case ((srcId, dstId), distance) => Edge(srcId, dstId, distance) }
edges.take(1)
// Build the route graph: airports are the vertices, routes are the edges, and
// `nowhere` is passed as the default vertex attribute.
val graph = Graph(airports, edges, nowhere)
// Peek at a couple of vertices and edges.
graph.vertices.take(2)
graph.edges.take(2)
// How many airports?
val numairports = graph.numVertices
// How many routes?
val numroutes = graph.numEdges
// A sample of routes whose distance is greater than 1000.
graph.edges.filter(edge => edge.attr > 1000).take(3)
// Print a few triplets (edge plus the attributes of both endpoints).
graph.triplets.take(3).foreach(println)
// Print the ten longest routes, longest first.
graph.triplets.sortBy(_.attr, ascending = false).map { t =>
  s"Distance ${t.attr} from ${t.srcAttr} to ${t.dstAttr}."
}.take(10).foreach(println)
/**
 * Reduce function that picks the (vertex id, degree) pair with the larger degree.
 * When both degrees are equal the second argument is returned.
 */
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) =
  if (a._2 <= b._2) b else a
// Vertex with the highest in-degree, out-degree and total degree, found with the `max` reducer.
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
// Name of the airport with id 10397. airportMap is built from flight origins only, so an id
// that never appears as an origin is absent from it; fall back to `nowhere` (the graph's
// default vertex attribute) instead of throwing NoSuchElementException.
airportMap.getOrElse(10397L, nowhere)
// Top 3 airports by number of incoming routes. The three largest in-degrees are selected
// first and only those ids are translated to names, again falling back to `nowhere` for
// airports that occur only as a destination.
val maxIncoming = graph.inDegrees.collect().sortWith(_._2 > _._2).take(3).map {
  case (id, inDegree) => (airportMap.getOrElse(id, nowhere), inDegree)
}
maxIncoming.foreach(println)
// Top 3 airports by number of outgoing routes: (id, (out-degree, airport name)).
val maxout = graph.outDegrees.join(airports).sortBy(_._2._1, ascending = false).take(3)
maxout.foreach(println)
// Run PageRank with tolerance 0.1 and keep the resulting (vertex id, rank) pairs.
val ranks = graph.pageRank(tol = 0.1).vertices
// Attach the airport name to every rank: (id, (rank, name)).
val temp = ranks.join(airports)
temp.take(1)
// Order by rank, highest first.
val temp2 = temp.sortBy(entry => entry._2._1, ascending = false)
temp2.take(2)
// Keep only the airport names, still in rank order.
val impAirports = temp2.map { case (_, (_, name)) => name }
impAirports.take(4)
// Vertex id from which the cheapest-path search starts.
val sourceId: VertexId = 13024
// Same graph, but every edge now carries an airfare: 50 plus distance / 20.
val gg = graph.mapEdges(edge => 50.0 + edge.attr / 20.0)
// Starting costs: 0 at the source vertex, positive infinity everywhere else.
val initialGraph = gg.mapVertices { (id, _) =>
  if (id != sourceId) Double.PositiveInfinity else 0.0
}
// Run Pregel with positive infinity as the initial message.
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  // Vertex program: keep the smaller of the current cost and the incoming cost.
  (_, dist, newDist) => math.min(dist, newDist),
  // Send message: when reaching the destination through this edge is cheaper than the
  // destination's current cost, offer the cheaper cost; otherwise send nothing.
  triplet => {
    val costViaSource = triplet.srcAttr + triplet.attr
    if (costViaSource < triplet.dstAttr) Iterator((triplet.dstId, costViaSource)) else Iterator.empty
  },
  // Merge messages: of two offers for the same vertex keep the cheaper one.
  (left, right) => math.min(left, right)
)
// A sample of edges from the result graph; each edge attribute is the airfare of that route.
println(sssp.edges.take(4).mkString("\n"))
// The ten cheapest routes, shown with airport names instead of ids. airportMap only contains
// airports that occur as an origin, so lookups fall back to `nowhere` rather than throwing.
sssp.edges.map { case Edge(org_id, dest_id, price) =>
  (airportMap.getOrElse(org_id, nowhere), airportMap.getOrElse(dest_id, nowhere), price)
}.takeOrdered(10)(Ordering.by(_._3))
// Sample output recorded with this script (REPL result, kept as a comment so the file stays valid Scala):
// Array((WRG,PSG,51.55), (PSG,WRG,51.55), (CEC,ACV,52.8), (ACV,CEC,52.8), (ORD,MKE,53.35), (IMT,RHI,53.35), (MKE,ORD,53.35), (RHI,IMT,53.35), (STT,SJU,53.4), (SJU,STT,53.4))
// A sample of vertices from the result graph: (airport id, lowest total cost from sourceId).
println(sssp.vertices.take(4).mkString("\n"))