Spark Task not serializable/No class defined for 3rd party Jar












I have been searching Google and Stack Overflow for a week and still cannot find a good answer to this.



I have a chemical compound dataset, and I need a third-party jar to read these compounds in SDF (a JSON-like data format). Then I have to calculate the similarities between different compounds. Reading and calculating require very complicated chemical details, so I cannot reproduce the functionality myself. That is, I have to call the jar's functions inside a Spark mapping function. The jar is called JCompoundMapper. It reads the atomic bonds iteratively using a DFS algorithm and seems very complicated. Anyway, this thread is not about reading chemical compounds, but about how to map a third-party jar on Spark. When I tried to do this, I came across the task-not-serializable issue:



import de.zbit.jcmapper.distance.DistanceTanimoto
import de.zbit.jcmapper.distance.IDistanceMeasure
import de.zbit.jcmapper.fingerprinters.EncodingFingerprint
import de.zbit.jcmapper.fingerprinters.features.FeatureMap
import de.zbit.jcmapper.fingerprinters.features.IFeature
import de.zbit.jcmapper.fingerprinters.topological.Encoding2DAllShortestPath
import de.zbit.jcmapper.fingerprinters.topological.Encoding2DCATS
import de.zbit.jcmapper.fingerprinters.topological.Encoding2DECFP
import de.zbit.jcmapper.io.reader.RandomAccessMDLReader
import de.zbit.jcmapper.io.writer.ExporterFullFingerprintCSV
import de.zbit.jcmapper.io.writer.ExporterFullFingerprintTABUnfolded
import de.zbit.jcmapper.io.writer.ExporterLinear
import de.zbit.jcmapper.io.writer.ExporterSDFProperty
import java.io.FileWriter
import java.util.{ArrayList, List}
import java.io.File

val reader: RandomAccessMDLReader = new RandomAccessMDLReader(new File("datasets/internal.sdf"))
val similarity: IDistanceMeasure = new DistanceTanimoto()
val fingerprinter: Encoding2DAllShortestPath = new Encoding2DAllShortestPath()
val rawFeatures2: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(0))
val rawFeatures: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(1))

def getSimilarity(id1: Int, id2: Int): Double = {
  val featureMaps: List[FeatureMap] = new ArrayList[FeatureMap]()
  featureMaps.add(new FeatureMap(rawFeatures))
  featureMaps.add(new FeatureMap(rawFeatures2))
  similarity.getSimilarity(featureMaps.get(0), featureMaps.get(1))
}

val func = combinations.map(x => {
  getSimilarity(0, 1)
}).take(5)

Name: org.apache.spark.SparkException
Message: Task not serializable
StackTrace: at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2292)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:371)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.map(RDD.scala:370)
... 48 elided
Caused by: java.io.NotSerializableException: de.zbit.jcmapper.io.reader.RandomAccessMDLReader


I read other threads and understood that I have to put the variables and functions in an object to make them serializable. However, when I did that, I came across a null pointer exception:



object Holder {
  val reader: RandomAccessMDLReader = new RandomAccessMDLReader(new File("datasets/internal.sdf"))
  val similarity: IDistanceMeasure = new DistanceTanimoto()
  val fingerprinter: Encoding2DAllShortestPath = new Encoding2DAllShortestPath()
  val rawFeatures2: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(0))
  val rawFeatures: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(1))

  def getSimilarity(id1: Int, id2: Int): Double = {
    val featureMaps: List[FeatureMap] = new ArrayList[FeatureMap]()
    featureMaps.add(new FeatureMap(rawFeatures))
    featureMaps.add(new FeatureMap(rawFeatures2))
    similarity.getSimilarity(featureMaps.get(0), featureMaps.get(1))
  }
}

val func = combinations.map(x => {
  Holder.getSimilarity(0, 1)
}).take(5)


Name: org.apache.spark.SparkException
Message: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-10-245-2-223.ec2.internal, executor 1): java.lang.NullPointerException
at de.zbit.jcmapper.io.reader.RandomAccessMDLReader.setRanges(Unknown Source)
at de.zbit.jcmapper.io.reader.RandomAccessMDLReader.<init>(Unknown Source)
at $line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Holder$.<init>(<console>:78)
at $line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.Holder$lzycompute(<console>:77)
at $line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.Holder(<console>:77)
at $line57.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:85)
at $line57.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:84)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)


For the reading part, I could use a huge LinkedHashMap and store all the compounds there. However, I still have to use the getSimilarity() function from the third-party jar to calculate the similarity. So even when I only use the getSimilarity() function, putting it inside an object gives me the null pointer exception, and putting it outside the object gives me the task-not-serializable issue. Therefore, I have a couple of questions that I was not able to find good answers to:




  1. Does Spark support mapping a third-party jar onto each executor? For example, with the reader: does Spark distribute the reader class to each executor so that each one reads the file individually, or does it read the file as a whole and then distribute smaller pieces to each executor?

  2. Why is it throwing the null pointer exception? It seems that the object indeed solved the serialization problem, but not the null pointer exception.

  3. I am a new data engineer and not yet an expert in Spark, but I would like to learn the best practice for mapping a third-party jar onto Spark and running its functions in a distributed fashion.


Thank you very much for all of your answers! I really appreciate your help!



Best,
Michael










Tags: scala, apache-spark, mapreduce






asked Nov 25 '18 at 15:16 by logydoghan
1 Answer






          I think the problem is in this line:



          val reader:RandomAccessMDLReader = new RandomAccessMDLReader(new File("datasets/internal.sdf"))


By placing this code in an object, each JVM that your Spark job runs on has to initialize it. So in effect this code is trying to read the file datasets/internal.sdf from the local file system, wherever that is in your Spark cluster. Is that file available everywhere?
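
For example, one standard way to make a small file like this reachable on every worker is SparkContext.addFile together with SparkFiles.get. A rough, untested sketch (it assumes sc is the shell's SparkContext and keeps the file name internal.sdf):

import java.io.File
import org.apache.spark.SparkFiles
import de.zbit.jcmapper.io.reader.RandomAccessMDLReader

// Driver side: ship the SDF file to every node that will run tasks.
sc.addFile("datasets/internal.sdf")

object Holder {
  // Executor side: resolve the local copy that Spark shipped with the job.
  lazy val reader: RandomAccessMDLReader =
    new RandomAccessMDLReader(new File(SparkFiles.get("internal.sdf")))
}

The rest of Holder can stay as it is; the only point is that the path handed to RandomAccessMDLReader must exist on the worker that initializes the object.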



If you are not ready to put the file everywhere, you can try to put it on the classpath and read it as a resource.
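
Since RandomAccessMDLReader takes a java.io.File rather than a stream, reading it as a resource would probably mean copying the resource to a temporary file first. A sketch along those lines, assuming the SDF is bundled into the application jar under the (hypothetical) resource name /internal.sdf:

import java.io.File
import java.nio.file.{Files, StandardCopyOption}
import de.zbit.jcmapper.io.reader.RandomAccessMDLReader

object Holder {
  lazy val reader: RandomAccessMDLReader = {
    // Copy the bundled SDF out of the jar, because the reader needs
    // random access to a real file on disk.
    val in = getClass.getResourceAsStream("/internal.sdf") // hypothetical resource name
    require(in != null, "internal.sdf not found on the classpath")
    val tmp = File.createTempFile("internal", ".sdf")
    tmp.deleteOnExit()
    Files.copy(in, tmp.toPath, StandardCopyOption.REPLACE_EXISTING)
    in.close()
    new RandomAccessMDLReader(tmp)
  }
}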






answered Nov 25 '18 at 19:24 by Erik van Oosten

• Hi, thanks for the answer. I actually tried to put that line outside the scope and save all the data in a huge LinkedHashMap, so that I avoid using the reader, but this line gives the same error: val temp: Double = similarity.getSimilarity(featureMaps.get(0), featureMaps.get(1)). So I guess the problem is still with the serialization.

            – logydoghan
            Nov 26 '18 at 3:17













• Whatever is put in the LinkedHashMap needs to be serializable. If you have the source code of the library, you should be able to figure out what is serializable and what is not. It might not be easy, though.

            – Erik van Oosten
            Nov 26 '18 at 7:56
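
For completeness, one pattern that keeps the non-serializable JCompoundMapper objects out of the serialized closure entirely is to construct them once per partition with mapPartitions and ship only plain indices through the RDD. This is a sketch, not tested code: it assumes combinations is an RDD[(Int, Int)] of molecule indices and that internal.sdf has already been made available on every worker, for example with sc.addFile as above:

import java.io.File
import java.util.{ArrayList, List => JList}
import de.zbit.jcmapper.distance.{DistanceTanimoto, IDistanceMeasure}
import de.zbit.jcmapper.fingerprinters.features.FeatureMap
import de.zbit.jcmapper.fingerprinters.topological.Encoding2DAllShortestPath
import de.zbit.jcmapper.io.reader.RandomAccessMDLReader
import org.apache.spark.SparkFiles

val similarities = combinations.mapPartitions { pairs =>
  // Everything below is constructed on the executor, inside the task,
  // so nothing non-serializable ever has to cross the wire.
  val reader = new RandomAccessMDLReader(new File(SparkFiles.get("internal.sdf")))
  val fingerprinter = new Encoding2DAllShortestPath()
  val similarity: IDistanceMeasure = new DistanceTanimoto()

  pairs.map { case (id1, id2) =>
    val maps: JList[FeatureMap] = new ArrayList[FeatureMap]()
    maps.add(new FeatureMap(fingerprinter.getFingerprint(reader.getMol(id1))))
    maps.add(new FeatureMap(fingerprinter.getFingerprint(reader.getMol(id2))))
    similarity.getSimilarity(maps.get(0), maps.get(1))
  }
}

similarities.take(5)

Nothing JCompoundMapper-specific crosses the wire this way, and the SDF is parsed once per partition rather than once per record.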










