As of Spark 2.0.0, SQLContext is kept for backward compatibility; SparkSession is the new entry point to Spark SQL.
In the pre-Spark 2.0 era, SQLContext was the entry point for Spark SQL. Whatever you did in Spark SQL had to start with creating an instance of SQLContext.
A SQLContext object requires a SparkContext, a CacheManager, and a SQLListener. They are all transient and do not participate in serializing a SQLContext.
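You should use SQLContext for the following, each described later on this page: setting Spark SQL configuration properties, creating DataFrames and Datasets, registering user-defined functions, caching tables, reading data from external storage systems, listing and dropping tables, and executing SQL queries.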
You can create a SQLContext using the following constructors:
SQLContext(sc: SparkContext)
SQLContext.getOrCreate(sc: SparkContext)
newSession allows for creating a new instance of SQLContext with a separate SQL configuration (through a shared SparkContext).
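A minimal sketch of both ways of obtaining a SQLContext. The local master and the application name are assumptions made for illustration only; in spark-shell a SparkContext is already available as sc.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: a local master and an illustrative application name.
val conf = new SparkConf().setMaster("local[*]").setAppName("sqlcontext-sketch")
val sc = SparkContext.getOrCreate(conf)

// Constructor variant: wraps the given SparkContext.
val sqlContext = new SQLContext(sc)

// Factory variant: reuses an existing SQLContext if there is one.
val reusedOrNew = SQLContext.getOrCreate(sc)
```

Both values are backed by the same SparkContext, which you can check with sqlContext.sparkContext.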
You can set Spark SQL configuration properties using:
setConf(props: Properties): Unit
setConf(key: String, value: String): Unit
You can get the current value of a configuration property by key using:
getConf(key: String): String
getConf(key: String, defaultValue: String): String
getAllConfs: immutable.Map[String, String]
Properties that start with spark.sql are reserved for Spark SQL.
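The setters and getters above could be exercised as in the following sketch. The key spark.sql.hypothetical.key is made up to show the default-value variant, and the local master is an assumption.

```scala
import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: a local master and an illustrative application name.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-conf-sketch"))
val sqlContext = SQLContext.getOrCreate(sc)

// Set a single property by key.
sqlContext.setConf("spark.sql.shuffle.partitions", "8")

// Set several properties at once through java.util.Properties.
val props = new Properties()
props.setProperty("spark.sql.sources.default", "parquet")
sqlContext.setConf(props)

// Read a property back by key.
val partitions = sqlContext.getConf("spark.sql.shuffle.partitions")

// Hypothetical key that was never set, so the supplied default is returned.
val missing = sqlContext.getConf("spark.sql.hypothetical.key", "fallback")

// Snapshot of all explicitly set properties.
val all = sqlContext.getAllConfs
```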
emptyDataFrame: DataFrame
emptyDataFrame creates an empty DataFrame. It calls createDataFrame with an empty RDD[Row] and an empty schema StructType(Nil).
createDataFrame[A <: Product](rdd: RDD[A]): DataFrame
createDataFrame[A <: Product](data: Seq[A]): DataFrame
The createDataFrame family of methods can create a DataFrame from an RDD or a Seq of Scala’s Product types like case classes or tuples.
createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
This variant of createDataFrame creates a DataFrame from an RDD of Row and an explicit schema.
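A short sketch of the createDataFrame variants and emptyDataFrame, using tuples for the Product-based variants. The sample rows and column names are illustrative, and the local master is an assumption.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Assumption: a local master and an illustrative application name.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-df-sketch"))
val sqlContext = SQLContext.getOrCreate(sc)

// From a Seq of tuples (Product types); columns are named _1 and _2.
val fromSeq = sqlContext.createDataFrame(Seq(("hello", 0), ("world!", 1)))

// From an RDD of tuples.
val fromRdd = sqlContext.createDataFrame(sc.parallelize(Seq(("a", 1), ("b", 2))))

// From an RDD[Row] with an explicit schema.
val schema = StructType(Seq(
  StructField("text", StringType, nullable = false),
  StructField("id", IntegerType, nullable = false)))
val rowRDD = sc.parallelize(Seq(Row("hello", 0), Row("world!", 1)))
val withSchema = sqlContext.createDataFrame(rowRDD, schema)

// No rows and no columns.
val empty = sqlContext.emptyDataFrame
```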
udf: UDFRegistration
The udf method gives you access to UDFRegistration to manipulate user-defined functions. Functions registered using udf are available by name in SQL queries, e.g. those executed using sql.
Read up on UDFs in the UDFs — User-Defined Functions document.
// Create a DataFrame
val df = Seq("hello", "world!").zip(0 to 1).toDF("text", "id")
// Register the DataFrame as a temporary table in Hive
df.registerTempTable("texts")
scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|    texts|       true|
+---------+-----------+
scala> sql("SELECT * FROM texts").show
+------+---+
|  text| id|
+------+---+
| hello|  0|
|world!|  1|
+------+---+
// Just a Scala function
val my_upper: String => String = _.toUpperCase
// Register the function as UDF
spark.udf.register("my_upper", my_upper)
scala> sql("SELECT *, my_upper(text) AS MY_UPPER FROM texts").show
+------+---+--------+
|  text| id|MY_UPPER|
+------+---+--------+
| hello|  0|   HELLO|
|world!|  1|  WORLD!|
+------+---+--------+
isCached(tableName: String): Boolean
The isCached method asks CacheManager whether the tableName table is cached in memory or not. It simply requests CachedData from CacheManager and, when it exists, assumes the table is cached.
cacheTable(tableName: String): Unit
You can cache a table in memory using cacheTable.
Why would I want to cache a table?
uncacheTable(tableName: String)
clearCache(): Unit
uncacheTable and clearCache remove one or all in-memory cached tables.
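The caching methods could be tried out as follows. The table name texts_cache_demo and the sample rows are made up for this sketch, and the local master is an assumption.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: a local master and an illustrative application name.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-cache-sketch"))
val sqlContext = SQLContext.getOrCreate(sc)

// Register a small temporary table to work with (hypothetical name).
val df = sqlContext.createDataFrame(Seq(("hello", 0), ("world!", 1))).toDF("text", "id")
df.registerTempTable("texts_cache_demo")

// Mark the table as cached and ask whether it is.
sqlContext.cacheTable("texts_cache_demo")
val cachedAfterCache = sqlContext.isCached("texts_cache_demo")

// Remove just this table from the cache.
sqlContext.uncacheTable("texts_cache_demo")
val cachedAfterUncache = sqlContext.isCached("texts_cache_demo")

// Cache it again, then drop every cached table at once.
sqlContext.cacheTable("texts_cache_demo")
sqlContext.clearCache()
val cachedAfterClear = sqlContext.isCached("texts_cache_demo")

// Clean up the temporary table.
sqlContext.dropTempTable("texts_cache_demo")
```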
The implicits object is a helper class with methods to convert objects into Datasets and DataFrames, and also comes with many Encoders for "primitive" types as well as the collections thereof.
Import the implicits as follows:
val spark = new SQLContext(sc)
import spark.implicits._
It holds Encoders for Scala "primitive" types like Int, Double, String, and their collections.
It offers support for creating a Dataset from an RDD of any type (for which an encoder exists in scope), from case classes or tuples, and from a Seq.
It also offers conversions from Scala’s Symbol or $ to Column.
It also offers conversions from an RDD or a Seq of Product types (e.g. case classes or tuples) to DataFrame. It has direct conversions from an RDD of Int, Long and String to a DataFrame with a single column named _1.
It is not possible to call toDF methods on RDD objects of other "primitive" types except Int, Long, and String.
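The conversions listed above can be sketched as follows, assuming a local master; the sample data and column names are illustrative. Note that the import needs a stable identifier, hence the val.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: a local master and an illustrative application name.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-implicits-sketch"))
val sqlContext = SQLContext.getOrCreate(sc)

// Bring the conversions and encoders into scope.
import sqlContext.implicits._

// Seq of tuples (Product types) to DataFrame with explicit column names.
val df = Seq(("hello", 0), ("world!", 1)).toDF("text", "id")

// RDD of tuples to DataFrame.
val fromRdd = sc.parallelize(Seq(("a", 1L), ("b", 2L))).toDF("key", "value")

// $"..." and Scala Symbols are converted to Column.
val byDollar = df.select($"text")
val bySymbol = df.select('id)
```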
createDataset[T: Encoder](data: Seq[T]): Dataset[T]
createDataset[T: Encoder](data: RDD[T]): Dataset[T]
The createDataset family of methods creates a Dataset from a collection of elements of type T, be it a regular Scala Seq or Spark’s RDD.
It requires that there is an encoder in scope.
Importing SQLContext.implicits brings many encoders into scope.
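A small sketch of both createDataset variants, with the encoders coming from the implicits import. The local master and the sample values are assumptions.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: a local master and an illustrative application name.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-dataset-sketch"))
val sqlContext = SQLContext.getOrCreate(sc)

// Encoders for Int and String come from here.
import sqlContext.implicits._

// From a regular Scala Seq.
val numbers = sqlContext.createDataset(Seq(1, 2, 3))

// From an RDD.
val words = sqlContext.createDataset(sc.parallelize(Seq("hello", "world!")))

// Bring the elements back to the driver as typed arrays.
val collectedNumbers = numbers.collect()
val collectedWords = words.collect()
```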
read: DataFrameReader
The experimental read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame.
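As an illustration, the sketch below writes two JSON records to a temporary file and loads them through read. The file prefix, the records, and the local master are all made up for this example.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: a local master and an illustrative application name.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-read-sketch"))
val sqlContext = SQLContext.getOrCreate(sc)

// Hypothetical input: one JSON object per line in a temporary file.
val path = Files.createTempFile("people", ".json")
val lines = Seq("""{"name":"Ann","age":30}""", """{"name":"Bob","age":25}""")
Files.write(path, lines.mkString("\n").getBytes(StandardCharsets.UTF_8))

// Pick the data source format explicitly and load the file into a DataFrame.
val people = sqlContext.read.format("json").load(path.toString)
val rowCount = people.count()
```

Naming the format explicitly keeps the example independent of the spark.sql.sources.default setting.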
createExternalTable(tableName: String, path: String): DataFrame
createExternalTable(tableName: String, path: String, source: String): DataFrame
createExternalTable(tableName: String, source: String, options: Map[String, String]): DataFrame
createExternalTable(tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame
The experimental createExternalTable family of methods is used to create an external table tableName and return a corresponding DataFrame.
FIXME What is an external table?
It assumes parquet as the default data source format that you can change using the spark.sql.sources.default setting.
dropTempTable(tableName: String): Unit
The dropTempTable method drops a temporary table tableName.
FIXME What is a temporary table?
range(end: Long): Dataset[Long]
range(start: Long, end: Long): Dataset[Long]
range(start: Long, end: Long, step: Long): Dataset[Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[Long]
The range family of methods creates a Dataset[Long] with the sole id column of LongType for given start, end, and step.
The first three variants use SparkContext.defaultParallelism for the number of partitions numPartitions.
scala> spark.range(5)
res0: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> .show
+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+
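The four-argument variant, with an explicit step and number of partitions, could look as follows. This is a sketch assuming a local master; toDF() is used so that the code does not depend on the exact return type of range in a given Spark version.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: a local master and an illustrative application name.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-range-sketch"))
val sqlContext = SQLContext.getOrCreate(sc)

// Even numbers from 0 (inclusive) to 10 (exclusive), spread over 3 partitions.
val evens = sqlContext.range(0L, 10L, 2L, 3)

// Read the single id column back as Scala Longs.
val values = evens.toDF().collect().map(_.getLong(0)).toSeq

// Number of partitions backing the result.
val numPartitions = evens.rdd.partitions.length
```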
table(tableName: String): DataFrame
The table method returns the tableName table as a DataFrame.
tables(): DataFrame
tables(databaseName: String): DataFrame
The tables methods return a DataFrame that holds names of existing tables in a database.
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|        t|       true|
|       t2|       true|
+---------+-----------+
The schema consists of two columns - tableName of StringType and isTemporary of BooleanType.
tables is a result of SHOW TABLES [IN databaseName].
tableNames(): Array[String]
tableNames(databaseName: String): Array[String]
The tableNames methods are similar to tables with the only difference that they return Array[String], which is a collection of table names.
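The table-related methods can be combined as in this sketch. The table name nums_demo is made up, and the local master is an assumption.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: a local master and an illustrative application name.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-tables-sketch"))
val sqlContext = SQLContext.getOrCreate(sc)

// Register a temporary table under a hypothetical name.
sqlContext.range(3L).toDF().registerTempTable("nums_demo")

// Table listing as a DataFrame and as a plain array of names.
val listing = sqlContext.tables()
val namesBefore = sqlContext.tableNames()

// Look the table up by name and count its rows.
val nums = sqlContext.table("nums_demo")
val rowCount = nums.count()

// Drop the temporary table and list the names again.
sqlContext.dropTempTable("nums_demo")
val namesAfter = sqlContext.tableNames()
```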
streams: StreamingQueryManager
The streams method returns a StreamingQueryManager that is used to…TK
SQLContext.getOrCreate(sparkContext: SparkContext): SQLContext
The SQLContext.getOrCreate method returns an active SQLContext object for the JVM or creates a new one using a given sparkContext.
It is a factory-like method that works on the SQLContext class.
Interestingly, there are two helper methods to set and clear the active SQLContext object - setActive and clearActive.
setActive(spark: SQLContext): Unit
clearActive(): Unit
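How the three methods interact can be sketched as below, assuming a local master. The comments describe the behaviour this sketch expects; it has not been checked against every Spark version.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: a local master and an illustrative application name.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-active-sketch"))

// Two calls are expected to hand back the very same object.
val first = SQLContext.getOrCreate(sc)
val second = SQLContext.getOrCreate(sc)
val reused = first eq second

// Make another session the active one; getOrCreate is expected to prefer it.
val other = first.newSession()
SQLContext.setActive(other)
val active = SQLContext.getOrCreate(sc)
val activeIsOther = active eq other

// Clear the active SQLContext again.
SQLContext.clearActive()
```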
sql(sqlText: String): DataFrame
sql executes the sqlText SQL query.
It supports Hive statements through HiveContext.
scala> sql("set spark.sql.hive.version").show(false)
16/04/10 15:19:36 INFO HiveSqlParser: Parsing command: set spark.sql.hive.version
|key |value|
scala> sql("describe database extended default").show(false)
16/04/10 15:21:14 INFO HiveSqlParser: Parsing command: describe database extended default
|Database Name |default |
|Description |Default Hive database |
|Location |file:/user/hive/warehouse |
|Properties | |
// Create temporary table
scala> spark.range(10).registerTempTable("t")
16/04/14 23:34:31 INFO HiveSqlParser: Parsing command: t
scala> sql("CREATE temporary table t2 USING PARQUET OPTIONS (PATH 'hello') AS SELECT * FROM t")
16/04/14 23:34:38 INFO HiveSqlParser: Parsing command: CREATE temporary table t2 USING PARQUET OPTIONS (PATH 'hello') AS SELECT * FROM t
| t| true|
| t2| true|
sql parses sqlText using a dialect that can be set up using the spark.sql.dialect setting.
You may also use the spark-sql shell script to interact with Hive.
Internally, it uses the SessionState.sqlParser.parsePlan(sql) method to create a LogicalPlan.
FIXME Review
scala> sql("show tables").show(false)
16/04/09 13:05:32 INFO HiveSqlParser: Parsing command: show tables
|dafa |false |
Enable INFO logging level for the HiveSqlParser logger to see what happens inside sql (as in the INFO lines of the examples above).
Refer to Logging.
newSession(): SQLContext
You can use the newSession method to create a new session without the cost of instantiating a new SQLContext from scratch.
newSession returns a new SQLContext that shares SparkContext, CacheManager, SQLListener, and ExternalCatalog.
FIXME Why would I need that?
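One possible use is keeping SQL configuration separate between sessions while still sharing a single SparkContext. The sketch below assumes a local master; the property values are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: a local master and an illustrative application name.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-session-sketch"))
val base = SQLContext.getOrCreate(sc)

// A second session on top of the same SparkContext.
val session = base.newSession()
val sharesSparkContext = session.sparkContext eq base.sparkContext

// Each session keeps its own value for the same property.
base.setConf("spark.sql.shuffle.partitions", "8")
session.setConf("spark.sql.shuffle.partitions", "2")
val basePartitions = base.getConf("spark.sql.shuffle.partitions")
val sessionPartitions = session.getConf("spark.sql.shuffle.partitions")
```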