Tuesday, August 25, 2015

Spark Dataframes: All you need to know to rewrite your Hive/Pig scripts to spark DF

In this blog post, I am going to talk about how Spark DataFrames can potentially replace Hive/Pig in the big data space. For an introduction to DataFrames, please read this blog post by Databricks. I am writing this post to share more advanced information on the topic that I could not find anywhere and had to learn myself. The first part covers how to port Hive queries to Spark DataFrames; the second part discusses performance tips for DataFrames. In general, Spark DataFrames perform well, as shown in Fig. 1, and since they can be written in Python, Java or Scala, they give a lot of flexibility and control to write jobs efficiently.
The first version of Spark that I used was Spark 1.0. It was quite unstable and had many bugs, so after some POC work I gave up and decided not to use Spark at all. I tried again with Spark 1.3 and found that it had improved a lot in terms of performance and stability. Since my team uses Hive/Pig for most of their data-related jobs, I started using DataFrames, which were recently introduced in Spark. I found Spark 1.4 to be quite stable and good enough for production jobs.
Through the new DataFrame API, Python programs can achieve the same level of performance as JVM programs because the Catalyst optimizer compiles DataFrame operations into JVM bytecode. Indeed, performance sometimes beats hand-written Scala code. I tried both Python and Scala DataFrames and found no performance difference between them.
Spark dataframes performance
Fig. 1 : This figure shows the performance of Spark DFs as compared to RDDs on 10 million integers. More details can be found here.
Note: One thing I have noticed is that it is still a good idea to do the first pass of heavy aggregation in Hive/Pig and then use DataFrames on the aggregated data. I tried aggregating a huge amount of data using DataFrames but did not find much performance gain (in fact, in some cases performance degraded). There are ways to do even heavy aggregation with DataFrames, which I will cover in the next chapter (I will soon add a chapter on efficient aggregation using DataFrames).
Below are the ways to rewrite hive/pig SQL to Spark DataFrames:
Suppose we have a USERS table as described here:
CREATE TABLE users (
  key INT,
  value INT )
PARTITIONED BY (dateint INT);
 
A. Hive Query:
SELECT key, value FROM users WHERE key = 10 AND value = 100 LIMIT 10;
Equivalent in Spark DataFrame:
import org.apache.spark.sql.hive.HiveContext

val hqlContext = new HiveContext(sc)
val sqlQuery = "select key, value from users where key = 10 and value = 100"
hqlContext.sql(sqlQuery).take(10).foreach(println)
This is how we can filter data in the DataFrame itself (instead of using a where clause in the query):
val hqlContext = new HiveContext(sc)
val sqlQuery = "select key, value from users"
val queryResult = hqlContext.sql(sqlQuery)
// Column equality uses ===, not =
queryResult.filter(queryResult("key") === 10 && queryResult("value") === 100).take(10).foreach(println)

B. Selecting the latest dateint partition from the above-mentioned table
Hive query:
select max(dateint) from users;
But if each partition contains a huge amount of data, this simple query takes a long time to return results. This is how you can do the same efficiently with a Spark DataFrame, by reading only the list of partitions:
val sqlQuery = "show partitions users"
val datePart = hqlContext.sql(sqlQuery)
// Each row is a partition string such as "dateint=20150817"
val maxDateInt = datePart.map(row => row.getAs[String](0))
  .filter(row => row.contains("dateint"))
  .map(row => row.split("/")(0))       // keep the first partition key
  .map(row => row.split("=")(1).toInt) // keep the value after "dateint="
  .max()
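The parsing steps above can be checked without a cluster. The sketch below applies the same filter/split logic to a plain Scala collection. The object name PartitionParser and the method latestDateInt are hypothetical names made up for illustration, and the sketch assumes that the partition strings look like dateint=20150817 (optionally followed by /otherkey=...), with dateint as the first partition key.

```scala
object PartitionParser {
  // Returns the largest dateint found in partition strings such as
  // "dateint=20150817" or "dateint=20150817/hour=3".
  // Assumption: dateint is the first partition key, as in the snippet above.
  def latestDateInt(partitions: Seq[String]): Option[Int] = {
    val dateInts = partitions
      .filter(_.contains("dateint"))   // ignore rows without a dateint key
      .map(_.split("/")(0))            // keep the first partition key
      .map(_.split("=")(1).toInt)      // keep the value after "dateint="
    if (dateInts.isEmpty) None else Some(dateInts.max)
  }
}
```

Returning an Option instead of calling max directly avoids an exception when the table has no partitions yet.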

C. Column renaming
This is how columns can be renamed in DataFrames.
Hive Query:
select key as my_key, value as my_value from users limit 10;
In DataFrames:
val sqlQuery = "select key, value from users"
val data = hqlContext.sql(sqlQuery)
val colRename = data.select(data("key") as "my_key", data("value") as "my_value")
colRename.take(10).foreach(println)

D. Case statements
This is how case statements can be written in DataFrames:
Hive Query:
select key, case when value is null then 0 else value end as new_value from users limit 10;
In DataFrames:
import org.apache.spark.sql.functions.when

val sqlQuery = "select key, value from users"
val data = hqlContext.sql(sqlQuery)
val caseStatement = data.withColumn("new_value", when(data("value").isNull, 0).otherwise(data("value")))
caseStatement.take(10).foreach(println)
Here the withColumn function adds one more column alongside key and value, so calling printSchema() on the caseStatement object prints a schema along these lines:
caseStatement.printSchema()
root
 |-- key: integer (nullable = true)
 |-- value: integer (nullable = true)
 |-- new_value: integer (nullable = true)

E. JOINS

E.1: Inner Join
In Hive:
select a.key, b.dept_id from users a join department b on a.key = b.key and a.user_id = b.user_id limit 10;
In DataFrames:
// This example assumes both tables also have a user_id column
val sqlQuery_a = "select key, value, user_id from users"
val data_a = hqlContext.sql(sqlQuery_a)
val dataCol_a = data_a.select(data_a("key") as "my_key", data_a("value") as "my_value", data_a("user_id") as "my_user")

val sqlQuery_b = "select key, dept_id, user_id from department"
val data_b = hqlContext.sql(sqlQuery_b)
val dataCol_b = data_b.select(data_b("key") as "my_key", data_b("dept_id") as "my_depart", data_b("user_id") as "my_user")

// The default join type is "inner". === matches Hive's =; use <=> only if nulls should match nulls.
val joinData = dataCol_a.join(dataCol_b, dataCol_a("my_key") === dataCol_b("my_key") && dataCol_a("my_user") === dataCol_b("my_user"))
  .select(dataCol_a("my_key") as "key", dataCol_b("my_depart") as "dept_id").take(10)

E.2 Left Outer Join
In Hive:
select a.key, b.dept_id from users a left outer join department b on a.key = b.key and a.user_id = b.user_id limit 10;
In DataFrames:
val joinData = dataCol_a.join(dataCol_b, dataCol_a("my_key") === dataCol_b("my_key") && dataCol_a("my_user") === dataCol_b("my_user"), "left_outer")
  .select(dataCol_a("my_key") as "key", dataCol_b("my_depart") as "dept_id").take(10)

F. Aggregations
There are different ways of aggregating data using Spark DataFrames. Here I show the optimized functions that can be used for aggregations.
I would recommend not using reduceBy, as it is quite inefficient and slow. Instead, I recommend using reduceByKey/mapValues, or groupBy along with agg() in Spark 1.4. Both are optimized for performance.
val sqlQuery = "select key, value from users"
val data = hqlContext.sql(sqlQuery)
// reduceByKey is an RDD operation, so convert the rows to (key, value) pairs first
val pairs = data.rdd.map(row => (row.getInt(0), row.getInt(1)))
val dataAgg = pairs.reduceByKey((a, b) => a + b)

// If the values are a Vector, Array or List, concatenate them instead
val dataAggList = pairs.mapValues(v => List(v)).reduceByKey((a, b) => a ++ b)

OR with Spark 1.4, the same sum can be done by:
import org.apache.spark.sql.functions._
val dataAggDF = data.groupBy("key").agg(sum("value") as "my_value") // You can use max/min/avg etc.
Similarly, for data mapping, mapValues(..) should be used instead of map(..) whenever possible, as this function has been well optimized in Spark 1.4.
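To sanity-check what a sum-by-key aggregation should return, it can help to run the same logic on a small local sample first. The sketch below is a plain Scala stand-in, not Spark code: LocalAggregation and sumByKey are hypothetical names, and the input is an ordinary Seq of (key, value) pairs rather than an RDD or DataFrame.

```scala
object LocalAggregation {
  // Local stand-in for reduceByKey(_ + _) or groupBy("key").agg(sum("value")):
  // groups the pairs by key and sums the values within each group.
  def sumByKey(pairs: Seq[(Int, Int)]): Map[Int, Int] =
    pairs
      .groupBy(_._1)
      .map { case (key, group) => key -> group.map(_._2).sum }
}
```

Comparing this local result with the output of the Spark job on the same small sample is a quick way to catch a wrong column name or a wrong aggregation function before running on the full table.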

G. Read/Write
G.1 Writing to a file
If you are reading from or writing to Parquet/JSON files, read/write has improved a lot in Spark 1.4 and is very stable, with few or no known issues.
Here are a few examples of writing output to Parquet files.
import org.apache.spark.sql.SaveMode

// The final DataFrame output (here called df) can be written as:
df.write.mode(SaveMode.Overwrite).parquet(outputDirWithPartition)
// You can choose whichever save mode you want: Append, Overwrite, Ignore, etc.

OR for Spark 1.3 or before (Python syntax):
df.save(path='myPath', source='parquet', mode='overwrite')

// To save as a text file
df.rdd.saveAsTextFile("outPutPath")

G.2 Saving to Persistent Tables
When working with a HiveContext, DataFrames can also be saved as persistent tables using the saveAsTable command. Unlike the registerTempTable command, saveAsTable will materialize the contents of the DataFrame and create a pointer to the data in the HiveMetastore. Persistent tables will still exist even after your Spark program has restarted, as long as you maintain your connection to the same metastore. A DataFrame for a persistent table can be created by calling the table method on a SQLContext with the name of the table.
By default saveAsTable will create a “managed table”, meaning that the location of the data will be controlled by the metastore. Managed tables will also have their data deleted automatically when a table is dropped.

G.3 Reading from file
Data frame objects can easily be created when reading from file and all the above mentioned concepts can be applied directly.
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit, you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Read from a text file, create an RDD of Person objects and convert it to a DataFrame.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

// This is how you can read directly from Parquet/JSON, which automatically returns the data as a DataFrame
// Parquet
val peopleFromParquet = sqlContext.read.parquet("people.parquet")
// JSON
val peopleFromJson = sqlContext.read.json("people.json")
// Now register your DataFrame as a table in sqlContext
people.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)

H. Performance tips
1. If the number of partitions is large but the data per partition is small, reduce the total number of tasks with coalesce, e.g. coalesce(10), so that your job runs fast and efficiently.
2. Try to process your data as much as possible using DataFrames. If something cannot be done in DataFrames, you can convert the DataFrame to an RDD with .rdd and then perform your RDD operations (similarly, you can convert an RDD back to a DataFrame with toDF).
3. If you want to use the same dataset (RDD/DF) again and again at several steps, persist it instead of making the Spark job read it again, which reduces I/O. DF.persist() persists the dataset in memory. One key point to remember: filter the data as much as possible before persisting it in memory.
4. Use broadcast variables very carefully: in a distributed system they may produce incorrect results (and may do more harm than good), so don't use them unless there is no other way.
5. For aggregate and reduce operations, if the number of partitions is large, use treeReduce and treeAggregate rather than reduceByKey/aggregateByKey (also, reduceByKey/aggregateByKey only work on key-value RDDs, while treeReduce and treeAggregate work on any kind of RDD and internally call reduceByKey/aggregateByKey).

I. UDF's
Creating and applying UDFs has never been so easy.
Here is an example of a map-side UDF.
import org.apache.spark.sql.functions.udf

// UDF to convert "2015-08-17" to 20150817 (string to int)
val datetoDateInt = udf((dateString: String) => {
  dateString.replace("-", "").toInt
})
Creating and implementing a reduce-side (or aggregate) UDF is still a little complicated in Spark DataFrames, so I am skipping it here to avoid any confusion.
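Since a map-side UDF is just an ordinary Scala function wrapped by udf(...), the conversion logic can be tested on its own before it is applied to a DataFrame. The sketch below is an illustration under assumptions: the object DateIntConversion and both method names are hypothetical, and the null-tolerant variant is an addition that the UDF above does not have.

```scala
import scala.util.Try

object DateIntConversion {
  // Same body as the UDF above: "2015-08-17" -> 20150817.
  // Throws on null or malformed input, exactly like the original lambda.
  def dateToDateInt(dateString: String): Int =
    dateString.replace("-", "").toInt

  // Hypothetical null-tolerant variant: returns None for null or
  // malformed input instead of throwing an exception.
  def safeDateToDateInt(dateString: String): Option[Int] =
    Option(dateString).flatMap(s => Try(s.replace("-", "").toInt).toOption)
}
```

Once the plain function behaves as expected, it could be wrapped the same way as above, for example with udf(DateIntConversion.dateToDateInt _).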

J. Debugging tips
I would recommend putting your code in a main(..) block (in the case of Scala/Java); otherwise the jar may throw an error at runtime, or the variables used in the code may not get broadcast correctly across the stages.

K. How to run ML algorithms on Spark using Spark ML libs and Dataframes:

Running ML algorithms on Spark is a little tricky, so here are some pointers for running them effectively:
a. Make sure you select the correct partition key for your data so that the ML algorithm runs in a distributed fashion on the partitioned data.
b. Select the feature vector carefully, i.e. choose a number of features that fits in memory rather than a large number of features that does not fit in memory well.
c. If there are a lot of features, try to convert them to integers (if possible) rather than strings or other data types.

K.1: K-Means clustering:
This is how you run K-Means clustering using DataFrames in Spark.

These are the libs that you need to import:
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SaveMode
import scala.sys.process._
import hqlContext.implicits._ // needed for toDF

// Pull data from Hive or any other DB (for this example we assume that the data is stored in Hive)
val query = "select query, page_impression from table_search"
// Now execute the query
val q1 = hqlContext.sql(query).select("query", "page_impression")
val rowRDD = q1.rdd.map(r => (r.getString(0), r.getLong(1), Vectors.dense(r.getLong(1).toDouble)))

rowRDD.cache()

val upper_cnt = 8000
val lower_cnt = 100
// Select the training set
val trainSet = rowRDD.filter(r => r._2 < upper_cnt && r._2 >= lower_cnt)
val numClusters = 3
val numIterations = 20
val feature = trainSet.map(r => r._3)

val kMeansModel = KMeans.train(feature, numClusters, numIterations)
// Here we segregate all the queries into 3 separate clusters
val predDF = trainSet.map(r => kMeansModel.predict(r._3) match {
  case 1 => (r._1, r._2, 0)
  case 2 => (r._1, r._2, 1)
  case 0 => (r._1, r._2, 2)
  case _ => (r._1, r._2, -1)
}).toDF("queries", "page_impression", "tier")

// Now save the data to HDFS
predDF.coalesce(10).write.mode(SaveMode.Overwrite).parquet(final_part_dir)
K.2: Logistic regression example:


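import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import sqlContext.implicits._ // needed for toDF (or hqlContext.implicits._)

val data = Seq((1.0, 0.52, 0.34), (0.0, 0.6, 0.43), (0.0, 1.9, 0.54), (1.0, 0.11, 0.11), (1.0, 0.222, 0.33), (1.0, 0.333, 0.99), (0.0, 0.314, 0.86), (0.0, 0.9888, 0.34), (1.0, 0.264, 0.55))

val df = sc.parallelize(data).toDF("label", "x1", "x2").cache()

// Convert each row to a LabeledPoint(label, features)
val train_set = df.rdd.map(row => LabeledPoint(row.getAs[Double](0), Vectors.dense(row.getAs[Double](1), row.getAs[Double](2)))).toDF

train_set.show()

val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.3)

val lrModel = lr.fit(train_set)

// Prepare the test data as a single "features" vector column
// (Tuple1 is used here in place of the undefined Feature class)
val test = df.select("x1", "x2").rdd.map(row => Tuple1(Vectors.dense(row.getAs[Double](0), row.getAs[Double](1)))).toDF("features")

val predictionsDF = lrModel.transform(test)
predictionsDF.show()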

Topics for upcoming blog posts:
  1. Generic ETL jobs/templates that can be re-used by multiple teams instead of each team writing a similar job again.
  2. Aggregation on huge amount of data using spark DF
Please stay tuned and thanks for reading !


For flattening the data:
http://xinhstechblog.blogspot.com/2015/06/reading-json-data-in-spark-dataframes.html
Further on how to save DF to tables:
http://deepikakhera.blogspot.com/2015/09/spark-rdd-to-hive-dataframe.html