Spark Task not serializable/No class defined for 3rd party Jar
I have been searching Google and Stack Overflow for a week and still cannot find a good answer to this.
I have a chemical compound dataset, and I need a third-party JAR to read the compounds, which are stored in SDF (a JSON-like data format); I then have to calculate the similarities between different compounds. Reading and calculating involve very complicated chemistry, so I cannot reproduce the functionality myself. In other words, I have to call this third-party JAR from within a mapping function on Spark. The JAR is called JCompoundMapper; it reads the atomic bonds iteratively using a DFS algorithm and seems quite complicated. Anyway, this question is not about reading chemical compounds but about how to use a third-party JAR inside a Spark map. When I tried to do this, I ran into the task-not-serializable issue:
import de.zbit.jcmapper.distance.DistanceTanimoto
import de.zbit.jcmapper.distance.IDistanceMeasure
import de.zbit.jcmapper.fingerprinters.EncodingFingerprint
import de.zbit.jcmapper.fingerprinters.features.FeatureMap
import de.zbit.jcmapper.fingerprinters.features.IFeature
import de.zbit.jcmapper.fingerprinters.topological.Encoding2DAllShortestPath
import de.zbit.jcmapper.fingerprinters.topological.Encoding2DCATS
import de.zbit.jcmapper.fingerprinters.topological.Encoding2DECFP
import de.zbit.jcmapper.io.reader.RandomAccessMDLReader
import de.zbit.jcmapper.io.writer.ExporterFullFingerprintCSV
import de.zbit.jcmapper.io.writer.ExporterFullFingerprintTABUnfolded
import de.zbit.jcmapper.io.writer.ExporterLinear
import de.zbit.jcmapper.io.writer.ExporterSDFProperty
import java.io.FileWriter
import java.util.{ArrayList, List}
import java.io.File

// The reader is the non-serializable class named in the exception below.
val reader: RandomAccessMDLReader = new RandomAccessMDLReader(new File("datasets/internal.sdf"))
val similarity: IDistanceMeasure = new DistanceTanimoto()
val fingerprinter: Encoding2DAllShortestPath = new Encoding2DAllShortestPath()
val rawFeatures2: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(0))
val rawFeatures: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(1))

def getSimilarity(id1: Int, id2: Int): Double = {
  val featureMaps: List[FeatureMap] = new ArrayList[FeatureMap]()
  featureMaps.add(new FeatureMap(rawFeatures))
  featureMaps.add(new FeatureMap(rawFeatures2))
  similarity.getSimilarity(featureMaps.get(0), featureMaps.get(1))
}

val func = combinations.map(x => {
  getSimilarity(0, 1)
}).take(5)
Name: org.apache.spark.SparkException
Message: Task not serializable
StackTrace: at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2292)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:371)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.map(RDD.scala:370)
... 48 elided
Caused by: java.io.NotSerializableException: de.zbit.jcmapper.io.reader.RandomAccessMDLReader
I read other threads and understand that I have to put the variables and functions in an object to make them serializable. However, when I did this, I ran into a null pointer exception:
object Holder {
  val reader: RandomAccessMDLReader = new RandomAccessMDLReader(new File("datasets/internal.sdf"))
  val similarity: IDistanceMeasure = new DistanceTanimoto()
  val fingerprinter: Encoding2DAllShortestPath = new Encoding2DAllShortestPath()
  val rawFeatures2: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(0))
  val rawFeatures: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(1))

  def getSimilarity(id1: Int, id2: Int): Double = {
    val featureMaps: List[FeatureMap] = new ArrayList[FeatureMap]()
    featureMaps.add(new FeatureMap(rawFeatures))
    featureMaps.add(new FeatureMap(rawFeatures2))
    similarity.getSimilarity(featureMaps.get(0), featureMaps.get(1))
  }
}

val func = combinations.map(x => {
  Holder.getSimilarity(0, 1)
}).take(5)
Name: org.apache.spark.SparkException
Message: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-10-245-2-223.ec2.internal, executor 1): java.lang.NullPointerException
at de.zbit.jcmapper.io.reader.RandomAccessMDLReader.setRanges(Unknown Source)
at de.zbit.jcmapper.io.reader.RandomAccessMDLReader.<init>(Unknown Source)
at $line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Holder$.<init>(<console>:78)
at $line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.Holder$lzycompute(<console>:77)
at $line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.Holder(<console>:77)
at $line57.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:85)
at $line57.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:84)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
For the reading part, I could use a huge LinkedHashMap and store all the compounds there. However, I still have to use the getSimilarity() function from the third-party JAR to calculate the similarities. So either way I am stuck: if I put the function inside an object, I get the null pointer exception; if I put it outside the object, I get the task-not-serializable issue. I therefore have a couple of questions I was not able to find good answers to:
- Does Spark support shipping a third-party JAR to each executor? And for the reader specifically: does Spark distribute the reader class to each executor so that each one reads the file on its own, or does it read the file as a whole and then distribute smaller pieces to the executors?
- Why does the null pointer exception appear? The object apparently solved the serialization problem, but not the null pointer exception.
- I am a new data engineer and not yet an expert in Spark, but I would like to learn the best practice for shipping a third-party JAR to Spark and running its functions in a distributed fashion.
Thank you very much for all of your answers! I really appreciate your help!
Best,
Michael
scala apache-spark mapreduce
asked Nov 25 '18 at 15:16 by logydoghan
1 Answer
I think the problem is in this line:
val reader: RandomAccessMDLReader = new RandomAccessMDLReader(new File("datasets/internal.sdf"))
By placing this code in an object, every JVM that your Spark job runs on has to initialize it. So in effect this code tries to read the file datasets/internal.sdf from the local file system, wherever that is in your Spark cluster. Is that file available everywhere?
If you are not ready to put the file everywhere, you can try to put it on the classpath and read it as a resource.

answered Nov 25 '18 at 19:24 by Erik van Oosten
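A minimal sketch of this approach, assuming the SDF file is shipped to the executors with SparkContext.addFile and that combinations holds (Int, Int) id pairs (both assumptions, not part of the original answer):

import java.io.File
import org.apache.spark.SparkFiles

// Driver side: copy the SDF file into every executor's working directory.
sc.addFile("datasets/internal.sdf")

object Holder {
  // Nothing here is serialized with the task; each executor JVM
  // initializes these lazily, on first use, from its local file copy.
  lazy val reader = new RandomAccessMDLReader(new File(SparkFiles.get("internal.sdf")))
  lazy val similarity: IDistanceMeasure = new DistanceTanimoto()
  lazy val fingerprinter = new Encoding2DAllShortestPath()

  def getSimilarity(id1: Int, id2: Int): Double = {
    val f1 = new FeatureMap(fingerprinter.getFingerprint(reader.getMol(id1)))
    val f2 = new FeatureMap(fingerprinter.getFingerprint(reader.getMol(id2)))
    similarity.getSimilarity(f1, f2)
  }
}

val func = combinations.map { case (id1, id2) => Holder.getSimilarity(id1, id2) }.take(5)

The key point is that the RandomAccessMDLReader is never serialized: only the static call to Holder crosses the wire, and each executor constructs its own reader from a file that addFile has already placed locally.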
Hi, thanks for the answer. I actually tried to put that line outside the scope and save all the data in a huge LinkedHashMap, so I avoid using the reader, but this line gives the same error: val temp: Double = similarity.getSimilarity(featureMaps.get(0), featureMaps.get(1)). So I guess the problem is still with the serialization. – logydoghan Nov 26 '18 at 3:17
Whatever is put in the LinkedHashMap needs to be serializable. If you have the source code of the library, you should be able to figure out what is serializable and what is not. It might not be easy, though. – Erik van Oosten Nov 26 '18 at 7:56
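If the library's types turn out not to be serializable at all, one fallback, sketched here under the assumption that the number of molecule pairs is small enough to precompute on the driver (numMols is a hypothetical placeholder), is to keep every JCompoundMapper call on the driver and broadcast only plain doubles:

// All JCompoundMapper calls happen on the driver, where the file exists.
val numMols = 2  // assumption: however many molecules you actually have

val sims: Map[(Int, Int), Double] =
  (for {
    i <- 0 until numMols
    j <- (i + 1) until numMols
  } yield {
    val fi = new FeatureMap(fingerprinter.getFingerprint(reader.getMol(i)))
    val fj = new FeatureMap(fingerprinter.getFingerprint(reader.getMol(j)))
    (i, j) -> similarity.getSimilarity(fi, fj)
  }).toMap

// A Map[(Int, Int), Double] is plain Scala data and serializes fine.
val simsB = sc.broadcast(sims)

val func = combinations.map { case (i, j) => simsB.value((i, j)) }.take(5)

This only scales to a modest number of pairs, but it sidesteps both the serialization problem and the executor-side file access.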