scala - How can I get the current SparkSession in any place of the codes? - TagMerge

How can I get the current SparkSession in any place of the codes?

Asked 1 year ago
6 answers

I have created a session in the main() function, like this:

val sparkSession = SparkSession.builder.master("local[*]").appName("Simple Application").getOrCreate()

Now if I want to configure the application or access the properties, I can use the local variable sparkSession in the same function. What if I want to access this sparkSession elsewhere in the same project, like project/module/.../.../xxx.scala? What should I do?

6

Once a session has been created (anywhere), you can safely use:

SparkSession.builder.getOrCreate()

to get the (same) session anywhere in the code, as long as the session is still alive. Spark maintains a single active session, so unless it was stopped or crashed, you'll get the same one.

Edit: as mentioned in the comments, builder is not callable in PySpark (there it is an attribute rather than a method, so it must be used without parentheses).
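
For example, here is a minimal sketch of that pattern (the Helper object and its method are hypothetical names, not from the original answer):

import org.apache.spark.sql.SparkSession

object Main {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("Simple Application")
      .getOrCreate()

    Helper.countRows() // no session is passed around
    spark.stop()
  }
}

// Hypothetical helper defined elsewhere in the project
object Helper {
  def countRows(): Unit = {
    // Returns the already-created active session instead of building a new one
    val spark = SparkSession.builder.getOrCreate()
    println(spark.range(10).count())
  }
}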

Source: link

4

Since 2.2.0 you can access the active SparkSession through:

/**
 * Returns the active SparkSession for the current thread, returned by the builder.
 *
 * @since 2.2.0
 */
def getActiveSession: Option[SparkSession] = Option(activeThreadSession.get)

or default SparkSession:

/**
 * Returns the default SparkSession that is returned by the builder.
 *
 * @since 2.2.0
 */
def getDefaultSession: Option[SparkSession] = Option(defaultSession.get)
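
Both return an Option[SparkSession], so a common pattern (a hypothetical helper, not part of the Spark API) is to try the active session first and fall back to the default one:

import org.apache.spark.sql.SparkSession

def currentSession: SparkSession =
  SparkSession.getActiveSession
    .orElse(SparkSession.getDefaultSession)
    .getOrElse(sys.error("No SparkSession available"))

Since 2.4.0, SparkSession.active does essentially this lookup for you, throwing an exception if no session exists.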

Source: link

3

When the SparkSession variable has been defined as

val sparkSession = SparkSession.builder.master("local[*]").appName("Simple Application").getOrCreate()

this variable will point/refer to only one SparkSession, since it is a val. You can also pass it to different classes so they can access it:

val newClassCall = new NewClass(sparkSession)

Now you can use the same sparkSession in that new class as well.
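
As a sketch of what NewClass might look like (its body here is an illustrative assumption):

import org.apache.spark.sql.SparkSession

class NewClass(val spark: SparkSession) {
  // Bring in implicits such as .toDF from the injected session
  import spark.implicits._

  def showNames(): Unit = {
    val df = Seq("alice", "bob").toDF("name")
    df.show()
  }
}

val newClassCall = new NewClass(sparkSession)
newClassCall.showNames()

Passing the session explicitly like this keeps the dependency visible and makes the class easy to test against a local session.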

Source: link


