What is the importance of Resilient Distributed Datasets in Spark?
Resilient Distributed Datasets (RDDs) are the fundamental data structure of Apache Spark.
Part of Spark Core, RDDs are immutable, fault-tolerant, distributed collections of
objects that can be operated on in parallel. RDDs are divided into partitions, which can
be processed on different nodes of a cluster.
RDDs are created either by transforming existing RDDs or by loading an external dataset
from stable storage.
[Figure: architecture of an RDD]
What is lazy evaluation in Spark?
When Spark operates on a dataset, it records the instructions instead of running them
right away. If a transformation such as map() is called on an RDD, the operation is not
performed immediately. Transformations are not evaluated until you execute an action.
This behavior is known as lazy evaluation, and it helps Spark optimize the overall data
processing workflow.
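The deferral described above can be illustrated without a cluster. The following is a minimal sketch in plain Python, not Spark's API: LazyDataset is a hypothetical toy class whose map and filter only record steps, while collect plays the role of an action and runs them.

```python
# Toy model of lazy evaluation. This is NOT Spark's API: LazyDataset is a
# hypothetical class that only mimics how transformations are deferred.
class LazyDataset:
    def __init__(self, data, steps=()):
        self._data = list(data)
        self._steps = tuple(steps)   # recorded transformations, not yet run

    def map(self, fn):
        # Transformation: record the step and return a new dataset.
        return LazyDataset(self._data, self._steps + (("map", fn),))

    def filter(self, pred):
        # Transformation: also only recorded.
        return LazyDataset(self._data, self._steps + (("filter", pred),))

    def collect(self):
        # Action: only now are the recorded steps applied, in order.
        items = iter(self._data)
        for kind, fn in self._steps:
            items = map(fn, items) if kind == "map" else filter(fn, items)
        return list(items)


calls = []

def double(x):
    calls.append(x)          # lets us observe when the function really runs
    return x * 2

ds = LazyDataset([1, 2, 3, 4]).map(double).filter(lambda x: x > 4)
calls_before_action = len(calls)   # still 0: nothing has executed yet
result = ds.collect()              # the action triggers the whole pipeline
print(calls_before_action, result)
```

In this toy model double is not called until collect() runs, which mirrors how a map() on an RDD does no work until an action is executed.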
How do we monitor Apache Spark with Prometheus?
There are a few ways to monitor Apache Spark with Prometheus.
One of them is JmxSink + JMX-exporter.
Preparations
- Uncomment *.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink in spark/conf/metrics.properties
- Download jmx-exporter by following the link on prometheus/jmx_exporter
- Download the example Prometheus config file
Use it in spark-shell or spark-submit
In the following command, jmx_prometheus_javaagent-0.3.1.jar and spark.yml are the files
downloaded in the previous steps. The file names and paths might need to be changed
accordingly.
bin/spark-shell --conf "spark.driver.extraJavaOptions=-javaagent:jmx_prometheus_javaagent-0.3.1.jar=8080:spark.yml"
Access it
Once it is running, the metrics are available at localhost:8080/metrics.
Next
Prometheus can then be configured to scrape the metrics from JMX-exporter.
NOTE: The discovery part has to be handled properly if Spark is running in a cluster
environment.
How can you trigger automatic clean-ups in Spark to handle accumulated
metadata?
To trigger the clean-ups, you have to set the parameter spark.cleaner.ttl.
What are the advantages of Spark over MapReduce?
Spark has the following advantages over MapReduce:
- Thanks to in-memory processing, Spark can run workloads roughly 10 to 100 times faster
than Hadoop MapReduce, which uses persistent storage for all of its data processing jobs.
- Unlike Hadoop MapReduce, Spark provides built-in libraries for running different kinds
of tasks on the same core engine, such as batch processing, streaming, machine learning,
and interactive SQL queries. Hadoop MapReduce, in contrast, supports only batch processing.
- Hadoop is heavily disk-dependent, while Spark encourages caching and in-memory
data storage.
- Spark can perform computations multiple times on the same dataset, which is called
iterative computation. Hadoop MapReduce has no built-in support for iterative computing.
How to create an RDD that can represent a matrix in Apache Spark? How to multiply
two such RDDs?
It all depends on the input data and dimensions, but generally speaking, what we want
is not an RDD but one of the distributed data structures from
org.apache.spark.mllib.linalg.distributed. At this moment it provides four different
implementations of the DistributedMatrix:
- IndexedRowMatrix - can be created directly from an RDD[IndexedRow], where IndexedRow
consists of a row index and an org.apache.spark.mllib.linalg.Vector.
import org.apache.spark.mllib.linalg.{Vectors, Matrices}
import org.apache.spark.mllib.linalg.distributed.{IndexedRowMatrix, IndexedRow}
val rows = sc.parallelize(Seq(
  (0L, Array(1.0, 0.0, 0.0)),
  (0L, Array(0.0, 1.0, 0.0)),
  (0L, Array(0.0, 0.0, 1.0)))
).map{case (i, xs) => IndexedRow(i, Vectors.dense(xs))}
val indexedRowMatrix = new IndexedRowMatrix(rows)
- RowMatrix - similar to IndexedRowMatrix but without meaningful row indices. Can be
created directly from an RDD[org.apache.spark.mllib.linalg.Vector].
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val rowMatrix = new RowMatrix(rows.map(_.vector))
- BlockMatrix - can be created from an RDD[((Int, Int), Matrix)], where the first element
of the tuple contains the coordinates of the block and the second one is a local
org.apache.spark.mllib.linalg.Matrix.
import org.apache.spark.mllib.linalg.distributed.BlockMatrix
val eye = Matrices.sparse(
  3, 3, Array(0, 1, 2, 3), Array(0, 1, 2), Array(1, 1, 1))
val blocks = sc.parallelize(Seq(
  ((0, 0), eye), ((1, 1), eye), ((2, 2), eye)))
val blockMatrix = new BlockMatrix(blocks, 3, 3, 9, 9)
- CoordinateMatrix - can be created from an RDD[MatrixEntry], where a MatrixEntry consists
of a row, a column and a value.
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
val entries = sc.parallelize(Seq(
  (0, 0, 3.0), (2, 0, -5.0), (3, 2, 1.0),
  (4, 1, 6.0), (6, 2, 2.0), (8, 1, 4.0))
).map{case (i, j, v) => MatrixEntry(i, j, v)}
val coordinateMatrix = new CoordinateMatrix(entries, 9, 3)
The first two implementations support multiplication by a local Matrix:
val localMatrix = Matrices.dense(3, 2, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
indexedRowMatrix.multiply(localMatrix).rows.collect
// Array(IndexedRow(0,[1.0,4.0]), IndexedRow(0,[2.0,5.0]),
// IndexedRow(0,[3.0,6.0]))
The third one can be multiplied by another BlockMatrix as long as the number of
columns per block in this matrix matches the number of rows per block of the other
matrix. CoordinateMatrix doesn't support multiplication, but it is easy to
create and to convert to other types of distributed matrices:
blockMatrix.multiply(coordinateMatrix.toBlockMatrix(3, 3))
Each type has its own strengths and weaknesses, and there are some additional factors
to consider when you use sparse or dense elements (Vectors or block Matrices).
Multiplying by a local matrix is usually preferable since it doesn't require
expensive shuffling.
You can find more details about each type in the MLlib Data Types guide.
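The result of the local-matrix multiplication shown earlier can be checked by hand. Matrices.dense stores its values in column-major order, so the six values with 3 rows and 2 columns form the rows [1, 4], [2, 5], [3, 6], and multiplying the identity rows by that matrix returns those rows unchanged. A minimal plain-Python sketch of that check follows. It does not use Spark, and the helper names dense_column_major and multiply_row are hypothetical.

```python
def dense_column_major(num_rows, num_cols, values):
    """Rebuild a row-by-row view of a matrix given in column-major order."""
    return [[values[c * num_rows + r] for c in range(num_cols)]
            for r in range(num_rows)]

def multiply_row(row, matrix):
    """Multiply one row vector by a matrix given as a list of rows."""
    num_cols = len(matrix[0])
    return [sum(row[k] * matrix[k][c] for k in range(len(row)))
            for c in range(num_cols)]

# Same values as the local matrix in the Scala snippet: 3 rows, 2 columns.
local = dense_column_major(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0])

# The three rows used to build indexedRowMatrix form an identity matrix.
identity_rows = [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]]
product = [multiply_row(row, local) for row in identity_rows]
print(product)
```

Each distributed row is multiplied by the local matrix independently, which is why this kind of multiplication needs no shuffle between rows.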
What is YARN?
As in Hadoop, YARN support is one of the key features of Spark. YARN provides a central
resource management platform for running scalable operations across the cluster.
YARN is a distributed container manager, whereas Spark is a data processing engine.
Spark can run on YARN the same way Hadoop MapReduce can run on YARN.
Running Spark on YARN requires a binary distribution of Spark that is built with YARN
support.
What is the distinction between an RDD and a DataFrame in Apache Spark? Can you
transform one to the other?
A DataFrame is a table, or a 2-D array-like structure, in which every column holds
measurements of one variable and every row represents one case.
Because of this tabular model, a DataFrame carries additional metadata, which enables
Spark to run optimizations on the finalized query.
An RDD is a Resilient Distributed Dataset: a black box of data that Spark cannot
optimize, because the operations that can be performed on it are not constrained.
However, you can go from a DataFrame to an RDD via its rdd method, and you can go
from an RDD to a DataFrame (if the RDD is in a tabular format) via the toDF method.
In general, it is recommended to use a DataFrame where possible because of the
built-in query optimization.
How do we concatenate two columns in an Apache Spark DataFrame? Is there any
function in Spark SQL which we can utilize?
With raw SQL you can use CONCAT:
In Python:
df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v"))
df.registerTempTable("df")
sqlContext.sql("SELECT CONCAT(k, ' ', v) FROM df")
In Scala:
import sqlContext.implicits._
val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v")
df.registerTempTable("df")
sqlContext.sql("SELECT CONCAT(k, ' ', v) FROM df")
Since Spark 1.5.0 you can use the concat function with the DataFrame API:
In Python:
from pyspark.sql.functions import concat, col, lit
df.select(concat(col("k"), lit(" "), col("v")))
In Scala:
import org.apache.spark.sql.functions.{concat, lit}
df.select(concat($"k", lit(" "), $"v"))
There is also the concat_ws function, which takes a string separator as the first
argument.
How would you compute the total count of unique words in Spark?
- Load the text file as an RDD:
lines = sc.textFile("hdfs://Hadoop/user/test_file.txt")
- Define a function that breaks each line into words:
def toWords(line):
    return line.split()
- Run the toWords function on each element of the RDD as a flatMap transformation:
words = lines.flatMap(toWords)
- Convert each word into a (key, value) pair:
def toTuple(word):
    return (word, 1)
wordTuple = words.map(toTuple)
- Perform the reduceByKey() transformation to sum the counts per word:
def add(x, y):
    return x + y
counts = wordTuple.reduceByKey(add)
- Print the per-word counts and the number of unique words:
print(counts.collect())
print(counts.count())
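The logic of these steps (split lines into words, pair each word with 1, sum per key) can be checked locally before running it on a cluster. The following is a plain-Python sketch of the same pipeline, not PySpark code. The word_counts helper and the sample lines are made up for illustration.

```python
from collections import defaultdict

def to_words(line):
    return line.split()

def word_counts(lines):
    # flatMap step: one flat list of words from all lines
    words = [w for line in lines for w in to_words(line)]
    # map step: turn each word into a (word, 1) pair
    pairs = [(w, 1) for w in words]
    # reduceByKey step: sum the values that share the same key
    counts = defaultdict(int)
    for word, n in pairs:
        counts[word] += n
    return dict(counts)

sample = ["to be or not to be", "that is the question"]
counts = word_counts(sample)
unique_words = len(counts)   # number of distinct words
print(counts, unique_words)
```

The number of unique words is the number of keys left after the per-key reduction, which is why counting the reduced pairs answers the question.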
What is the role of accumulators in Spark?
Accumulators are variables used to aggregate data across the executors. The aggregated
data can be diagnostics about the input or about API usage, such as how many records are
corrupted or how many times a library API was called.
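The corrupted-record case can be sketched as a toy model. This is plain Python rather than Spark's accumulator API: process_partition is a hypothetical stand-in for a task running on an executor, and the loop at the bottom stands in for the driver merging the partial counts. In Spark itself, tasks can only add to an accumulator and only the driver can read its value.

```python
# Toy model only: plain Python, not Spark's accumulator API.
def process_partition(records):
    """Simulate one task: parse records and count the corrupted ones locally."""
    corrupted = 0
    parsed = []
    for rec in records:
        try:
            parsed.append(int(rec))
        except ValueError:
            corrupted += 1      # the task only ever adds to its local counter
    return parsed, corrupted

# Made-up sample data split into three "partitions".
partitions = [["1", "2", "x"], ["4", "oops", "??"], ["7"]]

total_corrupted = 0              # plays the role of the driver-side value
all_parsed = []
for part in partitions:
    parsed, corrupted = process_partition(part)
    all_parsed.extend(parsed)
    total_corrupted += corrupted  # partial counts are merged on the driver
print(all_parsed, total_corrupted)
```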