Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

Running Spark 1.6.2 (YARN mode)

Firstly, I have some code from this post to get filenames within Spark Streaming, so that could be the issue, but hopefully not.

Basically, I have this first job.

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

def getStream(ssc: StreamingContext, dir: String): DStream[String] = {
    ssc.fileStream[LongWritable, Text, TextInputFormat](dir)
}

val sc = SparkContext.getOrCreate
val ssc = new StreamingContext(sc, Seconds(5))

val inputDir = "hdfs:///tmp/input"
val outputDir = "hdfs:///tmp/output1"

val stream1 = getStream(ssc, inputDir)
stream1.foreachRDD(rdd => rdd.saveAsTextFile(outputDir))

ssc.start()
ssc.awaitTermination()

And I also have a second job that, for this example, looks practically identical, just change around inputDir and outputDir, and move to a new outputDir = "hdfs:///tmp/output2".

Anyway, so I have to start the second streaming job before the first job because it needs to watch for new files. Makes sense...

Then, I start the first job and hadoop fs -copyFromLocal some files into the input folder since per the API

Files must be written to the monitored directory by "moving" them from another location within the same file system. File names starting with . are ignored.

When I try to run this, it eventually crashes with a stacktrace that contains this

17/02/01 11:48:35 INFO FileInputDStream: Finding new files took 7 ms
17/02/01 11:48:35 INFO FileInputDStream: New files at time 1485949715000 ms:
hdfs://sandbox.hortonworks.com:8020/tmp/output1/_SUCCESS
17/02/01 11:48:35 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 355.9 KB, free 356.8 KB)
17/02/01 11:48:35 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 28.9 KB, free 385.7 KB)
17/02/01 11:48:35 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:43097 (size: 28.9 KB, free: 511.1 MB)
17/02/01 11:48:35 INFO SparkContext: Created broadcast 1 from fileStream at FileStreamTransformer.scala:45
17/02/01 11:48:35 ERROR JobScheduler: Error generating jobs for time 1485949715000 ms
org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://sandbox.hortonworks.com:8020/output1/_SUCCESS
  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:323)
  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:387)
  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
  at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:276)
  at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:266)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:266)
  at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:153)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
  at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:253)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:251)
  at scala.util.Try$.apply(Try.scala:161)
  at org.apache.

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
174 views
Welcome To Ask or Share your Answers For Others

1 Answer

to explicitly enforce that only new files are processed and to ensure touch files liek _SUCCESS are skipped we can use the below signature of fileStream

def getStream(ssc: StreamingContext, dir: String): DStream[String] = {
   ssc.fileStream[LongWritable, Text, TextInputFormat](dir,
      (path: org.apache.hadoop.fs.Path) => !path.getName.startsWith("_") || !path.getName().startsWith("."),
      newFilesOnly = true)
}

The newFileOnly defaults to true when not specified as shown here. So ideally _SUCCESS should not have been processed in your setup too.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
...