1 Spark ML architecture
Spark ML covers classification, regression, clustering, collaborative filtering, dimensionality reduction, and more, and these machine learning algorithms can be assembled into a Pipeline.
Built on the high-level DataFrame API, the machine learning pipeline ties Spark's algorithm library together into complete machine learning workflows.
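As a quick illustration, here is a minimal pipeline sketch modeled on the classic text-classification example from the Spark documentation. It assumes a SparkSession named spark, and the training data is made up purely for illustration:

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

// toy training data: id, text, label (made up for illustration)
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// pipeline stages: tokenize the text, hash term frequencies, then fit logistic regression
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol(tokenizer.getOutputCol).setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.001)
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

// fitting the pipeline runs every stage in order and returns a PipelineModel
val model: PipelineModel = pipeline.fit(training)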
2 Feature vectorization
Raw features must be converted into feature vectors before they can be used to train a machine learning model. Common feature types include the following (a small vectorization sketch follows the list):
- Numerical features: values that are already numeric and can be fed to a model directly.
- Categorical features: features that take values from a finite set of categories. They cannot be used directly; the categories are usually numbered first and then treated as numerical features.
- Text features: features extracted from text, such as movie reviews. Text cannot be used directly either; it must first be segmented into words and encoded.
- Statistical features: higher-level features derived from the raw data with statistical methods, such as the average, median, sum, maximum, and minimum.
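To make the conversion concrete, here is a small sketch that turns a categorical column and a numeric column into a single feature vector with StringIndexer and VectorAssembler. The DataFrame and its column names are made up for illustration, and a SparkSession named spark is assumed:

import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}

// toy data: a categorical "color" column, a numeric "size" column, and a label
val df = spark.createDataFrame(Seq(
  ("red", 1.0, 0.0),
  ("blue", 2.0, 1.0),
  ("red", 3.0, 1.0)
)).toDF("color", "size", "label")

// number the categorical feature, i.e. turn each category into a numeric index
val indexer = new StringIndexer().setInputCol("color").setOutputCol("colorIndex")
val indexed = indexer.fit(df).transform(df)

// assemble the indexed categorical feature and the numeric feature into one vector column
val assembler = new VectorAssembler()
  .setInputCols(Array("colorIndex", "size"))
  .setOutputCol("features")
assembler.transform(indexed).select("features", "label").show(false)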
3 Data types of Spark ML
3.1 Local Vector
Create the vector (1.0, 0.0, 3.0) in three different ways:
import org.apache.spark.mllib.linalg.{Vector, Vectors}
// dense vector
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
// sparse vector: size 3, non-zero entries at indices (0, 2) with values (1.0, 3.0)
val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
// sparse vector given as a sequence of (index, value) pairs: (0, 1.0) and (2, 3.0)
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
3.2 LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
// a labeled point with a positive label (1.0) and a dense feature vector
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
println(pos.features)
println(pos.label)
// a labeled point with a negative label (0.0) and a sparse feature vector
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
MLlib also supports the libSVM text format, where each line stores one labeled sparse feature vector as:
Label index1:value index2:value ...
The feature indices are one-based and ascending. For example, the line -15 1:0.708 3:-0.3333 has label -15, feature 1 equal to 0.708 and feature 3 equal to -0.3333; feature 2 is not listed and is therefore 0.
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
// load LabeledPoints stored in libSVM format (feature indices become zero-based after loading)
val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
examples.foreach(println)
3.3 Local matrix
import org.apache.spark.mllib.linalg.{Matrix, Matrices}
// dense 3x2 matrix in column-major order: rows are (1.0, 2.0), (3.0, 4.0), (5.0, 6.0)
val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
// sparse 4x4 matrix in CSC format: column pointers, row indices, non-zero values
val sm: Matrix = Matrices.sparse(4, 4, Array(0, 1, 5, 6, 7), Array(0, 0, 1, 2, 3, 3, 3), Array(9.0, 6.0, 8.0, 5.0, 1.0, 7.0, 10.0))
4 Distributed matrix
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
4.1 Row-oriented distributed matrix (RowMatrix)
val v0=Vectors.dense(1.0,0.0,3.0)
val v1=Vectors.dense(3.0,2.0,4.0)
// an RDD of local vectors
val rows: RDD[Vector] = sc.parallelize(Seq(v0, v1))
// Create a RowMatrix from the RDD[Vector].
val mat: RowMatrix = new RowMatrix(rows)
val m = mat.numRows()   // number of rows: 2
val n = mat.numCols()   // number of columns: 3
mat.rows.foreach(println)
Column summary statistics can be obtained with computeColumnSummaryStatistics():
scala> val summary = mat.computeColumnSummaryStatistics()
summary: org.apache.spark.mllib.stat.MultivariateStatisticalSummary = org.apache.spark.mllib.stat.MultivariateOnlineSummarizer@4ecc4b43
summary.max
res5: org.apache.spark.mllib.linalg.Vector = [3.0,2.0,4.0]
The summary object also exposes the count, variance, and mean:
scala> summary.count
res1: Long = 2
scala> summary.variance
res2: org.apache.spark.mllib.linalg.Vector = [2.0,2.0,0.5]
scala> summary.mean
res3: org.apache.spark.mllib.linalg.Vector = [2.0,1.0,3.5]
and the column-wise L1 norm:
scala> summary.normL1
res4: org.apache.spark.mllib.linalg.Vector = [4.0,2.0,7.0]
4.2 Indexed row matrix (IndexedRowMatrix)
The indexed row matrix IndexedRowMatrix is similar to RowMatrix, except that every row carries a meaningful long-integer index. The index can be used to identify rows or to perform operations such as joins. The data is stored as an RDD of IndexedRow, i.e. each row is a local vector paired with a long index.
As with RowMatrix, an IndexedRowMatrix instance can be created from an RDD[IndexedRow].
val dv1 = Vectors.dense(1.0, 2.0, 3.0)
val dv2 = Vectors.dense(2.0, 3.0, 4.0)
Wrap dv1 and dv2 into IndexedRow objects, attaching the row indices 1 and 2:
scala> val idxr1 = IndexedRow(1,dv1)
idxr1: org.apache.spark.mllib.linalg.distributed.IndexedRow = IndexedRow(1,[1.0,2.0,3.0])
scala> val idxr2 = IndexedRow(2,dv2)
idxr2: org.apache.spark.mllib.linalg.distributed.IndexedRow = IndexedRow(2,[2.0,3.0,4.0])
scala> val idxrows = sc.parallelize(Array(idxr1,idxr2))
idxrows: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.IndexedRow] = ParallelCollectionRDD[3] at parallelize at <console>:35
Construct an IndexedRowMatrix from the RDD[IndexedRow]:
scala> val idxmat: IndexedRowMatrix = new IndexedRowMatrix(idxrows)
idxmat: org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix = org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix@7951a08c
scala> idxmat.rows.foreach(println)
IndexedRow(1,[1.0,2.0,3.0])
IndexedRow(2,[2.0,3.0,4.0])
Column summary statistics are also available on an IndexedRowMatrix:
idxmat.computeColumnSummaryStatistics()
4.3 Coordinate Matrix
The coordinate matrix CoordinateMatrix is a distributed matrix backed by an RDD of matrix entries. Each MatrixEntry is a triple (i: Long, j: Long, value: Double), where i is the row index, j is the column index, and value is the element at that position. A coordinate matrix is typically used when both dimensions are very large and the matrix is very sparse. A CoordinateMatrix instance can be created from an RDD[MatrixEntry], where each entry is a (rowIndex, colIndex, elem) triple:
scala> import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
Create two matrix entries, ent1 and ent2:
scala> val ent1 = new MatrixEntry(0,1,0.5)
ent1: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(0,1,0.5)
scala> val ent2 = new MatrixEntry(2,2,1.8)
ent2: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(2,2,1.8)
Build an RDD[MatrixEntry] from ent1 and ent2:
scala> val entries : RDD[MatrixEntry] = sc.parallelize(Array(ent1,ent2))
entries: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.MatrixEntry] = ParallelCollectionRDD[4] at parallelize at <console>:36
scala> val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
coordMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@8862881
scala> coordMat.entries.foreach(println)
MatrixEntry(0,1,0.5)
MatrixEntry(2,2,1.8)
Transpose the coordinate matrix:
scala> val transMat: CoordinateMatrix = coordMat.transpose()
transMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@6b945576
The entries of the transposed matrix:
scala> transMat.entries.foreach(println)
MatrixEntry(1,0,0.5)
MatrixEntry(2,2,1.8)
Convert the transposed coordinate matrix into an IndexedRowMatrix:
val indexedRowMatrix = transMat.toIndexedRowMatrix()
indexedRowMatrix.rows.foreach(println)
IndexedRow(1,(3,[0],[0.5]))
IndexedRow(2,(3,[2],[1.8]))
4.4 Block Matrix
A block matrix BlockMatrix is a distributed matrix backed by an RDD of matrix blocks, where each block is a tuple ((Int, Int), Matrix): (Int, Int) is the block index, and Matrix is the sub-matrix at that position, whose size is determined by rowsPerBlock and colsPerBlock (both default to 1024). A block matrix supports addition and multiplication with another block matrix (an addition sketch follows the multiplication examples below), and provides a helper method validate() to check that the block matrix was built correctly.
An IndexedRowMatrix or a CoordinateMatrix can be converted into a block matrix by calling toBlockMatrix(), which splits the matrix into blocks of the default size 1024 x 1024; the block size can be adjusted by passing arguments to toBlockMatrix(rowsPerBlock, colsPerBlock).
scala> import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
scala> import org.apache.spark.mllib.linalg.distributed.BlockMatrix
import org.apache.spark.mllib.linalg.distributed.BlockMatrix
scala> val ent1 = new MatrixEntry(0,0,1)
ent1: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(0,0,1.0)
scala> val ent2 = new MatrixEntry(1,1,1)
ent2: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(1,1,1.0)
scala> val ent3 = new MatrixEntry(2,0,-1)
ent3: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(2,0,-1.0)
scala> val ent4 = new MatrixEntry(2,1,2)
ent4: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(2,1,2.0)
scala> val ent5 = new MatrixEntry(2,2,1)
ent5: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(2,2,1.0)
scala> val ent6 = new MatrixEntry(3,0,1)
ent6: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(3,0,1.0)
scala> val ent7 = new MatrixEntry(3,1,1)
ent7: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(3,1,1.0)
scala> val ent8 = new MatrixEntry(3,3,1)
ent8: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(3,3,1.0)
Build an RDD[MatrixEntry] from the eight entries:
scala> val entries : RDD[MatrixEntry] = sc.parallelize(Array(ent1,ent2,ent3,ent4,ent5,ent6,ent7,ent8))
entries: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.MatrixEntry] = ParallelCollectionRDD[10] at parallelize at <console>:51
Create a CoordinateMatrix from the RDD[MatrixEntry]:
scala> val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
coordMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@7bb93e18
Convert the coordinate matrix to a BlockMatrix made of 2 x 2 blocks and cache it:
scala> val matA: BlockMatrix = coordMat.toBlockMatrix(2,2).cache()
matA: org.apache.spark.mllib.linalg.distributed.BlockMatrix = org.apache.spark.mllib.linalg.distributed.BlockMatrix@5dbbfa05
Use validate() to check that the block matrix was built correctly, then view it as a local matrix:
scala> matA.validate()
scala> matA.toLocalMatrix
res12: org.apache.spark.mllib.linalg.Matrix =
1.0 0.0 0.0 0.0
0.0 1.0 0.0 0.0
-1.0 2.0 1.0 0.0
1.0 1.0 0.0 1.0
The number of column blocks and row blocks:
scala> matA.numColBlocks
res13: Int = 2
scala> matA.numRowBlocks
res14: Int = 2
scala> val ata = matA.multiply(matA)
ata: org.apache.spark.mllib.linalg.distributed.BlockMatrix = org.apache.spark.mllib.linalg.distributed.BlockMatrix@5722c149
scala> ata.toLocalMatrix
res15: org.apache.spark.mllib.linalg.Matrix =
1.0 0.0 0.0 0.0
0.0 1.0 0.0 0.0
-2.0 4.0 1.0 0.0
2.0 2.0 0.0 1.0
// A^T * A: multiply the transpose of matA by matA
scala> val ata = matA.transpose.multiply(matA)
ata: org.apache.spark.mllib.linalg.distributed.BlockMatrix = org.apache.spark.mllib.linalg.distributed.BlockMatrix@3644e451
scala> ata.toLocalMatrix
res1: org.apache.spark.mllib.linalg.Matrix =
3.0 -1.0 -1.0 1.0
-1.0 6.0 2.0 1.0
-1.0 2.0 1.0 0.0
1.0 1.0 0.0 1.0
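The description above also mentions that block matrices support addition, which the session does not show; a minimal sketch, reusing matA from the session above:

// element-wise sum of matA with itself (both operands must use the same block sizes)
val doubled: BlockMatrix = matA.add(matA)
doubled.toLocalMatrix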
Reference: https://blog.csdn.net/legotime/article/details/51089644
5 Basic summary statistics
Column summary statistics over an RDD[Vector] are computed with Statistics.colStats():
scala> import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
scala> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
scala> val data =sc.parallelize(1 to 100,2)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:37
scala> val data1 =data.map(x=>Vectors.dense(x))
data1: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[29] at map at <console>:38
scala> val summary: MultivariateStatisticalSummary = Statistics.colStats(data1)
summary: org.apache.spark.mllib.stat.MultivariateStatisticalSummary =
org.apache.spark.mllib.stat.MultivariateOnlineSummarizer@35a2c933
scala> summary.count
res24: Long = 100
scala> summary.max
res25: org.apache.spark.mllib.linalg.Vector = [100.0]
scala> summary.mean
res26: org.apache.spark.mllib.linalg.Vector = [50.5]
scala> summary.normL1
res27: org.apache.spark.mllib.linalg.Vector = [5050.0]
scala> summary.normL2
res28: org.apache.spark.mllib.linalg.Vector = [581.6786054171153]
scala> summary.numNonzeros
res29: org.apache.spark.mllib.linalg.Vector = [100.0]
scala> summary.variance
res30: org.apache.spark.mllib.linalg.Vector = [841.6666666666666]
6 Correlation statistics
Take height and weight as an example. We might ask: is a taller person heavier? The question breaks into two parts: (1) as height increases, does weight tend to increase or decrease? (2) for each unit increase in height, by how much does weight change? These are the two essential elements of correlation: its direction and its strength. The direction is easy to understand: positive correlation, negative correlation, or no correlation. For the second question, different people define the strength of the relationship in different ways, which leads to different correlation measures.
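One standard way to quantify that strength is the Pearson correlation coefficient, which the example below uses. The formula is the textbook definition, stated here for reference rather than taken from the original session:

r_{xy} = \frac{\sum_{i=1}^{n}(x_i-\bar{x})(y_i-\bar{y})}{\sqrt{\sum_{i=1}^{n}(x_i-\bar{x})^2}\,\sqrt{\sum_{i=1}^{n}(y_i-\bar{y})^2}}

Its sign gives the direction of the correlation, and its absolute value, between 0 and 1, gives the strength; the Spearman coefficient applies the same idea to the ranks of the values instead of the raw values.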
val x = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0, 5.0))
val y = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0, 5.0))
scala> val correlation: Double = Statistics.corr(x,y, "pearson")
correlation: Double = 0.9999999999999998
scala> val correlation: Double = Statistics.corr(x,y, "spearman")
correlation: Double = 0.9999999999999998
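Statistics.corr can also compute a full pairwise correlation matrix over an RDD[Vector]; a minimal sketch with toy (height, weight) observations made up for illustration:

import org.apache.spark.mllib.linalg.{Matrix, Vectors}
import org.apache.spark.mllib.stat.Statistics

// each vector is one observation of (height, weight)
val observations = sc.parallelize(Seq(
  Vectors.dense(170.0, 65.0),
  Vectors.dense(175.0, 70.0),
  Vectors.dense(180.0, 80.0),
  Vectors.dense(165.0, 55.0)
))

// pairwise Pearson correlations between the columns, returned as a local Matrix
val corrMatrix: Matrix = Statistics.corr(observations, "pearson")
println(corrMatrix)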
7 Summary
To write this article I specially upgraded my big data platform so that it can handle the follow-up topics of Pipeline and the parameter grid. It was genuinely hard work, so please treasure this article.
Qin Kaixin in Shenzhen 0:15