关系经济人类预测化学自然
中准网
自然科学
知识物理
化学生物
地理解释
预测理解
本质社会
人类现象
行为研究
经济政治
心理结构
关系指导
人文遗产

FlinkML中机器学习算法介绍(一)

3月3日 终离去投稿
  FlinkML是ApacheFlink生态的子项目,提供机器学习(ML)API和基础设施,简化了ML管道的构建。用户可以使用标准的MLAPI实现ML算法,并进一步使用这些基础设施构建用于训练和推断作业的ML管道。为用户提供了标准的MLAPI,用户可以使用这些API实现ML算法,并进一步使用FlinkML提供的基础设施构建用于训练和推断作业的ML管道。可以帮助用户构建和部署机器学习模型,以便在实时数据流中进行预测和推断。
  FlinkML的算法库包含常用的机器学习算法:
  一、分类(Classification):属于监督学习的范畴,根据一些给定的已知类别的样本,使它能够对未知类别的样本进行分类,要求必须事先明确知道各个类别的信息。
  1、KNN:KNN是一种分类算法。KNN的基本假设是,如果所提供样本的大多数最近的K个邻居属于同一标签,则所提供样本也极有可能属于该标签。
  KNN的优点是:无需训练,计算时间快,算法简单易懂,适用于回归和分类,准确度高,不需要与更好的监督学习模型进行比较,不需要对数据进行额外的假设、调整多个参数或构建模型。
  KNN的缺点是:计算时间随着数据量的增加而增加,对于高维数据不太适用,对于分类不平衡的数据集表现不佳。
  应用场景包括:文本分类或文本挖掘、森林清查和估算森林变量、基因表达谱的功能基因组学研究、数据预处理。
  示例代码:importorg。apache。flink。ml。classification。knn。Kimportorg。apache。flink。ml。classification。knn。KnnMimportorg。apache。flink。ml。linalg。DenseVimportorg。apache。flink。ml。linalg。Vimportorg。apache。flink。streaming。api。datastream。DataSimportorg。apache。flink。streaming。api。environment。StreamExecutionEimportorg。apache。flink。table。api。Timportorg。apache。flink。table。api。bridge。java。StreamTableEimportorg。apache。flink。types。Rimportorg。apache。flink。util。CloseableISimpleprogramthattrainsaKnnmodelandusesitforclassification。publicclassKnnExample{publicstaticvoidmain(String〔〕args){StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();StreamTableEnvironmenttEnvStreamTableEnvironment。create(env);Generatesinputtrainingandpredictiondata。DataStreamRowtrainStreamenv。fromElements(Row。of(Vectors。dense(2。0,3。0),1。0),Row。of(Vectors。dense(2。1,3。1),1。0),Row。of(Vectors。dense(200。1,300。1),2。0),Row。of(Vectors。dense(200。2,300。2),2。0),Row。of(Vectors。dense(200。3,300。3),2。0),Row。of(Vectors。dense(200。4,300。4),2。0),Row。of(Vectors。dense(200。4,300。4),2。0),Row。of(Vectors。dense(200。6,300。6),2。0),Row。of(Vectors。dense(2。1,3。1),1。0),Row。of(Vectors。dense(2。1,3。1),1。0),Row。of(Vectors。dense(2。1,3。1),1。0),Row。of(Vectors。dense(2。1,3。1),1。0),Row。of(Vectors。dense(2。3,3。2),1。0),Row。of(Vectors。dense(2。3,3。2),1。0),Row。of(Vectors。dense(2。8,3。2),3。0),Row。of(Vectors。dense(300。,3。2),4。0),Row。of(Vectors。dense(2。2,3。2),1。0),Row。of(Vectors。dense(2。4,3。2),5。0),Row。of(Vectors。dense(2。5,3。2),5。0),Row。of(Vectors。dense(2。5,3。2),5。0),Row。of(Vectors。dense(2。1,3。1),1。0));TabletrainTabletEnv。fromDataStream(trainStream)。as(features,label);DataStreamRowpredictStreamenv。fromElements(Row。of(Vectors。dense(4。0,4。1),5。0),Row。of(Vectors。dense(300,42),2。0));TablepredictTabletEnv。fromDataStream(predictStream)。as(features,label);CreatesaKnnobjectandinitializesitsparameters。KnnknnnewKnn()。setK(4);TrainstheKnnModel。KnnModelknnModelknn。fit(trainTable);UsestheKnnModelforpredictions。TableoutputTableknnModel。transform(predictTable)〔0〕;Extractsanddisplaystheresults。for(CloseableIteratorRowitoutputTable。execute()。collect();it。hasNext();){Rowrowit。next();DenseVectorfeatures(DenseVector)row。getField(knn。getFeaturesCol());doubleexpectedResult(Double)row。getField(knn。getLabelCol());doublepredictionResult(Double)row。getField(knn。getPredictionCol());System。out。printf(Features:15sExpectedResult:sPredictionResult:s,features,expectedResult,predictionResult);}}}
  2、LinearSVC(线性支持向量分类器)是一种算法,它试图找到一个超平面,以最大化分类样本之间的距离。
  线性支持向量分类器的优点是:在高维空间中有效,即使在维数大于样本数的情况下仍然有效,使用训练点的子集进行决策函数(称为支持向量),因此也具有内存效率。线性支持向量分类器的缺点是:它不能处理非线性数据。当数据集的样本数大于特征数时,使用LinearSVC会更快。此外,如果您的数据集非常大,则可以使用SGDClassifier或Nystroem转换器等核逼近方法来加速LinearSVC的训练。
  示例代码:importorg。apache。flink。ml。classification。linearsvc。LinearSVC;importorg。apache。flink。ml。classification。linearsvc。LinearSVCMimportorg。apache。flink。ml。linalg。DenseVimportorg。apache。flink。ml。linalg。Vimportorg。apache。flink。streaming。api。datastream。DataSimportorg。apache。flink。streaming。api。environment。StreamExecutionEimportorg。apache。flink。table。api。Timportorg。apache。flink。table。api。bridge。java。StreamTableEimportorg。apache。flink。types。Rimportorg。apache。flink。util。CloseableISimpleprogramthattrainsaLinearSVCmodelandusesitforclassification。publicclassLinearSVCExample{publicstaticvoidmain(String〔〕args){StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();StreamTableEnvironmenttEnvStreamTableEnvironment。create(env);Generatesinputdata。DataStreamRowinputStreamenv。fromElements(Row。of(Vectors。dense(1,2,3,4),0。,1。),Row。of(Vectors。dense(2,2,3,4),0。,2。),Row。of(Vectors。dense(3,2,3,4),0。,3。),Row。of(Vectors。dense(4,2,3,4),0。,4。),Row。of(Vectors。dense(5,2,3,4),0。,5。),Row。of(Vectors。dense(11,2,3,4),1。,1。),Row。of(Vectors。dense(12,2,3,4),1。,2。),Row。of(Vectors。dense(13,2,3,4),1。,3。),Row。of(Vectors。dense(14,2,3,4),1。,4。),Row。of(Vectors。dense(15,2,3,4),1。,5。));TableinputTabletEnv。fromDataStream(inputStream)。as(features,label,weight);CreatesaLinearSVCobjectandinitializesitsparameters。LinearSVClinearSVCnewLinearSVC()。setWeightCol(weight);TrainstheLinearSVCModel。LinearSVCModellinearSVCModellinearSVC。fit(inputTable);UsestheLinearSVCModelforpredictions。TableoutputTablelinearSVCModel。transform(inputTable)〔0〕;Extractsanddisplaystheresults。for(CloseableIteratorRowitoutputTable。execute()。collect();it。hasNext();){Rowrowit。next();DenseVectorfeatures(DenseVector)row。getField(linearSVC。getFeaturesCol());doubleexpectedResult(Double)row。getField(linearSVC。getLabelCol());doublepredictionResult(Double)row。getField(linearSVC。getPredictionCol());DenseVectorrawPredictionResult(DenseVector)row。getField(linearSVC。getRawPredictionCol());System。out。printf(Features:25sExpectedResult:sPredictionResult:sRawPredictionResult:s,features,expectedResult,predictionResult,rawPredictionResult);}}}
  3、LogisticRegressionLogisticregression是广义线性模型的一种特殊情况,它是一种用于解决二分类问题的机器学习方法,用于估计某种事物的可能性,是一种基于概率的模式识别算法,虽然名字中带回归,但实际上是一种分类方法。在实际应用中,逻辑回归可以说是应用最广泛的机器学习算法之一。逻辑回归的目标是根据输入特征的线性组合来预测一个二元输出变量的概率。它使用sigmoid函数(S(x)1(1e(x)))将线性函数的输出转换为概率值,从而进行分类,逻辑回归可以用于二元分类和多元分类。
  LogisticRegression算法是一种广泛使用的算法,因为它非常高效,不需要太大的计算量,又通俗易懂,不需要缩放输入特征,不需要任何调整,且很容易调整,并且输出校准好的预测概率。但是,它也有一些缺点。例如,它不能用于解决非线性问题,因为Logistic的决策面是线性的;对多重共线性数据较为敏感;很难处理数据不平衡的问题;准确率并不是很高,因为形式非常的简单(非常类似线性模型),很难去拟合数据的真实分布。
  示例代码:
  普通示例importorg。apache。flink。ml。classification。logisticregression。LogisticRimportorg。apache。flink。ml。classification。logisticregression。LogisticRegressionMimportorg。apache。flink。ml。linalg。DenseVimportorg。apache。flink。ml。linalg。Vimportorg。apache。flink。streaming。api。datastream。DataSimportorg。apache。flink。streaming。api。environment。StreamExecutionEimportorg。apache。flink。table。api。Timportorg。apache。flink。table。api。bridge。java。StreamTableEimportorg。apache。flink。types。Rimportorg。apache。flink。util。CloseableISimpleprogramthattrainsaLogisticRegressionmodelandusesitforclassification。publicclassLogisticRegressionExample{publicstaticvoidmain(String〔〕args){StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();StreamTableEnvironmenttEnvStreamTableEnvironment。create(env);Generatesinputdata。DataStreamRowinputStreamenv。fromElements(Row。of(Vectors。dense(1,2,3,4),0。,1。),Row。of(Vectors。dense(2,2,3,4),0。,2。),Row。of(Vectors。dense(3,2,3,4),0。,3。),Row。of(Vectors。dense(4,2,3,4),0。,4。),Row。of(Vectors。dense(5,2,3,4),0。,5。),Row。of(Vectors。dense(11,2,3,4),1。,1。),Row。of(Vectors。dense(12,2,3,4),1。,2。),Row。of(Vectors。dense(13,2,3,4),1。,3。),Row。of(Vectors。dense(14,2,3,4),1。,4。),Row。of(Vectors。dense(15,2,3,4),1。,5。));TableinputTabletEnv。fromDataStream(inputStream)。as(features,label,weight);CreatesaLogisticRegressionobjectandinitializesitsparameters。LogisticRegressionlrnewLogisticRegression()。setWeightCol(weight);TrainstheLogisticRegressionModel。LogisticRegressionModellrModellr。fit(inputTable);UsestheLogisticRegressionModelforpredictions。TableoutputTablelrModel。transform(inputTable)〔0〕;Extractsanddisplaystheresults。for(CloseableIteratorRowitoutputTable。execute()。collect();it。hasNext();){Rowrowit。next();DenseVectorfeatures(DenseVector)row。getField(lr。getFeaturesCol());doubleexpectedResult(Double)row。getField(lr。getLabelCol());doublepredictionResult(Double)row。getField(lr。getPredictionCol());DenseVectorrawPredictionResult(DenseVector)row。getField(lr。getRawPredictionCol());System。out。printf(Features:25sExpectedResult:sPredictionResult:sRawPredictionResult:s,features,expectedResult,predictionResult,rawPredictionResult);}}}
  在线无界流importorg。apache。flink。api。common。typeinfo。TypeIimportorg。apache。flink。api。common。typeinfo。Timportorg。apache。flink。api。java。typeutils。RowTypeIimportorg。apache。flink。ml。classification。logisticregression。OnlineLogisticRimportorg。apache。flink。ml。classification。logisticregression。OnlineLogisticRegressionMimportorg。apache。flink。ml。examples。util。PeriodicSourceFimportorg。apache。flink。ml。linalg。DenseVimportorg。apache。flink。ml。linalg。Vimportorg。apache。flink。ml。linalg。typeinfo。DenseVectorTypeIimportorg。apache。flink。streaming。api。datastream。DataSimportorg。apache。flink。streaming。api。environment。StreamExecutionEimportorg。apache。flink。streaming。api。functions。source。SourceFimportorg。apache。flink。table。api。Timportorg。apache。flink。table。api。bridge。java。StreamTableEimportorg。apache。flink。types。Rimportorg。apache。flink。util。CloseableIimportjava。util。Aimportjava。util。Cimportjava。util。LSimpleprogramthattrainsanOnlineLogisticRegressionmodelandusesitforclassification。publicclassOnlineLogisticRegressionExample{publicstaticvoidmain(String〔〕args){StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();env。setParallelism(4);StreamTableEnvironmenttEnvStreamTableEnvironment。create(env);Generatesinputtrainingandpredictiondata。Bothareinfinitestreamsthatperiodicallysendsoutprovideddatatotriggermodelupdateandprediction。ListRowtrainData1Arrays。asList(Row。of(Vectors。dense(0。1,2。),0。),Row。of(Vectors。dense(0。2,2。),0。),Row。of(Vectors。dense(0。3,2。),0。),Row。of(Vectors。dense(0。4,2。),0。),Row。of(Vectors。dense(0。5,2。),0。),Row。of(Vectors。dense(11。,12。),1。),Row。of(Vectors。dense(12。,11。),1。),Row。of(Vectors。dense(13。,12。),1。),Row。of(Vectors。dense(14。,12。),1。),Row。of(Vectors。dense(15。,12。),1。));ListRowtrainData2Arrays。asList(Row。of(Vectors。dense(0。2,3。),0。),Row。of(Vectors。dense(0。8,1。),0。),Row。of(Vectors。dense(0。7,1。),0。),Row。of(Vectors。dense(0。6,2。),0。),Row。of(Vectors。dense(0。2,2。),0。),Row。of(Vectors。dense(14。,17。),1。),Row。of(Vectors。dense(15。,10。),1。),Row。of(Vectors。dense(16。,16。),1。),Row。of(Vectors。dense(17。,10。),1。),Row。of(Vectors。dense(18。,13。),1。));ListRowpredictDataArrays。asList(Row。of(Vectors。dense(0。8,2。7),0。0),Row。of(Vectors。dense(15。5,11。2),1。0));RowTypeInfotypeInfonewRowTypeInfo(newTypeInformation〔〕{DenseVectorTypeInfo。INSTANCE,Types。DOUBLE},newString〔〕{features,label});SourceFunctionRowtrainSourcenewPeriodicSourceFunction(1000,Arrays。asList(trainData1,trainData2));DataStreamRowtrainStreamenv。addSource(trainSource,typeInfo);TabletrainTabletEnv。fromDataStream(trainStream)。as(features);SourceFunctionRowpredictSourcenewPeriodicSourceFunction(1000,Collections。singletonList(predictData));DataStreamRowpredictStreamenv。addSource(predictSource,typeInfo);TablepredictTabletEnv。fromDataStream(predictStream)。as(features);CreatesanonlineLogisticRegressionobjectandinitializesitsparametersandinitialmodeldata。RowinitModelDataRow。of(Vectors。dense(0。41233679404769874,0。18088118293232122),0L);TableinitModelDataTabletEnv。fromDataStream(env。fromElements(initModelData));OnlineLogisticRegressionolrnewOnlineLogisticRegression()。setFeaturesCol(features)。setLabelCol(label)。setPredictionCol(prediction)。setReg(0。2)。setElasticNet(0。5)。setGlobalBatchSize(10)。setInitialModelData(initModelDataTable);TrainstheonlineLogisticRegressionModel。OnlineLogisticRegressionModelonlineModelolr。fit(trainTable);UsestheonlineLogisticRegressionModelforpredictions。TableoutputTableonlineModel。transform(predictTable)〔0〕;Extractsanddisplaystheresults。Astrainingdatastreamcontinuouslytriggerstheupdateoftheinternalmodeldata,rawpredictionresultsofthesamepredictdatasetwouldchangeovertime。for(CloseableIteratorRowitoutputTable。execute()。collect();it。hasNext();){Rowrowit。next();DenseVectorfeatures(DenseVector)row。getField(olr。getFeaturesCol());DoubleexpectedResult(Double)row。getField(olr。getLabelCol());DoublepredictionResult(Double)row。getField(olr。getPredictionCol());DenseVectorrawPredictionResult(DenseVector)row。getField(olr。getRawPredictionCol());System。out。printf(Features:25sExpectedResult:sPredictionResult:sRawPredictionResult:s,features,expectedResult,predictionResult,rawPredictionResult);}}}
  4、NaiveBayes朴素贝叶斯算法是一种基于贝叶斯定理和特征条件独立假设的分类方法。在许多场合,朴素贝叶斯分类算法可以与决策树和神经网络分类算法相媲美,该算法能运用到大型数据库中,而且方法简单、分类准确率高、速度快。
  朴素贝叶斯算法的优点包括:模型发源于古典数学理论,有稳定的分类效率。对小规模的数据表现很好,能处理多分类任务,适合增量式训练,尤其是数据量超出内存时,可以一批批的去增量训练。对缺失数据不太敏感,算法也比较简单,常用于文本分类。
  缺点包括:需要计算先验概率。分类决策存在错误率。对输入数据的表达形式很敏感。由于使用了样本属性独立性的假设,所以如果样本属性有关联时其效果不好。
  NaiveBayes应用场景比较广泛,文本分类垃圾文本过滤情感判别是应用最多的场景之一,朴素贝叶斯在文本分类场景中占据着一席之地。此外,朴素贝叶斯还可以应用于互斥群组中个体的区分,以及在估算决策论框架的矩阵中。importorg。apache。flink。ml。classification。naivebayes。NaiveBimportorg。apache。flink。ml。classification。naivebayes。NaiveBayesMimportorg。apache。flink。ml。linalg。DenseVimportorg。apache。flink。ml。linalg。Vimportorg。apache。flink。streaming。api。datastream。DataSimportorg。apache。flink。streaming。api。environment。StreamExecutionEimportorg。apache。flink。table。api。Timportorg。apache。flink。table。api。bridge。java。StreamTableEimportorg。apache。flink。types。Rimportorg。apache。flink。util。CloseableISimpleprogramthattrainsaNaiveBayesmodelandusesitforclassification。publicclassNaiveBayesExample{publicstaticvoidmain(String〔〕args){StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();StreamTableEnvironmenttEnvStreamTableEnvironment。create(env);Generatesinputtrainingandpredictiondata。DataStreamRowtrainStreamenv。fromElements(Row。of(Vectors。dense(0,0。),11),Row。of(Vectors。dense(1,0),10),Row。of(Vectors。dense(1,1。),10));TabletrainTabletEnv。fromDataStream(trainStream)。as(features,label);DataStreamRowpredictStreamenv。fromElements(Row。of(Vectors。dense(0,1。)),Row。of(Vectors。dense(0,0。)),Row。of(Vectors。dense(1,0)),Row。of(Vectors。dense(1,1。)));TablepredictTabletEnv。fromDataStream(predictStream)。as(features);CreatesaNaiveBayesobjectandinitializesitsparameters。NaiveBayesnaiveBayesnewNaiveBayes()。setSmoothing(1。0)。setFeaturesCol(features)。setLabelCol(label)。setPredictionCol(prediction)。setModelType(multinomial);TrainstheNaiveBayesModel。NaiveBayesModelnaiveBayesModelnaiveBayes。fit(trainTable);UsestheNaiveBayesModelforpredictions。TableoutputTablenaiveBayesModel。transform(predictTable)〔0〕;Extractsanddisplaystheresults。for(CloseableIteratorRowitoutputTable。execute()。collect();it。hasNext();){Rowrowit。next();DenseVectorfeatures(DenseVector)row。getField(naiveBayes。getFeaturesCol());doublepredictionResult(Double)row。getField(naiveBayes。getPredictionCol());System。out。printf(Features:sPredictionResult:s,features,predictionResult);}}}
  二、回归
  有监督学习的两大应用之一,产生连续的结果。例如向模型输入人的各种数据的训练样本,产生输入一个人的数据,判断此人20年后今后的经济能力的结果,结果是连续的,往往得到一条回归曲线。当输入自变量不同时,输出的因变量非离散分布(不仅仅是一条线性直线,多项曲线也是回归曲线)。
  1、LinearRegression算法是一种常用的回归算法,它的目的是通过找到一条直线或者一个平面,来拟合数据集中的数据点,从而实现对连续型变量的预测。
  LinearRegression算法的基本思想是,假设数据集中的数据点之间存在一个线性关系,即:
  ywxb
  其中,y是因变量,x是自变量,w和b是待求的参数,分别表示斜率和截距。LinearRegression算法的任务就是通过给定的数据集,找到最合适的w和b,使得预测值y和真实值y之间的误差最小。
  LinearRegression算法可以分为两种类型,根据自变量x的个数不同:简单线性回归(SimpleLinearRegression):当x只有一个时,即只有一个特征或属性时,称为简单线性回归。这时,LinearRegression算法就是在二维平面上找到一条直线,来拟合数据点。多元线性回归(MultipleLinearRegression):当x有多个时,即有多个特征或属性时,称为多元线性回归。这时,LinearRegression算法就是在高维空间中找到一个平面或者一个超平面,来拟合数据点。
  步骤描述:输入:数据集D{(x1,y1),(x2,y2),,(xn,yn)},其中xi是自变量向量,yi是因变量标量输出:参数w和b步骤:计算预测值ywxb计算预测值y和真实值y之间的误差eyy计算误差e的平方和或者均方误差作为损失函数L(w,b)使用梯度下降法或者最小二乘法等优化方法,更新w和b的值,使得L(w,b)最小化初始化:随机给定w和b的初始值迭代:直到达到最大迭代次数或者收敛条件返回当前的w和b
  LinearRegression算法是一种简单而有效的回归算法,它可以用于预测房价、销量、收入等连续型变量。但是它也有一些缺点,比如:对于非线性关系的数据集效果不好,因为它假设数据点之间是线性相关的对于异常值或者噪声敏感,可能影响参数的估计和预测的准确性对于多重共线性的特征可能导致参数不稳定或者过拟合,需要使用正则化方法进行惩罚或者选择合适的特征子集
  示例:importorg。apache。flink。ml。linalg。DenseVimportorg。apache。flink。ml。linalg。Vimportorg。apache。flink。ml。regression。linearregression。LinearRimportorg。apache。flink。ml。regression。linearregression。LinearRegressionMimportorg。apache。flink。streaming。api。datastream。DataSimportorg。apache。flink。streaming。api。environment。StreamExecutionEimportorg。apache。flink。table。api。Timportorg。apache。flink。table。api。bridge。java。StreamTableEimportorg。apache。flink。types。Rimportorg。apache。flink。util。CloseableISimpleprogramthattrainsaLinearRegressionmodelandusesitforregression。publicclassLinearRegressionExample{publicstaticvoidmain(String〔〕args){StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();StreamTableEnvironmenttEnvStreamTableEnvironment。create(env);Generatesinputdata。DataStreamRowinputStreamenv。fromElements(Row。of(Vectors。dense(2,1),4。0,1。0),Row。of(Vectors。dense(3,2),7。0,1。0),Row。of(Vectors。dense(4,3),10。0,1。0),Row。of(Vectors。dense(2,4),10。0,1。0),Row。of(Vectors。dense(2,2),6。0,1。0),Row。of(Vectors。dense(4,3),10。0,1。0),Row。of(Vectors。dense(1,2),5。0,1。0),Row。of(Vectors。dense(5,3),11。0,1。0));TableinputTabletEnv。fromDataStream(inputStream)。as(features,label,weight);CreatesaLinearRegressionobjectandinitializesitsparameters。LinearRegressionlrnewLinearRegression()。setWeightCol(weight);TrainstheLinearRegressionModel。LinearRegressionModellrModellr。fit(inputTable);UsestheLinearRegressionModelforpredictions。TableoutputTablelrModel。transform(inputTable)〔0〕;Extractsanddisplaystheresults。for(CloseableIteratorRowitoutputTable。execute()。collect();it。hasNext();){Rowrowit。next();DenseVectorfeatures(DenseVector)row。getField(lr。getFeaturesCol());doubleexpectedResult(Double)row。getField(lr。getLabelCol());doublepredictionResult(Double)row。getField(lr。getPredictionCol());System。out。printf(Features:sExpectedResult:sPredictionResult:s,features,expectedResult,predictionResult);}}}
  三、聚类
  属于无监督学习的范畴,根据样本间的某种距离或者相似性来定义聚类,即把相似的(或距离近的)样本聚为同一类,而把不相似的(或距离远的)样本归在其他类。
  1、Kmeans算法是一种迭代求解的聚类分析算法,它的目的是将数据集划分为K个不同的簇,使得每个簇内的数据点尽可能相似,而不同簇之间的数据点尽可能不同。
  KMeans算法的优点包括:算法简单,实现容易。对处理大数据集,该算法保持可伸缩性和高效性。当簇是密集的,且它们与其他簇是分离的,聚类效果较好。
  缺点包括:
  Kmeans算法是一种简单而有效的聚类方法,但是它也有一些局限性,比如:需要事先确定K值,但是在实际应用中,K值往往不容易确定。对于初始聚类中心的选择敏感,不同的初始聚类中心可能导致不同的聚类结果。对于噪声和异常值敏感,可能影响聚类质量。对于非凸形状或者大小差异较大的簇效果不好,因为它假设每个簇是球形或者椭球形的。
  Kmeans算法有很多应用场景,比如:
  文档分类:可以将相同话题的文档聚集在一起,并自动生成不同话题的专栏。
  用户分群:可以根据用户的行为、偏好、属性等特征,将用户划分为不同的群体,从而进行个性化的推荐、营销、服务等。
  图像分割:可以将图像中的像素点按照颜色或灰度进行聚类,从而实现图像的分割、压缩、增强等。
  异常检测:可以将数据中的异常点或噪声点划分为一个簇,从而进行过滤或处理。
  数据降维:可以将高维数据中的相似点聚集在一起,从而降低数据的维度,减少计算量和存储空间。
  importorg。apache。flink。ml。clustering。kmeans。KM
  importorg。apache。flink。ml。clustering。kmeans。KMeansM
  importorg。apache。flink。ml。linalg。DenseV
  importorg。apache。flink。ml。linalg。V
  importorg。apache。flink。streaming。api。datastream。DataS
  importorg。apache。flink。streaming。api。environment。StreamExecutionE
  importorg。apache。flink。table。api。T
  importorg。apache。flink。table。api。bridge。java。StreamTableE
  importorg。apache。flink。types。R
  importorg。apache。flink。util。CloseableI
  SimpleprogramthattrainsaKMeansmodelandusesitforclustering。
  publicclassKMeansExample{
  publicstaticvoidmain(String〔〕args){
  StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();
  StreamTableEnvironmenttEnvStreamTableEnvironment。create(env);
  Generatesinputdata。
  DataStreaminputStream
  env。fromElements(
  Vectors。dense(0。0,0。0),
  Vectors。dense(0。0,0。3),
  Vectors。dense(0。3,0。0),
  Vectors。dense(9。0,0。0),
  Vectors。dense(9。0,0。6),
  Vectors。dense(9。6,0。0));
  TableinputTabletEnv。fromDataStream(inputStream)。as(features);
  CreatesaKmeansobjectandinitializesitsparameters。
  KMeanskmeansnewKMeans()。setK(2)。setSeed(1L);
  TrainstheKmeansModel。
  KMeansModelkmeansModelkmeans。fit(inputTable);
  UsestheKmeansModelforpredictions。
  TableoutputTablekmeansModel。transform(inputTable)〔0〕;
  Extractsanddisplaystheresults。
  for(CloseableIteratoritoutputTable。execute()。collect();it。hasNext();){
  Rowrowit。next();
  DenseVectorfeatures(DenseVector)row。getField(kmeans。getFeaturesCol());
  intclusterId(Integer)row。getField(kmeans。getPredictionCol());
  System。out。printf(Features:sClusterID:s,features,clusterId);
  }
  }
  }
  2、AgglomerativeClustering
  AgglomerativeClustering算法是一种层次聚类算法,它的目的是将数据集划分为不同层次的簇,形成树状的聚类结构。AgglomerativeClustering算法是自底向上的,也就是说,它从每个数据点作为一个簇开始,然后逐步将最相近的两个簇合并为一个新的簇,直到达到预设的簇数或者满足某种停止条件。
  AgglomerativeClustering算法的关键在于如何计算簇之间的距离或相似度。根据不同的距离度量方法,AgglomerativeClustering算法可以分为以下三种类型:单链接(singlelinkage):簇之间的距离定义为簇内两个最近的数据点之间的距离。这种方法倾向于产生链状的簇。全链接(completelinkage):簇之间的距离定义为簇内两个最远的数据点之间的距离。这种方法倾向于产生紧凑的簇。平均链接(averagelinkage):簇之间的距离定义为簇内所有数据点两两之间的距离的平均值。这种方法倾向于产生平衡的簇。
  AgglomerativeClustering算法可以用以下步骤描述:输入:数据集D,簇数K或者停止条件输出:簇划分C步骤:找出距离最小的两个簇Ci和Cj将Ci和Cj合并为一个新的簇Ck更新距离矩阵,删除Ci和Cj对应的行和列,增加Ck对应的行和列初始化:将每个数据点作为一个簇,构建一个nn的距离矩阵,其中n是数据点的数量迭代:直到达到K个簇或者满足停止条件返回当前的簇划分C
  AgglomerativeClustering算法是一种常用的层次聚类算法,它可以展示数据集的层次结构,并且不需要事先指定簇数。但是它也有一些缺点,比如:计算复杂度较高,需要O(n3)的时间和O(n2)的空间对噪声和异常值敏感,可能影响聚类质量对不同形状或大小差异较大的簇效果不好,因为它假设每个簇是球形或者椭球形的
  示例代码:importorg。apache。flink。ml。clustering。agglomerativeclustering。AgglomerativeCimportorg。apache。flink。ml。clustering。agglomerativeclustering。AgglomerativeClusteringPimportorg。apache。flink。ml。common。distance。EuclideanDistanceMimportorg。apache。flink。ml。linalg。DenseVimportorg。apache。flink。ml。linalg。Vimportorg。apache。flink。streaming。api。datastream。DataSimportorg。apache。flink。streaming。api。environment。StreamExecutionEimportorg。apache。flink。table。api。Timportorg。apache。flink。table。api。bridge。java。StreamTableEimportorg。apache。flink。types。Rimportorg。apache。flink。util。CloseableISimpleprogramthatcreatesanAgglomerativeClusteringinstanceandusesitforclustering。publicclassAgglomerativeClusteringExample{publicstaticvoidmain(String〔〕args){StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();StreamTableEnvironmenttEnvStreamTableEnvironment。create(env);Generatesinputdata。DataStreamDenseVectorinputStreamenv。fromElements(Vectors。dense(1,1),Vectors。dense(1,4),Vectors。dense(1,0),Vectors。dense(4,1。5),Vectors。dense(4,4),Vectors。dense(4,0));TableinputTabletEnv。fromDataStream(inputStream)。as(features);CreatesanAgglomerativeClusteringobjectandinitializesitsparameters。AgglomerativeClusteringagglomerativeClusteringnewAgglomerativeClustering()。setLinkage(AgglomerativeClusteringParams。LINKAGEWARD)。setDistanceMeasure(EuclideanDistanceMeasure。NAME)。setPredictionCol(prediction);UsestheAgglomerativeClusteringobjectforclustering。Table〔〕outputsagglomerativeClustering。transform(inputTable);Extractsanddisplaystheresults。for(CloseableIteratorRowitoutputs〔0〕。execute()。collect();it。hasNext();){Rowrowit。next();DenseVectorfeatures(DenseVector)row。getField(agglomerativeClustering。getFeaturesCol());intclusterId(Integer)row。getField(agglomerativeClustering。getPredictionCol());System。out。printf(Features:sClusterID:s,features,clusterId);}}}
  四、Evaluation评估
  机器学习的评估算法是用来评价机器学习模型的性能和效果的方法。
  1、BinaryClassificationEvaluator
  BinaryClassificationEvaluator是二值分类评估器。它可以用于评价二分类模型的性能和效果,例如准确率,精确率,召回率,F1值,AUC值等。
  BinaryClassificationEvaluator算法的基本思想是,根据模型的预测值和真实标签,计算出不同的评估指标,并根据指标的大小来判断模型的优劣。它可以用以下步骤描述:输入:数据集D{(x1,y1),(x2,y2),,(xn,yn)},其中xi是自变量向量,yi是因变量标量输出:评估指标步骤:初始化:创建一个BinaryClassificationEvaluator对象,并设置参数rawPredictionCol,labelCol,weightCol,metricName等计算:调用evaluate方法,传入数据集D,返回评估指标的值返回:输出评估指标的值
  示例:importorg。apache。flink。ml。evaluation。binaryclassification。BinaryClassificationEimportorg。apache。flink。ml。evaluation。binaryclassification。BinaryClassificationEvaluatorPimportorg。apache。flink。ml。linalg。Vimportorg。apache。flink。streaming。api。datastream。DataSimportorg。apache。flink。streaming。api。environment。StreamExecutionEimportorg。apache。flink。table。api。Timportorg。apache。flink。table。api。bridge。java。StreamTableEimportorg。apache。flink。types。RSimpleprogramthatcreatesaBinaryClassificationEvaluatorinstanceandusesitforevaluation。publicclassBinaryClassificationEvaluatorExample{publicstaticvoidmain(String〔〕args){StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();StreamTableEnvironmenttEnvStreamTableEnvironment。create(env);Generatesinputdata。DataStreamRowinputStreamenv。fromElements(Row。of(1。0,Vectors。dense(0。1,0。9)),Row。of(1。0,Vectors。dense(0。2,0。8)),Row。of(1。0,Vectors。dense(0。3,0。7)),Row。of(0。0,Vectors。dense(0。25,0。75)),Row。of(0。0,Vectors。dense(0。4,0。6)),Row。of(1。0,Vectors。dense(0。35,0。65)),Row。of(1。0,Vectors。dense(0。45,0。55)),Row。of(0。0,Vectors。dense(0。6,0。4)),Row。of(0。0,Vectors。dense(0。7,0。3)),Row。of(1。0,Vectors。dense(0。65,0。35)),Row。of(0。0,Vectors。dense(0。8,0。2)),Row。of(1。0,Vectors。dense(0。9,0。1)));TableinputTabletEnv。fromDataStream(inputStream)。as(label,rawPrediction);CreatesaBinaryClassificationEvaluatorobjectandinitializesitsparameters。BinaryClassificationEvaluatorevaluatornewBinaryClassificationEvaluator()。setMetricsNames(BinaryClassificationEvaluatorParams。AREAUNDERPR,BinaryClassificationEvaluatorParams。KS,BinaryClassificationEvaluatorParams。AREAUNDERROC);UsestheBinaryClassificationEvaluatorobjectforevaluations。TableoutputTableevaluator。transform(inputTable)〔0〕;Extractsanddisplaystheresults。RowevaluationResultoutputTable。execute()。collect()。next();System。out。printf(Areaundertheprecisionrecallcurve:s,evaluationResult。getField(BinaryClassificationEvaluatorParams。AREAUNDERPR));System。out。printf(Areaunderthereceiveroperatingcharacteristiccurve:s,evaluationResult。getField(BinaryClassificationEvaluatorParams。AREAUNDERROC));System。out。printf(KolmogorovSmirnovvalue:s,evaluationResult。getField(BinaryClassificationEvaluatorParams。KS));}}
  五、Recommendation(推荐)
  1、Swing
  Swing算法是一种用于召回的算法,它是阿里早期使用的一种原创算法,在阿里多个业务场景被验证是非常有效的一种召回方式。它认为useritemuser的结构比itemCF的单边结构更稳定,更能反映物品之间的相似度。
  Swing算法的基本思想是,如果同时喜欢两个物品的用户越多,且这些用户之间的重合度越低,那么这两个物品之间的相似度越高。它通过计算用户对之间共同喜欢的物品数量的倒数来衡量物品之间的相似度。
  packageorg。apache。flink。ml。examples。importorg。apache。flink。ml。recommendation。swing。Simportorg。apache。flink。streaming。api。datastream。DataSimportorg。apache。flink。streaming。api。environment。StreamExecutionEimportorg。apache。flink。table。api。Timportorg。apache。flink。table。api。bridge。java。StreamTableEimportorg。apache。flink。types。Rimportorg。apache。flink。util。CloseableISimpleprogramthatcreatesaSwinginstanceandusesittogeneraterecommendationsforitems。publicclassSwingExample{publicstaticvoidmain(String〔〕args){StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();StreamTableEnvironmenttEnvStreamTableEnvironment。create(env);Generatesinputdata。DataStreamRowinputStreamenv。fromElements(Row。of(0L,10L),Row。of(0L,11L),Row。of(0L,12L),Row。of(1L,13L),Row。of(1L,12L),Row。of(2L,10L),Row。of(2L,11L),Row。of(2L,12L),Row。of(3L,13L),Row。of(3L,12L));TableinputTabletEnv。fromDataStream(inputStream)。as(user,item);CreatesaSwingobjectandinitializesitsparameters。SwingswingnewSwing()。setUserCol(user)。setItemCol(item)。setMinUserBehavior(1);Transformsthedata。Table〔〕outputTableswing。transform(inputTable);Extractsanddisplaystheresultofswingalgorithm。for(CloseableIteratorRowitoutputTable〔0〕。execute()。collect();it。hasNext();){Rowrowit。next();longmainItemrow。getFieldAs(0);StringitemRankScorerow。getFieldAs(1);System。out。printf(item:d,topksimilaritems:s,mainItem,itemRankScore);}}}
投诉 评论 转载

五官乱飞是需要演技的,看了杨超越与刘美含,就明白差距了好演技与差演技的差别,往往体现在一些表演上的小细节。同样是面瘫脸,人设限制,也做不出过多的表情。《狂飙》里的倪大红与吴刚却成了两种评价,前者被赞气场强大。演出……魅族20明显是冲着小米13来的!屏幕边框更窄,仅为1。57m魅族20绝对是今年最沉得住气的,别人家的骁龙8Gen2旗舰早早就在去年发布了,甚至是不止一款,而魅族20系列却给我们吊了大半年的胃口。现在终于在2月底,官方开始了象征性的预热,……晚安故事丨45成就不一样的精彩45人生:开启另一种可能定下了计划,第二天早起背单词结果还是熬了夜不到ddl临近的那一周,绝不动工硬生生把效率逼成百分之两百卷又卷不动,躺又躺不平……深圳发展未来在于合并惠东县?深圳是我国最成功的经济特区,建立30年左右就成为一线城市,现在更是超越广州,排名全国第三!!!深圳也是很多人追求梦想的地方,很多内地的人才,苦无施展才华的平台。当他们来到……陈自瑶20年前Model出土!甜美包包获网友赞完胜炒车妆李佳两大四字头TVB花旦陈自瑶(Yoyo)和李佳芯(Ali)保养得宜,一向是不少网民心目中的女神。陈自瑶写真照然而近日李佳芯的陈年ShowGirl照被网上疯传,其灾难级……青海湖开湖,为何分文武?地处青藏高原东北部的青海湖,是我国最大的湖泊,也是最大的咸水湖。每年春季,其盛大的开湖景观备受关注。每年冬至前后,碧蓝的青海湖就会开始冻结,进入长达5个月的封冻期。到次年……全球首艘5G邮轮来了!国产首制大型邮轮将实现全船5G移动网络乘坐国产首制大型邮轮,游客可在WIFI6与手机5G网络之间随心选择。近日,中国船舶集团旗下中船嘉年华邮轮有限公司与中国电信股份有限公司上海分公司签订战略合作协议。根据协议,双方……干货孩子自控力差?这个实验结果引人深思为什么的孩子是否总是很容易被诱惑?看到小饼干就忍不住要吃,看到玩具就忍不住要买,看到游戏就忍不住要玩;有些孩子则能够很好地控制自己的冲动,比如能够为了更好吃的甜品而等待,……足球定了!亚足联官宣2023卡塔尔亚洲杯抽签日期北京时间4月5日,亚足联官方宣布,2023年亚足联卡塔尔亚洲杯的抽签日期、比赛日期、赛事球场等事宜已经确定。经亚足联和卡塔尔亚洲杯当地组委会确认,本届亚洲杯比赛将在202……蓉城相约拓疆山,外出招商二组调研海特集团四川港投集团2023年1月12日,由甘泉堡经开区党工委副书记、管委会副主任霍学锋带队,外出招商二组自四川广安至成都,前往海特集团、四川港投国际公路公司进行深度考察,具体情况如下:一、……FlinkML中机器学习算法介绍(一)FlinkML是ApacheFlink生态的子项目,提供机器学习(ML)API和基础设施,简化了ML管道的构建。用户可以使用标准的MLAPI实现ML算法,并进一步使用这些基础设……加密市场混乱中观光客撤退比特币流动性正在枯竭智通财经APP获悉,尽管比特币价格在今年以来出现了引人注目的飙升(涨幅超60),但无论以何种标准衡量,比特币的流动性都仍然很低。加密市场数据提供商KaikoResearc……
哈尔滨楼市的神话被终结,哈尔滨房价四连跌,哈尔滨楼市降温ProeCreoTWS蓝牙耳机产品设计说明书全新纯电动小型车五菱缤果申报图曝光李梦王思雨表现出色,中国女篮负美国队,虽败犹荣走读长沙老街镇丨玉潭老街流转光阴里烟火氤氲处房贷年限延长至40年?按揭人士称意义不大,利息已超过本金执教能力出众,却屡在大赛前下课,前南主帅理念先进为何被解雇?当陈冲和宋佳同穿绿色缎面裙时,终于明白女人精致点才能风韵犹存倍受青睐的水果之王,为何会被泰国所有的酒店拒之门外光明网评论员城市为什么需要夜经济贾跃亭,孙宏斌,许家印三王联手,可有未来?今年火了一种发型叫肩上一刀切,洋气减龄显脸小,明星也上头
寸头雕刻发型后脑藏惊喜三分准绝杀,同曦新秀狂轰324,上海队4连败,西热力江连续庆亚马逊,想给你家派个“机器人密探”!还要你花钱买热传聚热点网 郑州新冠肺炎疫情发布会:成立督导组对“表面解封”的小区进行调 个人名下有几张电话卡?一证通查业务上线酿酒的最大成本是时间酿酒的成本是多少一场考试男子野外发现一个铁盒打开后震撼了整个考古界大鱼和小鱼阅读答案南瓜四年级作文电脑辐射对皮肤伤害大吗冰岛旅行问题解答

友情链接:中准网聚热点快百科快传网快生活快软网快好知文好找菏泽德阳山西湖州宝鸡上海茂名内江三亚信阳长春北海西安安徽黄石烟台沧州湛江肇庆鹤壁六安韶关成都钦州