Transpose multiple columns to rows with Spark (Python)




I'm trying to transpose some columns of my table to rows. I'm using Python and Spark 1.5.0. Here is my initial table:

+-----+-----+-----+-------+
|  A  |col_1|col_2|col_...|
+-----+-----+-----+-------+
|  1  |  0.0|  0.6|  ...  |
|  2  |  0.6|  0.7|  ...  |
|  3  |  0.5|  0.9|  ...  |
|  ...|  ...|  ...|  ...  |

I would like to have something like this:

+-----+--------+-----------+
|  A  | col_id | col_value |
+-----+--------+-----------+
|  1  |   col_1|        0.0|
|  1  |   col_2|        0.6|   
|  ...|     ...|        ...|    
|  2  |   col_1|        0.6|
|  2  |   col_2|        0.7| 
|  ...|     ...|        ...|  
|  3  |   col_1|        0.5|
|  3  |   col_2|        0.9|
|  ...|     ...|        ...|

Does someone know how I can do it? Thank you for your help.


It is relatively simple to do with basic Spark SQL functions.

Python:

from pyspark.sql.functions import array, col, explode, struct, lit

df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])

def to_long(df, by):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

to_long(df, ["A"])

Scala:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}
import sqlContext.implicits._  // needed for .toDF and the $"..." column syntax (Spark 1.x)

val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2")

def toLong(df: DataFrame, by: Seq[String]): DataFrame = {
  val (cols, types) = df.dtypes.filter{ case (c, _) => !by.contains(c)}.unzip
  require(types.distinct.size == 1)      

  val kvs = explode(array(
    cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _*
  ))

  val byExprs = by.map(col(_))

  df
    .select(byExprs :+ kvs.alias("_kvs"): _*)
    .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*)
}

toLong(df, Seq("A"))

Use flatMap. Something like the following should work:

from pyspark.sql import Row

def rowExpander(row):
    rowDict = row.asDict()
    valA = rowDict.pop('A')
    for k in rowDict:
        yield Row(**{'A': valA, 'colID': k, 'colValue': row[k]})

newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander))
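To see the shape of what rowExpander yields, here is a pure-Python sketch of the same per-row expansion, with plain dicts standing in for Rows (the name `row_expander` and the sorting are illustrative additions, not part of the answer above):

```python
def row_expander(row_dict):
    """Expand one wide row-dict into one narrow dict per value column."""
    val_a = row_dict.pop('A')
    for k in sorted(row_dict):          # sort for a deterministic output order
        yield {'A': val_a, 'colID': k, 'colValue': row_dict[k]}

rows = list(row_expander({'A': 1, 'col_1': 0.0, 'col_2': 0.6}))
# rows == [{'A': 1, 'colID': 'col_1', 'colValue': 0.0},
#          {'A': 1, 'colID': 'col_2', 'colValue': 0.6}]
```

Note that plain dict iteration order was not guaranteed before Python 3.7, which is why the sketch sorts the keys.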

Transform data in Spark Scala from columns to rows

Assuming that your input dataset is generated using a case class as

case class infos(CustomerID: Int, CustomerName: String, Sun: Int, Mon: Int, Tue: Int)

For testing purposes I am creating a dataset as

import sqlContext.implicits._
val ds = Seq(
  infos(1, "ABC", 0, 12, 10),
  infos(2, "DEF", 10, 0, 0)
).toDS

which should give your input dataset

+----------+------------+---+---+---+
|CustomerID|CustomerName|Sun|Mon|Tue|
+----------+------------+---+---+---+
|1         |ABC         |0  |12 |10 |
|2         |DEF         |10 |0  |0  |
+----------+------------+---+---+---+

Getting your final required dataset requires you to create another case class as

case class finalInfos(CustomerID: Int, CustomerName: String, Day: String, Value: Int)

The final required dataset can then be achieved as follows:

val names = ds.schema.fieldNames

ds.flatMap(row => Array(finalInfos(row.CustomerID, row.CustomerName, names(2), row.Sun),
  finalInfos(row.CustomerID, row.CustomerName, names(3), row.Mon),
  finalInfos(row.CustomerID, row.CustomerName, names(4), row.Tue)))

which should give you the dataset

+----------+------------+---+-----+
|CustomerID|CustomerName|Day|Value|
+----------+------------+---+-----+
|1         |ABC         |Sun|0    |
|1         |ABC         |Mon|12   |
|1         |ABC         |Tue|10   |
|2         |DEF         |Sun|10   |
|2         |DEF         |Mon|0    |
|2         |DEF         |Tue|0    |
+----------+------------+---+-----+

Spark's local linear algebra libraries are presently very weak, and they do not include basic operations such as the above.

There is a JIRA for fixing this for Spark 2.1 - but that will not help you today.

Something to consider: performing a transpose will likely require completely shuffling the data.

For now you will need to write RDD code directly. I have written transpose in Scala, but not in Python. Here is the Scala version:

// DMatrix here is assumed to be a local row-major matrix,
// e.g. type DMatrix = Array[Array[Double]]
def transpose(mat: DMatrix) = {
  val nCols = mat(0).length
  mat.flatten
    .zipWithIndex                 // (value, flat index) pairs
    .groupBy { _._2 % nCols }     // group by column index
    .toSeq
    .sortBy { _._1 }              // restore column order
    .map(_._2.map(_._1))          // drop the indices
    .toArray
}

So you can convert that to Python for your use. I do not have the bandwidth to write/test that at this particular moment; let me know if you are unable to do that conversion.

At the least, the following are readily converted to Python:

  • zipWithIndex --> enumerate() (Python equivalent - credit to @zero323)
  • map --> [someOperation(x) for x in ..]
  • groupBy --> itertools.groupby() (note: it only groups consecutive items, so sort by the key first)

Here is an implementation of flatten, which does not have a direct Python equivalent:

def flatten(L):
    for item in L:
        try:
            for i in flatten(item):
                yield i
        except TypeError:
            yield item

So you should be able to put those together for a solution.
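Putting those pieces together, a possible Python rendering of the Scala transpose above (for a plain list-of-lists matrix, not an RDD; the names are illustrative) might look like:

```python
from itertools import groupby

def flatten(L):
    for item in L:
        try:
            for i in flatten(item):
                yield i
        except TypeError:
            yield item

def transpose(mat):
    n_cols = len(mat[0])
    indexed = list(enumerate(flatten(mat)))        # (flat index, value) pairs
    # itertools.groupby groups consecutive items, so sort by column index first
    indexed.sort(key=lambda p: p[0] % n_cols)
    return [[v for _, v in grp]
            for _, grp in groupby(indexed, key=lambda p: p[0] % n_cols)]

transpose([[1, 2, 3], [4, 5, 6]])
# -> [[1, 4], [2, 5], [3, 6]]
```

The sort is the step that the Scala version gets for free from `groupBy` returning a Map keyed by column index.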




