apache spark - إزالة التكرارات من الصفوف بناءً على أعمدة محددة في RDD/Spark DataFrame




apache-spark apache-spark-sql (5)

Pyspark يتضمن أسلوب dropDuplicates() . https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates

>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+

>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+

ربما تم تقديمه في إصدار أحدث مما كان يستخدمه @ Jason (OP)؟

تحرير: نعم تم تقديمه في 1.4

لنفترض أن لدي مجموعة بيانات كبيرة نسبيًا في النموذج التالي:

data = sc.parallelize([('Foo',41,'US',3),
                       ('Foo',39,'UK',1),
                       ('Bar',57,'CA',2),
                       ('Bar',72,'CA',2),
                       ('Baz',22,'US',6),
                       ('Baz',36,'US',6)])

ما أريد فعله هو إزالة الصفوف المكررة استنادًا إلى قيم الأعمدة الأولى والثالثة والرابعة فقط.

إزالة الصفوف المكررة تمامًا أمر بسيط:

data = data.distinct()

وستتم إزالة الصف 5 أو الصف 6

ولكن كيف يمكنني إزالة الصفوف المكررة فقط بناءً على الأعمدة 1 و 3 و 4 فقط؟ أي إزالة واحد من هؤلاء:

('Baz',22,'US',6)
('Baz',36,'US',6)

في Python ، يمكن القيام بذلك عن طريق تحديد الأعمدة ذات .drop_duplicates() . كيف يمكنني تحقيق نفس الشيء في Spark / Pyspark؟


أتفق مع ديفيد. للإضافة ، قد لا تكون الحالة هي أننا نريد تجميع كل الأعمدة بخلاف العمود (الأعمدة) في دالة تجميعية ، إذا أردنا إزالة التكرارات استنادًا إلى مجموعة فرعية من الأعمدة والاحتفاظ بكافة الأعمدة في مخطط البيانات الأصلي . لذلك يمكن أن يكون أفضل طريقة للقيام بذلك استخدام dropDuplicates Dataframe api في Spark 1.4.0

للرجوع إليها ، راجع: https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrame


أنا استخدم inbuilt وظيفة dropDuplicates (). كود سكالا الواردة أدناه

val data = sc.parallelize(List(("Foo",41,"US",3),
("Foo",39,"UK",1),
("Bar",57,"CA",2),
("Bar",72,"CA",2),
("Baz",22,"US",6),
("Baz",36,"US",6))).toDF("x","y","z","count")

data.dropDuplicates(Array("x","count")).show()

انتاج :

+---+---+---+-----+
|  x|  y|  z|count|
+---+---+---+-----+
|Baz| 22| US|    6|
|Foo| 39| UK|    1|
|Foo| 41| US|    3|
|Bar| 57| CA|    2|
+---+---+---+-----+

من سؤالك ، من غير الواضح ما هي الأعمدة التي تريد استخدامها لتحديد التكرارات. الفكرة العامة وراء الحل هي إنشاء مفتاح يستند إلى قيم الأعمدة التي تحدد التكرارات. ثم ، يمكنك استخدام reduByKey أو تقليل العمليات لإزالة التكرارات.

إليك بعض التعليمات البرمجية التي تساعدك على البدء:

def get_key(x):
    return "{0}{1}{2}".format(x[0],x[2],x[3])

m = data.map(lambda x: (get_key(x),x))

الآن ، لديك قيمة RDD ذات قيمة reduceByKey groupByKey بواسطة الأعمدة 1.3 و 4. ستكون الخطوة التالية إما reduceByKey أو groupByKey و filter . هذا من شأنه القضاء على التكرارات.

r = m.reduceByKey(lambda x,y: (x))

سيساعدك البرنامج أدناه في إسقاط التكرارات بالكامل ، أو إذا كنت تريد إسقاط التكرارات استنادًا إلى أعمدة معينة ، يمكنك حتى القيام بذلك:

import org.apache.spark.sql.SparkSession

object DropDuplicates {
def main(args: Array[String]) {
val spark =
  SparkSession.builder()
    .appName("DataFrame-DropDuplicates")
    .master("local[4]")
    .getOrCreate()

import spark.implicits._

// create an RDD of tuples with some data
val custs = Seq(
  (1, "Widget Co", 120000.00, 0.00, "AZ"),
  (2, "Acme Widgets", 410500.00, 500.00, "CA"),
  (3, "Widgetry", 410500.00, 200.00, "CA"),
  (4, "Widgets R Us", 410500.00, 0.0, "CA"),
  (3, "Widgetry", 410500.00, 200.00, "CA"),
  (5, "Ye Olde Widgete", 500.00, 0.0, "MA"),
  (6, "Widget Co", 12000.00, 10.00, "AZ")
)
val customerRows = spark.sparkContext.parallelize(custs, 4)

// convert RDD of tuples to DataFrame by supplying column names
val customerDF = customerRows.toDF("id", "name", "sales", "discount", "state")

println("*** Here's the whole DataFrame with duplicates")

customerDF.printSchema()

customerDF.show()

// drop fully identical rows
val withoutDuplicates = customerDF.dropDuplicates()

println("*** Now without duplicates")

withoutDuplicates.show()

// drop fully identical rows
val withoutPartials = customerDF.dropDuplicates(Seq("name", "state"))

println("*** Now without partial duplicates too")

withoutPartials.show()

 }
 }




pyspark