Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

do we support scala & java code write tensorflow model with tenorflow-core-api ? #588

Open
mullerhai opened this issue Jul 11, 2022 · 3 comments

Comments

@mullerhai
Copy link

Environment:

  • Python version [e.g. 2.7, 3.6]
  • Spark version [e.g. 2.1, 2.3.1]
  • TensorFlow version [e.g. 1.5, 1.9.0]
  • TensorFlowOnSpark version [e.g. 1.1, 1.3.2]
  • Cluster version [e.g. Standalone, Hadoop 2.8, CDH5]

Describe the bug:
A clear and concise description of what the bug is.

Logs:
If applicable, add logs to help explain your problem. Note: errors may not be fully described in the driver/console logs. Make sure to check the executor logs for possible root causes.

Spark Submit Command Line:
If applicable, add your spark-submit command line.

@mullerhai mullerhai changed the title do we support keras model and scala do we support scala & java code write tensorflow model with tenorflow-core-api ? Jul 11, 2022
@leewyang
Copy link
Contributor

Unfortunately, Java API support in TF has been spotty with deprecation warnings and no API stability guarantees. We initially tried to support Java when the API was updated regularly with each TF release, but even then, it was mostly geared towards inference and not training. That said, contributions are always welcome!

@mullerhai
Copy link
Author

Unfortunately, Java API support in TF has been spotty with deprecation warnings and no API stability guarantees. We initially tried to support Java when the API was updated regularly with each TF release, but even then, it was mostly geared towards inference and not training. That said, contributions are always welcome!

for me when I read our source code ,I was inspired by these scala code in TFModel, we do the model implement spark Model Interface api, tensor convert df and df convert to tensor ,and invoke tensorflow session, and get the distribute partition block ,mapPartition do model train,collect all partitions result for one model

  override def transform(dataset: Dataset[_]): DataFrame = {
    val spark = dataset.sparkSession

    val inputColumns = this.getInputMapping.keys.toSeq
    val inputTensorNames = this.getInputMapping.values
    val outputTensorNames = this.getOutputMapping.keys.toSeq

    val inputDF = dataset.select(inputColumns.head, inputColumns.tail: _*)
    val inputSchema = inputDF.schema
    val outputSchema = transformSchema(inputSchema)

    val outputRDD = inputDF.rdd.mapPartitions { iter: Iterator[Row] =>
      if (TFModel.model == null || TFModel.modelDir != this.getModel) {
        // load model into a per-executor singleton reference, if needed.
        TFModel.modelDir = this.getModel
        TFModel.model = SavedModelBundle.load(this.getModel, this.getTag)
        TFModel.graph = TFModel.model.graph
        TFModel.sess = TFModel.model.session
      }

      iter.grouped(this.getBatchSize).flatMap { batch =>
        // get input batch of Rows and convert to list of input Tensors
        val inputTensors = batch2tensors(batch, inputSchema)

        var runner = TFModel.sess.runner()

        // feed input tensors
        for ((name, tensor) <- inputTensors) {
          runner = runner.feed(this.getInputMapping(name), tensor)
        }
        // fetch output tensors
        for (name <- outputTensorNames) {
          runner = runner.fetch(name)
        }

        // run the graph
        val outputTensors = runner.run()

        assert(outputTensors.map(_.shape).map(s => if (s.isEmpty) 0L else s.apply(0)).distinct.size == 1,
          "Cardinality of output tensors must match")

        // convert the list of output Tensors to a batch of output Rows
        tensors2batch(outputTensors)
      }
    }

    spark.createDataFrame(outputRDD, outputSchema)
  }

@mokundong
Copy link

mokundong commented Jul 19, 2022

got an error "Could not find SavedModel .pb" when submit on yarn cluster at code "TFModel.model = SavedModelBundle.load(this.getModel, this.getTag)"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants