Getting Key Not Found Exception while Serializing to a mleap bundle #872

Open
subbula1979 opened this issue Mar 4, 2024 · 5 comments

@subbula1979

subbula1979 commented Mar 4, 2024

Hello Team,

When I try to serialize a Spark PipelineModel to an MLeap bundle, I get the exception below:

java.util.NoSuchElementException: key not found: org.apache.spark.ml.PipelineModel
at scala.collection.immutable.Map$Map2.apply(Map.scala:227)
at ml.combust.bundle.BundleRegistry.opForObj(BundleRegistry.scala:102)
at ml.combust.bundle.BundleWriter.$anonfun$save$1(BundleWriter.scala:28)
at scala.Option.getOrElse(Option.scala:189)
at ml.combust.bundle.BundleWriter.save(BundleWriter.scala:28)
at ml.combust.bundle.BundleWriter.$anonfun$save$3(BundleWriter.scala:41)
at resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
at scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252)
at scala.util.control.Exception$Catch.apply(Exception.scala:228)
at scala.util.control.Exception$Catch.either(Exception.scala:252)
at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
at resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26)
at resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26)
at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
at resource.DeferredExtractableManagedResource.$anonfun$tried$1(AbstractManagedResource.scala:33)
at scala.util.Try$.apply(Try.scala:213)
at resource.DeferredExtractableManagedResource.tried(AbstractManagedResource.scala:33)
at ml.combust.bundle.BundleWriter.save(BundleWriter.scala:40)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.Try$.apply(Try.scala:213)
at scala.util.Success.flatMap(Try.scala:251)
at scala.util.Try$WithFilter.flatMap(Try.scala:142)
at scala.util.Success.flatMap(Try.scala:251)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

MLeap version: 0.23.1, Spark: 3.4.0, XGBoost: 1.7.6

Following is the code to serialize the Spark PipelineModel to an MLeap bundle:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.ml.{PipelineModel, Transformer}
// MLeap/Bundle.ML Serialization Libraries
import org.apache.spark.ml.bundle.SparkBundleContext
import ml.combust.mleap.spark.SparkSupport._
import scala.util.{Failure, Success, Try}
import java.net.URI

def saveModelAsMleapBundle(bucketName: String, minioPath: String, bestModel: PipelineModel, transformedDf: DataFrame): Try[Unit] = {
  Try {
    implicit val sbc: SparkBundleContext = SparkBundleContext().withDataset(transformedDf)
    val localPath = "/tmp/modelu.zip"
    val bundleURI = new URI(s"jar:file:$localPath")
    val result = bestModel.writeBundle.save(bundleURI)(sbc).get
  }
}

Following is the code used to create the model:

val labelEncoderPipelineStage = new StringIndexer()
  .setInputCols(labelEncoderInputCols)
  .setOutputCols(labelEncoderOutputCols)
  .setHandleInvalid("keep")

val assemblerPipelineStage = new VectorAssembler()
  .setInputCols(allFeatureColumns)
  .setOutputCol("features")
  .setHandleInvalid("keep")

def get_param(): mutable.HashMap[String, Any] = {
  val params = new mutable.HashMap[String, Any]()
  params += "objective" -> "multi:softprob"
  params += "num_class" -> 7
  params += "tree_method" -> "auto"
  params += "num_workers" -> 3
  params += "num_early_stopping_rounds" -> 3
  params += "maximize_evaluation_metrics" -> false
  params += "verbosity" -> 3
  params += "missing" -> 0.0
  params += "eta" -> 0.2
  params += "seed" -> 50
  params
}
// Create an XGBoost classifier
val xgb = new XGBoostClassifier(get_param().toMap)
  .setFeaturesCol("features")
  .setLabelCol(label_column)

val xgbParamGrid = new ParamGridBuilder()
  .addGrid(xgb.missing, Array(0.0))
  .addGrid(xgb.maxDepth, Array(16))
  .addGrid(xgb.eta, Array(0.2))
  .addGrid(xgb.gamma, Array(0.0))
  .addGrid(xgb.subsample, Array(0.6, 0.65, 0.7))
  .addGrid(xgb.numRound, Array(0.8, 0.9))
  .addGrid(xgb.colsampleBytree, Array(1.0))
  .addGrid(xgb.minChildWeight, Array(1.0))
  .build()

val labelConverterPipelineStage = new IndexToString()
  .setInputCol(predictioncolumn)
  .setOutputCol(prediction_indextostringcolumn)
  .setLabels(labelsArray)

val pipeline = new Pipeline().setStages(Array(labelEncoderPipelineStage, assemblerPipelineStage, xgb, labelConverterPipelineStage))

val evaluator = new MultilabelClassificationEvaluator()
  .setLabelCol(label_column)
  .setPredictionCol("prediction")
  .setMetricName("accuracy")

// Create the cross-validation pipeline, using the pipeline above as the
// estimator, the evaluator above, and xgbParamGrid for hyperparameters
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(xgbParamGrid)
  .setNumFolds(3)

// Create the model by fitting the training data
val cvModel = cv.fit(trainDF)

val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]

The model is created successfully, but serializing it to an MLeap bundle fails with the exception above.

Following are the entries in my src/main/resources/reference.conf:
ml.combust.mleap.spark.xgboost.ops = [
"ml.dmlc.xgboost4j.scala.spark.mleap.XGBoostClassificationModelOp",
"ml.dmlc.xgboost4j.scala.spark.mleap.XGBoostRegressionModelOp"
]

ml.combust.mleap.spark.registry.default.ops += "ml.combust.mleap.spark.xgboost.ops"

ml.combust.mleap.spark.registry.builtin-ops = [
"org.apache.spark.ml.bundle.ops.classification.DecisionTreeClassifierOp",
"org.apache.spark.ml.bundle.ops.classification.NaiveBayesClassifierOp",
"org.apache.spark.ml.bundle.ops.classification.MultiLayerPerceptronClassifierOp",
"org.apache.spark.ml.bundle.ops.classification.OneVsRestOp",
"org.apache.spark.ml.bundle.ops.classification.RandomForestClassifierOp",

"org.apache.spark.ml.bundle.ops.clustering.GaussianMixtureOp",
"org.apache.spark.ml.bundle.ops.clustering.KMeansOp",
"org.apache.spark.ml.bundle.ops.clustering.BisectingKMeansOp",
"org.apache.spark.ml.bundle.ops.clustering.LDAModelOp",

"org.apache.spark.ml.bundle.ops.feature.BinarizerOp",
"org.apache.spark.ml.bundle.ops.feature.BucketizerOp",
"org.apache.spark.ml.bundle.ops.feature.ChiSqSelectorOp",
"org.apache.spark.ml.bundle.ops.feature.CountVectorizerOp",
"org.apache.spark.ml.bundle.ops.feature.DCTOp",
"org.apache.spark.ml.bundle.ops.feature.ElementwiseProductOp",
"org.apache.spark.ml.bundle.ops.feature.HashingTermFrequencyOp",
"org.apache.spark.ml.bundle.ops.feature.IDFOp",
"org.apache.spark.ml.bundle.ops.feature.InteractionOp",
"org.apache.spark.ml.bundle.ops.feature.MaxAbsScalerOp",
"org.apache.spark.ml.bundle.ops.feature.MinMaxScalerOp",
"org.apache.spark.ml.bundle.ops.feature.NGramOp",
"org.apache.spark.ml.bundle.ops.feature.NormalizerOp",
"org.apache.spark.ml.bundle.ops.feature.PcaOp",
"org.apache.spark.ml.bundle.ops.feature.PolynomialExpansionOp",
"org.apache.spark.ml.bundle.ops.feature.ReverseStringIndexerOp",
"org.apache.spark.ml.bundle.ops.feature.StandardScalerOp",
"org.apache.spark.ml.bundle.ops.feature.StopWordsRemoverOp",
"org.apache.spark.ml.bundle.ops.feature.TokenizerOp",
"org.apache.spark.ml.bundle.ops.feature.VectorAssemblerOp",
"org.apache.spark.ml.bundle.ops.feature.VectorIndexerOp",
"org.apache.spark.ml.bundle.ops.feature.VectorSlicerOp",
"org.apache.spark.ml.bundle.ops.feature.WordToVectorOp",
"org.apache.spark.ml.bundle.ops.feature.RegexTokenizerOp",

"org.apache.spark.ml.bundle.ops.regression.AFTSurvivalRegressionOp",
"org.apache.spark.ml.bundle.ops.regression.DecisionTreeRegressionOp",
"org.apache.spark.ml.bundle.ops.regression.GBTRegressionOp",
"org.apache.spark.ml.bundle.ops.regression.GeneralizedLinearRegressionOp",
"org.apache.spark.ml.bundle.ops.regression.IsotonicRegressionOp",
"org.apache.spark.ml.bundle.ops.regression.LinearRegressionOp",
"org.apache.spark.ml.bundle.ops.regression.RandomForestRegressionOp",

"org.apache.spark.ml.bundle.ops.recommendation.ALSOp",

"org.apache.spark.ml.bundle.ops.tuning.CrossValidatorOp",
"org.apache.spark.ml.bundle.ops.tuning.TrainValidationSplitOp",

"org.apache.spark.ml.bundle.ops.feature.MinHashLSHOp",
"org.apache.spark.ml.bundle.ops.feature.BucketedRandomProjectionLSHOp",
"org.apache.spark.ml.bundle.ops.classification.GBTClassifierOp",
"org.apache.spark.ml.bundle.ops.classification.LogisticRegressionOp",
"org.apache.spark.ml.classification.bundle.ops.LinearSVCOp",
"org.apache.spark.ml.bundle.ops.feature.FeatureHasherOp",
"org.apache.spark.ml.bundle.ops.feature.StringIndexerOp",
"org.apache.spark.ml.bundle.ops.feature.OneHotEncoderOp",

"org.apache.spark.ml.bundle.ops.PipelineOp"
]

ml.combust.mleap.spark.registry.default.ops += "ml.combust.mleap.spark.registry.builtin-ops"

ml.combust.mleap.spark.extension.ops = [
"org.apache.spark.ml.bundle.extension.ops.classification.OneVsRestOp",
"org.apache.spark.ml.bundle.extension.ops.classification.SupportVectorMachineOp",
"org.apache.spark.ml.bundle.extension.ops.feature.ImputerOp",
"org.apache.spark.ml.bundle.extension.ops.feature.MathBinaryOp",
"org.apache.spark.ml.bundle.extension.ops.feature.MathUnaryOp",
"org.apache.spark.ml.bundle.extension.ops.feature.MultinomialLabelerOp",
"org.apache.spark.ml.bundle.extension.ops.feature.WordLengthFilterOp",
"org.apache.spark.ml.bundle.extension.ops.feature.StringMapOp"
]

ml.combust.mleap.spark.registry.default.ops += "ml.combust.mleap.spark.extension.ops"

ml.combust.mleap.registry.builtin-ops = [
"ml.combust.mleap.bundle.ops.classification.DecisionTreeClassifierOp",
"ml.combust.mleap.bundle.ops.classification.GBTClassifierOp",
"ml.combust.mleap.bundle.ops.classification.LogisticRegressionOp",
"ml.combust.mleap.bundle.ops.classification.NaiveBayesClassifierOp",
"ml.combust.mleap.bundle.ops.classification.MultiLayerPerceptronClassifierOp",
"ml.combust.mleap.bundle.ops.classification.OneVsRestOp",
"ml.combust.mleap.bundle.ops.classification.RandomForestClassifierOp",
"ml.combust.mleap.bundle.ops.classification.SupportVectorMachineOp",
"ml.combust.mleap.bundle.ops.classification.LinearSVCOp",

"ml.combust.mleap.bundle.ops.clustering.GaussianMixtureOp",
"ml.combust.mleap.bundle.ops.clustering.KMeansOp",
"ml.combust.mleap.bundle.ops.clustering.BisectingKMeansOp",
"ml.combust.mleap.bundle.ops.clustering.LDAModelOp",

"ml.combust.mleap.bundle.ops.feature.BinarizerOp",
"ml.combust.mleap.bundle.ops.sklearn.BinarizerOp",
"ml.combust.mleap.bundle.ops.feature.BucketedRandomProjectionLSHOp",
"ml.combust.mleap.bundle.ops.feature.BucketizerOp",
"ml.combust.mleap.bundle.ops.feature.ChiSqSelectorOp",
"ml.combust.mleap.bundle.ops.feature.CoalesceOp",
"ml.combust.mleap.bundle.ops.feature.CountVectorizerOp",
"ml.combust.mleap.bundle.ops.feature.DCTOp",
"ml.combust.mleap.bundle.ops.feature.ElementwiseProductOp",
"ml.combust.mleap.bundle.ops.feature.FeatureHasherOp",
"ml.combust.mleap.bundle.ops.feature.HashingTermFrequencyOp",
"ml.combust.mleap.bundle.ops.feature.IDFOp",
"ml.combust.mleap.bundle.ops.feature.ImputerOp",
"ml.combust.mleap.bundle.ops.feature.InteractionOp",
"ml.combust.mleap.bundle.ops.feature.MapEntrySelectorOp",
"ml.combust.mleap.bundle.ops.feature.MathBinaryOp",
"ml.combust.mleap.bundle.ops.feature.MathUnaryOp",
"ml.combust.mleap.bundle.ops.feature.MaxAbsScalerOp",
"ml.combust.mleap.bundle.ops.feature.MinHashLSHOp",
"ml.combust.mleap.bundle.ops.feature.MinMaxScalerOp",
"ml.combust.mleap.bundle.ops.feature.MultinomialLabelerOp",
"ml.combust.mleap.bundle.ops.feature.NGramOp",
"ml.combust.mleap.bundle.ops.feature.NormalizerOp",
"ml.combust.mleap.bundle.ops.feature.OneHotEncoderOp",
"ml.combust.mleap.bundle.ops.feature.PcaOp",
"ml.combust.mleap.bundle.ops.feature.PolynomialExpansionOp",
"ml.combust.mleap.bundle.ops.sklearn.PolynomialFeaturesOp",
"ml.combust.mleap.bundle.ops.feature.ReverseStringIndexerOp",
"ml.combust.mleap.bundle.ops.feature.StandardScalerOp",
"ml.combust.mleap.bundle.ops.feature.StopWordsRemoverOp",
"ml.combust.mleap.bundle.ops.feature.StringIndexerOp",
"ml.combust.mleap.bundle.ops.feature.StringMapOp",
"ml.combust.mleap.bundle.ops.feature.TokenizerOp",
"ml.combust.mleap.bundle.ops.feature.VectorAssemblerOp",
"ml.combust.mleap.bundle.ops.feature.VectorIndexerOp",
"ml.combust.mleap.bundle.ops.feature.VectorSlicerOp",
"ml.combust.mleap.bundle.ops.feature.WordToVectorOp",
"ml.combust.mleap.bundle.ops.feature.RegexTokenizerOp",
"ml.combust.mleap.bundle.ops.feature.RegexIndexerOp",
"ml.combust.mleap.bundle.ops.feature.WordLengthFilterOp",

"ml.combust.mleap.bundle.ops.regression.AFTSurvivalRegressionOp",
"ml.combust.mleap.bundle.ops.regression.DecisionTreeRegressionOp",
"ml.combust.mleap.bundle.ops.regression.GBTRegressionOp",
"ml.combust.mleap.bundle.ops.regression.GeneralizedLinearRegressionOp",
"ml.combust.mleap.bundle.ops.regression.IsotonicRegressionOp",
"ml.combust.mleap.bundle.ops.regression.LinearRegressionOp",
"ml.combust.mleap.bundle.ops.regression.RandomForestRegressionOp",

"ml.combust.mleap.bundle.ops.ensemble.CategoricalDrilldownOp",

"ml.combust.mleap.bundle.ops.recommendation.ALSOp",

"ml.combust.mleap.bundle.ops.PipelineOp"
]

ml.combust.mleap.registry.default.ops += "ml.combust.mleap.registry.builtin-ops"

ml.combust.mleap.xgboost.ops = [
"ml.combust.mleap.xgboost.runtime.bundle.ops.XGBoostClassificationOp",
"ml.combust.mleap.xgboost.runtime.bundle.ops.XGBoostRegressionOp"
]

ml.combust.mleap.registry.default.ops += "ml.combust.mleap.xgboost.ops"

I have added the following MLeap dependencies to my Gradle build:
implementation group: 'ml.combust.mleap', name: "mleap-xgboost-spark_${scalaVersion}", version: '0.23.1'
implementation group: 'ml.combust.mleap', name: "mleap-xgboost-runtime_${scalaVersion}", version: '0.23.1'
implementation group: 'ml.combust.mleap', name: "mleap-spark_${scalaVersion}", version: '0.23.1'
implementation group: 'ml.combust.mleap', name: "mleap-spark-extension_${scalaVersion}", version: '0.23.1'
implementation group: 'ml.combust.mleap', name: "mleap-runtime_${scalaVersion}", version: '0.23.1'

Please let me know if I need to add any additional configuration to resolve this error.

@jsleight
Contributor

jsleight commented Mar 4, 2024

I suspect something is wrong with the op registry, though I don't see anything obviously wrong with what you've put in the reference.conf. I usually use sbt and merge strategies to handle that; I'm not sure what the Gradle equivalent would be.
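For reference, a minimal sketch of the sbt-assembly merge strategy I mean (this assumes the sbt-assembly plugin; it concatenates every reference.conf on the classpath instead of keeping only one copy):

// build.sbt -- concatenate reference.conf files from all dependencies
ThisBuild / assemblyMergeStrategy := {
  case "reference.conf" => MergeStrategy.concat
  case x =>
    val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
    oldStrategy(x)
}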

Is it just PipelineModel that is having issues, or is it everything (and PipelineModel just gets hit first)? E.g., try serializing just the first stage by itself. If that fails too, then I suspect nothing is getting registered at all. If the first stage works by itself, then something is messed up with the PipelineOp registry.
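Something like this (a sketch reusing bestModel, transformedDf, and the imports from your snippets above; the output path is arbitrary):

// serialize only the first stage (a Transformer) instead of the whole PipelineModel
implicit val sbc: SparkBundleContext = SparkBundleContext().withDataset(transformedDf)
bestModel.stages.head.writeBundle.save(new URI("jar:file:/tmp/first-stage.zip"))(sbc).get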

@subbula1979
Author

subbula1979 commented Mar 5, 2024

I am getting the best model from the CrossValidator before serializing to the MLeap bundle. I have updated the steps above.

As far as reference.conf is concerned, it is under the src/main/resources folder. I am not doing anything in Gradle to include reference.conf in the fat jar; I believe Gradle packages reference.conf from the resources folder into the fat jar by default. I have verified this by unzipping the fat jar.

I don't have an application.properties file. I am using my own JSON file and parsing it in my code.

Surprisingly, the unit test passes locally, i.e. I am able to serialize the pipeline model to an MLeap bundle with the same steps above. The problem occurs only when the Spark job is submitted to the cluster through SparkApplication.

@jsleight
Contributor

jsleight commented Mar 5, 2024

Surprisingly, the unit test passes locally, i.e. I am able to serialize the pipeline model to an MLeap bundle with the same steps above. The problem occurs only when the Spark job is submitted to the cluster through SparkApplication.

That is extra strange. Can you confirm that Spark's JVM and your local JVM have the same packages installed?
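One quick way to check from inside the job itself (a sketch; it prints every reference.conf visible on the driver's classpath, so you can see whether several copies are competing):

// run inside the Spark job, e.g. right before calling writeBundle
val urls = getClass.getClassLoader.getResources("reference.conf")
while (urls.hasMoreElements) println(urls.nextElement())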

@subbula1979
Author

The reason for the error was reference.conf being overwritten by the various dependencies. I used the Shadow Gradle plugin to append all the reference.conf entries, and now the model is serialized to an MLeap bundle.
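For anyone hitting the same issue, a minimal sketch of the relevant Shadow configuration (the plugin version shown is illustrative):

// build.gradle -- merge every reference.conf on the classpath into one
plugins {
    id 'com.github.johnrengelman.shadow' version '7.1.2'
}

shadowJar {
    // AppendingTransformer: concatenates reference.conf entries
    // from all dependencies instead of keeping only one copy
    append 'reference.conf'
}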

Thanks so much for your help.

Now, I would like to save the bundle directly to a MinIO bucket.

The SparkContext already has the properties (access key, secret key, MinIO host, etc.) needed to connect to MinIO, and I believe a HadoopBundleFileSystem object will automatically be generated and attached to the SparkBundleContext. I tried both of the steps below but got runtime errors:

implicit val sbc: SparkBundleContext = SparkBundleContext().withDataset(transformedDf)
val result = bestModel.writeBundle.save(new URI("hdfs:///mleaptraining/bundle.zip"))(sbc).get

implicit val sbc: SparkBundleContext = SparkBundleContext().withDataset(transformedDf)
val result = bestModel.writeBundle.save(new URI("s3a:///mleaptraining/bundle.zip"))(sbc).get

Please let me know if it is possible to save the bundle directly to a MinIO bucket.

@jsleight
Contributor

You might need to add the bundle-hdfs package to your classpath for the HDFS writing to work: https://github.com/combust/mleap/blob/8784e8eec1a5adf2164e1cb3e18f4d853d94fe36/bundle-hdfs/README.md

Not sure if direct writing to S3 will work or not. As a workaround, you can always write to the local filesystem and then use the AWS SDK to copy it over.
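A sketch of that workaround using the Hadoop FileSystem API instead of the AWS SDK (assumptions: the s3a/MinIO credentials are already set on the SparkContext's Hadoop configuration, spark is your active SparkSession, sbc and bestModel come from your earlier snippets, and the bucket/key are placeholders):

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// 1. write the bundle to the local filesystem first, as in your earlier snippet
val localPath = "/tmp/bundle.zip"
bestModel.writeBundle.save(new URI(s"jar:file:$localPath"))(sbc).get

// 2. copy it to the bucket; FileSystem.get picks up the s3a credentials
//    already present in the SparkContext's Hadoop configuration
val dest = new URI("s3a://mleaptraining/bundle.zip") // placeholder bucket/key
val fs = FileSystem.get(dest, spark.sparkContext.hadoopConfiguration)
fs.copyFromLocalFile(new Path(localPath), new Path(dest))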
