Spark ML numerical types and basic statistical algorithms for data aggregation, explained in detail (Spark commercial ML in practice)

1 Spark ML architecture

Spark ML provides algorithms for classification, regression, clustering, collaborative filtering, dimensionality reduction, and more. These algorithms can be chained together into a machine learning Pipeline.

Spark builds a complete machine learning library around this Pipeline abstraction, on top of the high-level DataFrame API.

2 Feature vectorization

Raw features must be converted into feature vectors before they can be used to train a machine learning model. The common kinds of features are:

  • Numerical features: features whose values are already numbers.
  • Categorical features: features that take one of a finite, enumerable set of values. They cannot be used directly; each category is usually assigned a number, which turns the feature into a numerical one.
  • Text features: features extracted from text, such as movie reviews. Text cannot be used directly either; it must first be split into words (tokenized) and then encoded.
  • Statistical features: higher-level features derived from the raw data with statistical methods. Common ones are the mean, median, sum, maximum, and minimum.
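
The numbering step for categorical features can be sketched in plain Scala (no Spark required). The helpers buildIndex and encode are hypothetical, not Spark APIs. Giving the most frequent category index 0 is an assumption of this sketch, similar in spirit to the default ordering of Spark ML's StringIndexer; ties are broken alphabetically here.

```scala
// Plain-Scala sketch: number the categories of a categorical feature.
// buildIndex and encode are hypothetical helpers, not Spark APIs.
object CategoryIndexing {
  // Most frequent category -> 0, next -> 1, ...; ties broken alphabetically.
  def buildIndex(values: Seq[String]): Map[String, Int] =
    values
      .groupBy(identity)                                    // category -> all its occurrences
      .toSeq
      .sortBy { case (cat, occ) => (-occ.size, cat) }       // descending frequency
      .map { case (cat, _) => cat }
      .zipWithIndex
      .toMap

  // Replace every category by its index, giving a numerical (Double) feature.
  def encode(values: Seq[String]): Seq[Double] = {
    val index = buildIndex(values)
    values.map(v => index(v).toDouble)
  }
}
```

For the column red, blue, red, green, red, blue this yields red = 0, blue = 1, green = 2.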

3 Data types of Spark ML

3.1 Local Vector

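A local vector is stored on a single machine and has zero-based integer indices and double values. MLlib supports dense and sparse local vectors. The code below creates the vector (1.0, 0.0, 3.0) in three ways:

    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    // Dense vector: every value is stored
    val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
    // Sparse vector: size 3, nonzero indices (0, 2), values (1.0, 3.0)
    val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
    // Sparse vector built from (index, value) pairs: (0, 1.0) and (2, 3.0)
    val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))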
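
Both sparse constructors describe the same dense array: a size plus the positions and values of the nonzero entries. The plain-Scala sketch below (no Spark required; toDense and fromPairs are hypothetical helpers, not Spark APIs) expands that encoding back into a dense array.

```scala
// Plain-Scala sketch of what a sparse vector encodes: size + nonzero indices + values.
// toDense and fromPairs are hypothetical helpers, not Spark APIs.
object SparseVectorSketch {
  def toDense(size: Int, indices: Array[Int], values: Array[Double]): Array[Double] = {
    require(indices.length == values.length, "indices and values must have the same length")
    val dense = Array.fill(size)(0.0)                       // every slot not listed stays 0.0
    indices.zip(values).foreach { case (i, v) => dense(i) = v }
    dense
  }

  // Same vector from (index, value) pairs, like the Seq-based constructor above.
  def fromPairs(size: Int, pairs: Seq[(Int, Double)]): Array[Double] = {
    val sorted = pairs.sortBy(_._1)                         // indices in ascending order
    toDense(size, sorted.map(_._1).toArray, sorted.map(_._2).toArray)
  }
}
```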

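3.2 LabeledPoint

A LabeledPoint pairs a local vector (dense or sparse) with a Double label and is used in supervised learning. For binary classification, the label is 0.0 (negative) or 1.0 (positive).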

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint
    val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
    val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

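Training data is often stored in the LIBSVM text format, where each line has the form "label index1:value1 index2:value2 ...". Indices are one-based and in ascending order, and omitted indices have the value 0. For example, the line

    -15 1:0.708 3:-0.3333

has the label -15, feature 1 = 0.708, feature 2 = 0 and feature 3 = -0.3333. MLUtils.loadLibSVMFile reads such a file into an RDD[LabeledPoint] and converts the indices to zero-based ones:
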
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.util.MLUtils
    import org.apache.spark.rdd.RDD
    val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
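
To make the format concrete, here is a plain-Scala sketch (no Spark required) of parsing a single LIBSVM line into a label plus zero-based sparse indices and values. parseLine and ParsedPoint are hypothetical names; MLUtils.loadLibSVMFile does this for whole files and also works out the vector size.

```scala
// Plain-Scala sketch: parse one LIBSVM line "label index:value ..." (one-based indices).
// parseLine and ParsedPoint are hypothetical, not Spark APIs.
object LibSvmSketch {
  final case class ParsedPoint(label: Double, indices: Array[Int], values: Array[Double])

  def parseLine(line: String): ParsedPoint = {
    val tokens = line.trim.split("\\s+")
    val label = tokens.head.toDouble
    val (indices, values) = tokens.tail.map { tok =>
      val parts = tok.split(":")
      (parts(0).toInt - 1, parts(1).toDouble)   // one-based -> zero-based index
    }.unzip
    ParsedPoint(label, indices, values)
  }
}
```

Parsing "-15 1:0.708 3:-0.3333" gives label -15.0, indices (0, 2) and values (0.708, -0.3333); index 1 (feature 2) is simply absent, meaning 0.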

3.3 Local matrix

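A local matrix is stored on a single machine. Dense matrix values are stored in column-major order, and sparse matrices use the Compressed Sparse Column (CSC) format:

    import org.apache.spark.mllib.linalg.{Matrix, Matrices}
    // 3 x 2 dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)); values are listed column by column
    val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
    // 4 x 4 sparse matrix in CSC format: column pointers, row indices, nonzero values
    val sm: Matrix = Matrices.sparse(4, 4, Array(0, 1, 5, 6, 7), Array(0, 0, 1, 2, 3, 3, 3),
      Array(9.0, 6.0, 8.0, 5.0, 1.0, 7.0, 10.0))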
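
The two storage layouts can be checked with a plain-Scala sketch (no Spark required; denseAt and cscToRows are hypothetical helpers). In column-major order, element (i, j) sits at position i + j * numRows; in CSC, the nonzeros of column j occupy positions colPtrs(j) until colPtrs(j + 1) of rowIndices and values.

```scala
// Plain-Scala sketch of the dense (column-major) and sparse (CSC) layouts.
// denseAt and cscToRows are hypothetical helpers, not Spark APIs.
object MatrixLayoutSketch {
  // Column-major: element (i, j) is stored at values(i + j * numRows).
  def denseAt(numRows: Int, values: Array[Double], i: Int, j: Int): Double =
    values(i + j * numRows)

  // CSC -> array of rows: column j's nonzeros are at positions colPtrs(j) until colPtrs(j + 1).
  def cscToRows(numRows: Int, numCols: Int, colPtrs: Array[Int],
                rowIndices: Array[Int], values: Array[Double]): Array[Array[Double]] = {
    val rows = Array.fill(numRows, numCols)(0.0)
    for (j <- 0 until numCols; k <- colPtrs(j) until colPtrs(j + 1))
      rows(rowIndices(k))(j) = values(k)
    rows
  }
}
```

Expanding sm this way gives the rows (9, 6, 0, 0), (0, 8, 0, 0), (0, 5, 0, 0) and (0, 1, 7, 10).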

4 Distributed matrix

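A distributed matrix stores its entries in one or more RDDs, with Long row and column indices and Double values. MLlib provides four kinds: RowMatrix, IndexedRowMatrix, CoordinateMatrix and BlockMatrix. The examples in this section use the following imports:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import org.apache.spark.mllib.linalg.distributed.RowMatrix
    import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}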

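4.1 Row-oriented distributed matrix (RowMatrix)

A RowMatrix is a row-oriented distributed matrix without meaningful row indices: each row is a local vector, and the rows are stored in an RDD[Vector].

    val v0 = Vectors.dense(1.0, 0.0, 3.0)
    val v1 = Vectors.dense(3.0, 2.0, 4.0)

    // An RDD of local vectors, one per row
    val rows: RDD[Vector] = sc.parallelize(Seq(v0, v1))
    // Create a RowMatrix from an RDD[Vector]
    val mat: RowMatrix = new RowMatrix(rows)
    // Matrix dimensions: m = 2 rows, n = 3 columns
    val m = mat.numRows()
    val n = mat.numCols()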

computeColumnSummaryStatistics() computes statistics for each column of the matrix:

    scala> val summary = mat.computeColumnSummaryStatistics()
    summary: org.apache.spark.mllib.stat.MultivariateStatisticalSummary = org.apache.spark.mllib.stat.MultivariateOnlineSummarizer@4ecc4b43

    scala> summary.max
    res5: org.apache.spark.mllib.linalg.Vector = [3.0,2.0,4.0]

    scala> summary.count
    res1: Long = 2

    // Sample variance of each column (divides by n - 1)
    scala> summary.variance
    res2: org.apache.spark.mllib.linalg.Vector = [2.0,2.0,0.5]

    scala> summary.mean
    res3: org.apache.spark.mllib.linalg.Vector = [2.0,1.0,3.5]

    // Sum of the absolute values in each column
    scala> summary.normL1
    res4: org.apache.spark.mllib.linalg.Vector = [4.0,2.0,7.0]
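
These numbers can be reproduced by hand. The plain-Scala sketch below (no Spark required; summarize and Summary are hypothetical names) computes the same per-column statistics for the rows (1, 0, 3) and (3, 2, 4). It uses the unbiased sample variance, dividing by n - 1, which is what the variance output above corresponds to.

```scala
// Plain-Scala sketch of column summary statistics; not Spark's implementation.
object ColumnStatsSketch {
  final case class Summary(count: Long, mean: Seq[Double], variance: Seq[Double],
                           max: Seq[Double], normL1: Seq[Double])

  def summarize(rows: Seq[Array[Double]]): Summary = {
    val n = rows.size
    require(n > 1, "need at least two rows for a sample variance")
    val cols = rows.head.indices.map(j => rows.map(_(j)))   // transpose rows into columns
    val means = cols.map(_.sum / n)
    val variances = cols.zip(means).map { case (c, m) =>
      c.map(x => (x - m) * (x - m)).sum / (n - 1)          // unbiased sample variance
    }
    Summary(n.toLong, means, variances, cols.map(_.max), cols.map(_.map(x => math.abs(x)).sum))
  }
}
```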

4.2 Indexed row matrix (IndexedRowMatrix)

An IndexedRowMatrix is similar to a RowMatrix, but each row has a meaningful row index. The index can be used to identify rows or to perform operations such as joins. The data is stored in an RDD[IndexedRow], where each IndexedRow is a local vector paired with a Long row index.

As with RowMatrix, an IndexedRowMatrix can be created from an RDD[IndexedRow]:

    val dv1 = Vectors.dense(1.0, 0.0, 3.0)
    val dv2 = Vectors.dense(3.0, 2.0, 4.0)

    // Wrap dv1 and dv2 in IndexedRow objects with row indices 1 and 2
    scala> val idxr1 = IndexedRow(1, dv1)
    idxr1: org.apache.spark.mllib.linalg.distributed.IndexedRow = IndexedRow(1,[1.0,0.0,3.0])

    scala> val idxr2 = IndexedRow(2, dv2)
    idxr2: org.apache.spark.mllib.linalg.distributed.IndexedRow = IndexedRow(2,[3.0,2.0,4.0])

    // Turn the IndexedRow objects into an RDD[IndexedRow]
    scala> val idxrows = sc.parallelize(Array(idxr1, idxr2))
    idxrows: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.IndexedRow] = ParallelCollectionRDD[3] at parallelize at <console>:35

    scala> val idxmat: IndexedRowMatrix = new IndexedRowMatrix(idxrows)
    idxmat: org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix = org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix@7951a08c

    // collect() brings the rows to the driver so they print in the shell, not in executor logs
    scala> idxmat.rows.collect().foreach(println)
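
Why the index matters can be sketched in plain Scala (no Spark required; Row, numRows and joinByIndex are hypothetical names). When no row count is passed to its constructor, Spark's IndexedRowMatrix.numRows() likewise takes the largest row index plus one, so idxmat reports 3 rows even though only rows 1 and 2 were given. The join shows how the index pairs a row with data kept elsewhere.

```scala
// Plain-Scala sketch of indexed rows; names are hypothetical, not Spark APIs.
object IndexedRowsSketch {
  // A row is a (row index, values) pair, mirroring IndexedRow(index, vector).
  type Row = (Long, Array[Double])

  // Row count inferred from the largest index: rows 1 and 2 imply rows 0, 1 and 2.
  def numRows(rows: Seq[Row]): Long = rows.map(_._1).max + 1

  // Attach a per-row label stored elsewhere, matching on the row index.
  def joinByIndex(rows: Seq[Row], labels: Map[Long, Double]): Seq[(Long, Array[Double], Double)] =
    rows.collect { case (i, v) if labels.contains(i) => (i, v, labels(i)) }
}
```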


4.3 Coordinate Matrix

A CoordinateMatrix is a distributed matrix backed by an RDD of its entries. Each MatrixEntry is a tuple (i: Long, j: Long, value: Double), where i is the row index, j is the column index, and value is the entry at that position. A CoordinateMatrix is typically used when both dimensions of the matrix are huge and the matrix is very sparse. It can be created from an RDD[MatrixEntry], where each entry is a (rowIndex, colIndex, value) triple:

    scala> import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
    // Two nonzero entries: (row 0, col 1) = 0.5 and (row 2, col 2) = 1.8
    scala> val ent1 = new MatrixEntry(0,1,0.5)
    ent1: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(0,1,0.5)
    scala> val ent2 = new MatrixEntry(2,2,1.8)
    ent2: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(2,2,1.8)
    scala> val entries : RDD[MatrixEntry] = sc.parallelize(Array(ent1,ent2))
    entries: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.MatrixEntry] = ParallelCollectionRDD[4] at parallelize at <console>:36
    scala> val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
    coordMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@8862881
    scala> coordMat.entries.collect().foreach(println)
    // Transposing swaps the row and column index of every entry
    scala> val transMat: CoordinateMatrix = coordMat.transpose()
    transMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@6b945576
    scala> transMat.entries.collect().foreach(println)

    // Convert to an IndexedRowMatrix, grouping the entries by row index
    val indexedRowMatrix = transMat.toIndexedRowMatrix()
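
The coordinate (COO) representation can be sketched in plain Scala (no Spark required; Entry, transpose and byRow are hypothetical names). Transposing only swaps i and j in each entry, and an indexed-row view groups the entries by their row index.

```scala
// Plain-Scala sketch of coordinate (COO) storage; names are hypothetical, not Spark APIs.
object CoordinateSketch {
  final case class Entry(i: Long, j: Long, value: Double)

  // Transpose: swap the row and column index of every entry.
  def transpose(entries: Seq[Entry]): Seq[Entry] =
    entries.map(e => Entry(e.j, e.i, e.value))

  // Group entries by row index: row -> (column, value) pairs sorted by column.
  def byRow(entries: Seq[Entry]): Map[Long, Seq[(Long, Double)]] =
    entries.groupBy(_.i).map { case (i, es) => i -> es.map(e => (e.j, e.value)).sortBy(_._1) }
}
```

For the two entries above, the transpose holds (1, 0, 0.5) and (2, 2, 1.8).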

4.4 Block Matrix

A BlockMatrix is a distributed matrix backed by an RDD of MatrixBlocks. Each MatrixBlock is a tuple ((Int, Int), Matrix), where (Int, Int) is the index of the block and Matrix is the sub-matrix at that position. The block size is set by rowsPerBlock and colsPerBlock, both 1024 by default. A BlockMatrix supports addition and multiplication with another BlockMatrix, and provides a helper method validate() that checks whether the BlockMatrix was set up correctly.

An IndexedRowMatrix or a CoordinateMatrix can be converted to a BlockMatrix by calling toBlockMatrix(). By default this splits the matrix into 1024 x 1024 blocks; to use a different block size, call toBlockMatrix(rowsPerBlock, colsPerBlock).
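
The index arithmetic behind this conversion can be sketched in plain Scala (no Spark required; locate and numBlocks are hypothetical names). Entry (i, j) goes to block (i / rowsPerBlock, j / colsPerBlock), at position (i % rowsPerBlock, j % colsPerBlock) inside that block, and each dimension needs the ceiling of size / blockSize blocks. With 2 x 2 blocks, a 4 x 4 matrix therefore has 2 row blocks and 2 column blocks.

```scala
// Plain-Scala sketch of the block index arithmetic; not Spark's implementation.
object BlockAssignmentSketch {
  // Returns ((blockRow, blockCol), (rowInBlock, colInBlock)) for entry (i, j).
  def locate(i: Long, j: Long, rowsPerBlock: Int, colsPerBlock: Int): ((Int, Int), (Int, Int)) =
    (((i / rowsPerBlock).toInt, (j / colsPerBlock).toInt),
     ((i % rowsPerBlock).toInt, (j % colsPerBlock).toInt))

  // Blocks needed along one dimension: ceiling of size / blockSize.
  def numBlocks(size: Long, blockSize: Int): Int =
    ((size + blockSize - 1) / blockSize).toInt
}
```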

    scala> import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
    scala>  import org.apache.spark.mllib.linalg.distributed.BlockMatrix
    import org.apache.spark.mllib.linalg.distributed.BlockMatrix
    scala> val ent1 = new MatrixEntry(0,0,1)
    ent1: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(0,0,1.0)
    scala> val ent2 = new MatrixEntry(1,1,1)
    ent2: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(1,1,1.0)
    scala> val ent3 = new MatrixEntry(2,0,-1)
    ent3: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(2,0,-1.0)
    scala> val ent4 = new MatrixEntry(2,1,2)
    ent4: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(2,1,2.0)
    scala> val ent5 = new MatrixEntry(2,2,1)
    ent5: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(2,2,1.0)
    scala> val ent6 = new MatrixEntry(3,0,1)
    ent6: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(3,0,1.0)
    scala> val ent7 = new MatrixEntry(3,1,1)
    ent7: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(3,1,1.0)
    scala>  val ent8 = new MatrixEntry(3,3,1)
    ent8: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(3,3,1.0)
    scala>  val entries : RDD[MatrixEntry] = sc.parallelize(Array(ent1,ent2,ent3,ent4,ent5,ent6,ent7,ent8))
    entries: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.MatrixEntry] = ParallelCollectionRDD[10] at parallelize at <console>:51
    scala> val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
    coordMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@7bb93e18
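    // Split into 2 x 2 blocks and cache the result, since it is reused below
    scala> val matA: BlockMatrix = coordMat.toBlockMatrix(2,2).cache()
    matA: org.apache.spark.mllib.linalg.distributed.BlockMatrix = org.apache.spark.mllib.linalg.distributed.BlockMatrix@5dbbfa05
    // Throws an exception if the BlockMatrix is not set up correctly
    scala> matA.validate()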
    scala> matA.toLocalMatrix
    res12: org.apache.spark.mllib.linalg.Matrix =
    1.0   0.0  0.0  0.0
    0.0   1.0  0.0  0.0
    -1.0  2.0  1.0  0.0
    1.0   1.0  0.0  1.0
    scala>  matA.numColBlocks
    res13: Int = 2
    scala> matA.numRowBlocks
    res14: Int = 2
    // Compute A * A
    scala> val aa = matA.multiply(matA)
    aa: org.apache.spark.mllib.linalg.distributed.BlockMatrix = org.apache.spark.mllib.linalg.distributed.BlockMatrix@5722c149
    scala> aa.toLocalMatrix
    res15: org.apache.spark.mllib.linalg.Matrix =
    1.0   0.0  0.0  0.0
    0.0   1.0  0.0  0.0
    -2.0  4.0  1.0  0.0
    2.0   2.0  0.0  1.0 
    // Compute A^T * A: the transpose of A multiplied by A
    scala> val ata = matA.transpose.multiply(matA)
    ata: org.apache.spark.mllib.linalg.distributed.BlockMatrix = org.apache.spark.mllib.linalg.distributed.BlockMatrix@3644e451
    scala> ata.toLocalMatrix
    res1: org.apache.spark.mllib.linalg.Matrix =
    3.0   -1.0  -1.0  1.0
    -1.0  6.0   2.0   1.0
    -1.0  2.0   1.0   0.0
    1.0   1.0   0.0   1.0
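
Both products can be double-checked with a small dense multiply in plain Scala (no Spark required; DenseMatrixSketch is a hypothetical name, and matrices are plain nested sequences rather than Spark types). Entry (i, j) of a product is the dot product of row i of the left factor with column j of the right factor.

```scala
// Plain-Scala sketch: dense matrix transpose and multiply on nested sequences.
object DenseMatrixSketch {
  // Row-major dense matrix (plain Scala collections, not Spark types).
  type M = Seq[Seq[Double]]

  def transpose(a: M): M = a.transpose

  // (A * B)(i, j) = sum over k of A(i, k) * B(k, j)
  def multiply(a: M, b: M): M = {
    require(a.head.size == b.size, "inner dimensions must match")
    val bCols = b.transpose
    a.map(row => bCols.map(col => row.zip(col).map { case (x, y) => x * y }.sum))
  }
}
```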

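5 Basic statistics: summary statistics

Statistics.colStats computes column-wise summary statistics for an RDD[Vector]. It returns a MultivariateStatisticalSummary holding the count, maximum, minimum, mean, variance, number of nonzeros, and L1 and L2 norms of each column. The example below computes them for the numbers 1 to 100, each stored as a one-element vector: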

    scala> import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import org.apache.spark.mllib.linalg.{Vector, Vectors}

    scala> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
    import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}

    scala> val data = sc.parallelize(1 to 100, 2)
    data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:37

    // Wrap each number in a one-element dense vector
    scala> val data1 = data.map(x => Vectors.dense(x.toDouble))
    data1: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[29] at map at <console>:38

    scala> val summary: MultivariateStatisticalSummary = Statistics.colStats(data1)

    scala> summary.count
    res24: Long = 100

    scala> summary.max
    res25: org.apache.spark.mllib.linalg.Vector = [100.0]

    scala> summary.mean
    res26: org.apache.spark.mllib.linalg.Vector = [50.5]

    scala> summary.normL1
    res27: org.apache.spark.mllib.linalg.Vector = [5050.0]

    scala> summary.normL2
    res28: org.apache.spark.mllib.linalg.Vector = [581.6786054171153]

    scala> summary.numNonzeros
    res29: org.apache.spark.mllib.linalg.Vector = [100.0]

    // Sample variance (divides by n - 1)
    scala> summary.variance
    res30: org.apache.spark.mllib.linalg.Vector = [841.6666666666666]
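
Each of these values follows from a simple formula, for example the L2 norm is the square root of the sum of squares (sqrt(338350)) and the sample variance is 83325 / 99. The plain-Scala sketch below (no Spark required; SummaryCheck and stats are hypothetical names) recomputes them for 1 to 100.

```scala
// Plain-Scala sketch reproducing the colStats values for a single column.
object SummaryCheck {
  def stats(xs: Seq[Double]): Map[String, Double] = {
    val n = xs.size
    val mean = xs.sum / n
    Map(
      "count"       -> n.toDouble,
      "max"         -> xs.max,
      "mean"        -> mean,
      "normL1"      -> xs.map(x => math.abs(x)).sum,                       // sum of |x|
      "normL2"      -> math.sqrt(xs.map(x => x * x).sum),                  // sqrt of sum of squares
      "numNonzeros" -> xs.count(_ != 0.0).toDouble,
      "variance"    -> xs.map(x => (x - mean) * (x - mean)).sum / (n - 1)  // sample variance
    )
  }
}
```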

6 Correlation statistics

Take height and weight as an example. We might ask: are taller people heavier? This breaks down into two questions: 1. As height increases, does weight go up or down? 2. How closely do changes in weight follow changes in height? These are the two key aspects of correlation: its direction and its strength. The direction is easy to interpret: the correlation is positive, negative, or absent. The strength can be defined in different ways. The Pearson coefficient measures how close the relationship is to a straight line, while the Spearman coefficient compares the ranks of the values and therefore measures how close the relationship is to a monotonic (always increasing or always decreasing) one. Both range from -1 to 1.

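    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.stat.Statistics

    // Statistics.corr expects RDD[Double], so the values are written as doubles
    val x: RDD[Double] = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0, 5.0))
    val y: RDD[Double] = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0, 5.0))

    // y equals x, so both coefficients are 1 up to floating-point rounding
    scala> val correlation: Double = Statistics.corr(x, y, "pearson")
    correlation: Double = 0.9999999999999998

    scala> val correlation: Double = Statistics.corr(x, y, "spearman")
    correlation: Double = 0.9999999999999998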
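
The difference between the two methods shows up once the relationship is monotonic but not linear. The plain-Scala sketch below (no Spark required; CorrelationSketch and its methods are hypothetical names) computes Pearson as covariance divided by the product of the standard deviations, and Spearman as Pearson applied to the ranks. Averaging the ranks of tied values is a choice made in this sketch.

```scala
// Plain-Scala sketch of Pearson and Spearman correlation; not Spark's implementation.
object CorrelationSketch {
  // Pearson: covariance divided by the product of the standard deviations.
  def pearson(x: Seq[Double], y: Seq[Double]): Double = {
    require(x.size == y.size && x.size > 1, "need two samples of equal size > 1")
    val mx = x.sum / x.size
    val my = y.sum / y.size
    val cov = x.zip(y).map { case (a, b) => (a - mx) * (b - my) }.sum
    val sx = math.sqrt(x.map(a => (a - mx) * (a - mx)).sum)
    val sy = math.sqrt(y.map(b => (b - my) * (b - my)).sum)
    cov / (sx * sy)
  }

  // Ranks 1..n in ascending order; tied values share the average of their ranks.
  def ranks(x: Seq[Double]): Seq[Double] = {
    val sorted = x.zipWithIndex.sortBy(_._1).toIndexedSeq
    val out = Array.fill(x.size)(0.0)
    var start = 0
    while (start < sorted.size) {
      var end = start
      while (end + 1 < sorted.size && sorted(end + 1)._1 == sorted(start)._1) end += 1
      val avgRank = (start + end) / 2.0 + 1.0 // positions start..end hold ranks start+1..end+1
      for (k <- start to end) out(sorted(k)._2) = avgRank
      start = end + 1
    }
    out.toSeq
  }

  // Spearman: Pearson correlation computed on the ranks instead of the raw values.
  def spearman(x: Seq[Double], y: Seq[Double]): Double = pearson(ranks(x), ranks(y))
}
```

With x = 1 to 5 and y = x cubed, the relationship is strictly increasing but curved, so spearman returns 1 (up to rounding) while pearson stays below 1.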


To write this article, I upgraded my big data platform so that it can support the Pipeline and parameter grid topics that follow. It was hard work, so thank you for valuing this article.

Qin Kaixin in Shenzhen 0:15