Joining an RDD of (k1, k2) with an RDD of (key, id)

I have an original RDD with data that looks like this:

(A,A)
(A,B)
(B,C)
(C,D)

These are the edges of a graph, expressed as pairs of vertex names. A second piece of code creates an RDD that assigns a unique identifier to each vertex name:

 (A,0)
 (B,41)
 (C,82)
 (D,123)

I would like to combine these two RDDs to get a final RDD like this:

Edge(0,0,AA)
Edge(0,41,AB)
Edge(41,82,BC)
Edge(82,123,CD)

Essentially, I want to build an RDD[Edge] so that I can use GraphX on these edges. Is it possible to join the id RDD with the original edge RDD?

Did any of the answers below help?
added by mtoto
@mtoto I had to rewrite some backend code, so I have not been able to test yet. I think @Semsorock may have a good solution. For now I am thinking of sticking with pure RDDs and using join to get everything into the shape I want.
added by Dylan Lawrence

Answers

If your id RDD is not too large, you can broadcast it and use it to build your edge RDD:

import org.apache.spark.graphx.Edge

// Broadcast the id RDD as a name -> id map (it has to fit in memory)
val bc_lookup = sc.broadcast(rdd_id.collectAsMap())

// Look up both endpoints of an edge and build the edge label.
// Throws NoSuchElementException if a vertex name has no id.
def lookup_custom(x: (String, String)): (Int, Int, String) = {
  (bc_lookup.value(x._1),
   bc_lookup.value(x._2),
   x._1 + x._2)
}

val rdd_result = my_rdd.map(x => lookup_custom(x)).cache()

// Convert to an RDD of Edge (the Int ids are widened to VertexId, i.e. Long)
val e_rdd = rdd_result.map(x => Edge(x._1, x._2, x._3))

e_rdd.collect()
// res1: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(0,0,AA), Edge(0,41,AB), Edge(41,82,BC), Edge(82,123,CD))
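
The lookup above fails with an exception as soon as an edge mentions a vertex name that has no id. If that can happen in your data, one possible variation is to skip such edges instead. This is only a sketch: the extra edge ("D","E") is a made-up example, and the local SparkSession setup is there just to make the snippet self-contained (in spark-shell, sc already exists).

```scala
import org.apache.spark.graphx.Edge
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in spark-shell `sc` is already provided
val spark = SparkSession.builder().master("local[*]").appName("safe-lookup-sketch").getOrCreate()
val sc = spark.sparkContext

// Same data as in the question, plus a hypothetical edge whose target "E" has no id
val my_rdd = sc.parallelize(Seq(("A", "A"), ("A", "B"), ("B", "C"), ("C", "D"), ("D", "E")))
val rdd_id = sc.parallelize(Seq(("A", 0), ("B", 41), ("C", 82), ("D", 123)))

val bc_lookup = sc.broadcast(rdd_id.collectAsMap())

// Map.get returns an Option, so the for-comprehension yields None
// when either endpoint is missing, and flatMap drops that edge
val e_rdd = my_rdd.flatMap { case (src, dst) =>
  for {
    srcId <- bc_lookup.value.get(src)
    dstId <- bc_lookup.value.get(dst)
  } yield Edge(srcId.toLong, dstId.toLong, src + dst)
}
```

Whether silently dropping edges is acceptable depends on your use case; you may prefer to log or count them instead.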

Data (define these before running the code above):

val my_rdd = sc.parallelize(Seq(("A","A"),("A","B"),("B","C"),("C", "D")))
val rdd_id = sc.parallelize(Seq(("A",0),("B",41),("C",82),("D",123)))
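
If the id RDD is too large to broadcast, the same result can probably be obtained with two plain RDD joins, which is the pure-RDD route mentioned in the comments on the question. The following is a minimal sketch under that assumption, not a tuned solution; the names withSrc and edges are introduced here for illustration, and the local SparkSession setup is only there to make the snippet self-contained.

```scala
import org.apache.spark.graphx.Edge
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in spark-shell `sc` is already provided
val spark = SparkSession.builder().master("local[*]").appName("edge-join-sketch").getOrCreate()
val sc = spark.sparkContext

val my_rdd = sc.parallelize(Seq(("A", "A"), ("A", "B"), ("B", "C"), ("C", "D")))
val rdd_id = sc.parallelize(Seq(("A", 0), ("B", 41), ("C", 82), ("D", 123)))

// First join: my_rdd is keyed by the source name, so this attaches the source id.
// The result is then re-keyed by the destination name for the second join.
val withSrc = my_rdd.join(rdd_id) // (src, (dst, srcId))
  .map { case (src, (dst, srcId)) => (dst, (src, srcId)) }

// Second join: attach the destination id and build the Edge
val edges: RDD[Edge[String]] = withSrc.join(rdd_id) // (dst, ((src, srcId), dstId))
  .map { case (dst, ((src, srcId), dstId)) => Edge(srcId.toLong, dstId.toLong, src + dst) }
```

Note that join is an inner join, so edges that reference a name missing from rdd_id are dropped, and the order of the resulting edges is not guaranteed. Each join also shuffles data, so for a small id RDD the broadcast version is likely cheaper.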

You could try something like this with DataFrames, where rdd1 is the edge RDD and rdd2 is the id RDD:

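import org.apache.spark.sql.functions.col
import spark.implicits._ // needed for toDF and the $"..." column syntax

val df1 = rdd1.toDF("col1", "col2")
val df2 = rdd2.toDF("col", "val")

df1.join(df2, $"col1" === $"col").drop(col("col"))
   .join(df2, $"col2" === $"col").drop(col("col")).show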

+----+----+---+---+
|col1|col2|val|val|
+----+----+---+---+
|   A|   B|  0| 41|
|   C|   D| 82|123|
|   B|   C| 41| 82|
|   A|   A|  0|  0|
+----+----+---+---+
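
The joined table above has two columns that are both named val and it is still a DataFrame, not an RDD[Edge]. One way to get from here to edges might be to rename the id columns per side before joining and then map the rows to Edge. This is a sketch under those assumptions: the names srcIds, dstIds, joined and edges are made up for illustration, and the DataFrames are built directly from the sample data so the snippet stands on its own.

```scala
import org.apache.spark.graphx.Edge
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in spark-shell `spark` is already provided
val spark = SparkSession.builder().master("local[*]").appName("df-to-edges-sketch").getOrCreate()
import spark.implicits._

val df1 = Seq(("A", "A"), ("A", "B"), ("B", "C"), ("C", "D")).toDF("col1", "col2")
val df2 = Seq(("A", 0L), ("B", 41L), ("C", 82L), ("D", 123L)).toDF("col", "val")

// Give the id table distinct column names for each side of the edge,
// so the result does not end up with two columns called "val"
val srcIds = df2.toDF("src", "srcId")
val dstIds = df2.toDF("dst", "dstId")

val joined = df1
  .join(srcIds, $"col1" === $"src")
  .join(dstIds, $"col2" === $"dst")
  .select($"col1", $"col2", $"srcId", $"dstId")

// Read the rows as typed tuples (tuple fields are matched by position)
// and build one Edge per row
val edges = joined.as[(String, String, Long, Long)].rdd
  .map { case (src, dst, srcId, dstId) => Edge(srcId, dstId, src + dst) }
```

The ids are declared as Long here because GraphX vertex ids are Long; with Int ids you would read them as Int and call toLong when building the Edge.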