tutorial - spark version scala




Как определить, есть ли в Spark DataFrame столбец (6)

Для тех, кто наткнулся на это в поисках решения Python, я использую:

if 'column_name_to_check' in df.columns:
    # do something

Когда я попробовал ответ @Jai Prakash на df.columns.contains('column-name-to-check') с использованием Python, я получил AttributeError: 'list' object has no attribute 'contains' .

Когда я создаю DataFrame из файла JSON в Spark SQL, как я могу узнать, существует ли данный столбец перед вызовом .select

Пример схемы JSON:

{
  "a": {
    "b": 1,
    "c": 2
  }
}

Вот что я хочу сделать:

potential_columns = Seq("b", "c", "d")
df = sqlContext.read.json(filename)
potential_columns.map(column => if(df.hasColumn(column)) df.select(s"a.$column"))

но я не могу найти хорошую функцию для hasColumn . Самое близкое, что я получил, это проверить, находится ли столбец в этом несколько неудобном массиве:

scala> df.select("a.*").columns
res17: Array[String] = Array(b, c)

Другой вариант для этого - сделать некоторые манипуляции с массивами (в данном случае intersect ) для df.columns и ваших potential_columns .

val potential_columns = Seq("a.b", "a.c", "a.d")

// Our object model
case class Document( a: String, b: String, c: String)
case class Document2( a: Document, b: String, c: String)

// And some data...
val df = sc.parallelize(Seq(Document2(Document("a", "b", "c"), "c2")), 2).toDF

// We go through each of the fields in the schema.
// For StructTypes we return an array of parentName.fieldName
// For everything else we return an array containing just the field name
// We then flatten the complete list of field names
// Then we intersect that with our potential_columns leaving us just a list of column we want
// we turn the array of strings into column objects
// Finally turn the result into a vararg (: _*)
df.select(df.schema.map(a => a.dataType match { case s : org.apache.spark.sql.types.StructType => s.fieldNames.map(x => a.name + "." + x) case _ => Array(a.name) }).flatMap(x => x).intersect(potential_columns).map(df(_)) : _*).show

Увы, это не будет работать для вас сценарий внутреннего объекта выше. Вам нужно будет посмотреть на схему для этого.

Я собираюсь изменить ваши potential_columns столбцы на полные имена столбцов

df.schema.fieldNames.contains("column_name")

Это только один уровень глубины, так что, чтобы сделать его общим, вам придется сделать больше работы.


Если вы загружаете json, используя определение схемы при загрузке, вам не нужно проверять столбец. если его нет в исходном коде json, он будет отображаться как пустой столбец

        val schemaJson = """
  {
      "type": "struct",
      "fields": [
          {
            "name": field1
            "type": "string",
            "nullable": true,
            "metadata": {}
          },
          {
            "name": field2
            "type": "string",
            "nullable": true,
            "metadata": {}
          }
      ]
  }
        """
    val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]

    val djson = sqlContext.read
    .schema(schema )
    .option("badRecordsPath", readExceptionPath)
    .json(dataPath)

На самом деле вам даже не нужно вызывать select для использования столбцов, вы можете просто вызвать его на самом фрейме данных

// define test data
case class Test(a: Int, b: Int)
val testList = List(Test(1,2), Test(3,4))
val testDF = sqlContext.createDataFrame(testList)

// define the hasColumn function
def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) = df.columns.contains(colName)

// then you can just use it on the DF with a given column name
hasColumn(testDF, "a")  // <-- true
hasColumn(testDF, "c")  // <-- false

В качестве альтернативы вы можете определить неявный класс, используя шаблон pimp my library, чтобы метод hasColumn был доступен непосредственно на ваших фреймах данных

implicit class DataFrameImprovements(df: org.apache.spark.sql.DataFrame) {
    def hasColumn(colName: String) = df.columns.contains(colName)
}

Тогда вы можете использовать его как:

testDF.hasColumn("a") // <-- true
testDF.hasColumn("c") // <-- false

Try не является оптимальным, поскольку он будет оценивать выражение внутри Try прежде чем принять решение.

Для больших наборов данных используйте ниже в Scala :

// Loading some data (so you can just copy & paste right into spark-shell)
case class Document( a: String, b: String, c: String)
val df = sc.parallelize(Seq(Document("a", "b", "c")), 2).toDF

// The columns we want to extract
val potential_columns = Seq("b", "c", "d")

// Get the intersect of the potential columns and the actual columns, 
// we turn the array of strings into column objects
// Finally turn the result into a vararg (: _*)
df.select(potential_columns.intersect(df.columns).map(df(_)): _*).show

def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) =
  Try(df.select(colName)).isSuccess

Используйте вышеупомянутую функцию, чтобы проверить существование столбца, включая имя вложенного столбца.







apache-spark-sql