We distribute our Python app, which uses Spark, together with Python 3.7 interpreter (python.exe with all necessary libs lies near MyApp.exe).

To set PYSPARK_PYTHON we have a function which determines the path to our python.exe:

os.environ['PYSPARK_PYTHON'] = get_python()  

On Windows, PYSPARK_PYTHON becomes C:/MyApp/python.exe.
On Ubuntu, PYSPARK_PYTHON becomes /opt/MyApp/python.exe.
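
For context, a minimal sketch of what such a helper might look like (illustrative only; the real get_python simply resolves the interpreter that ships next to MyApp.exe):

import os
import sys

def get_python():
    # Illustrative sketch: resolve the python.exe shipped next to the app.
    # Yields C:/MyApp/python.exe on Windows and /opt/MyApp/python.exe on Ubuntu.
    app_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
    return os.path.join(app_dir, "python.exe").replace(os.sep, "/")

os.environ['PYSPARK_PYTHON'] = get_python()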

We start the master/driver node and create a SparkSession on Windows. Then we start the worker node on Ubuntu, but the worker fails with:

Job aborted due to stage failure: Task 1 in stage 11.0 failed 4 times, most recent failure: Lost task 1.3 in stage 11.0 (TID 1614, 10.0.2.15, executor 1): java.io.IOException: Cannot run program "C:/MyApp/python.exe": error=2, No such file or directory

Of course, there is no C:/MyApp/python.exe on Ubuntu.

If I understand this error correctly, the PYSPARK_PYTHON value from the driver is sent to all workers.

I also tried setting PYSPARK_PYTHON in spark-env.sh and spark-defaults.conf. How can I make PYSPARK_PYTHON on the Ubuntu workers resolve to /opt/MyApp/python.exe?
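
For reference, the settings tried were roughly the following (spark.pyspark.python is the spark-defaults.conf counterpart of the environment variable); neither changed the path the executor tried to run:

# spark-env.sh on the Ubuntu worker
export PYSPARK_PYTHON=/opt/MyApp/python.exe

# spark-defaults.conf on the Ubuntu worker
spark.pyspark.python    /opt/MyApp/python.exe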



1 Answer

Browsing through the source code, it looks like the Python driver takes the Python executable path from its Spark context and stores it in the work items it creates for running Python functions, in pyspark/rdd.py:

def _wrap_function(sc, func, deserializer, serializer, profiler=None):
    assert deserializer, "deserializer should not be empty"
    assert serializer, "serializer should not be empty"
    command = (func, profiler, deserializer, serializer)
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
                                                                             ^^^^^^^^^^^^^
                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)
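
For completeness, sc.pythonExec itself is filled in from the driver-side environment when the SparkContext is created (paraphrased from pyspark/context.py; the exact default differs between Spark versions), which is how the Windows path ends up in the work items sent to the executors:

# pyspark/context.py (paraphrased)
self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')
self.pythonVer = "%d.%d" % sys.version_info[:2]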

The Python runner PythonRunner.scala then uses the path stored in the first work item it receives to launch new interpreter instances:

private[spark] abstract class BasePythonRunner[IN, OUT](
    funcs: Seq[ChainedPythonFunctions],
    evalType: Int,
    argOffsets: Array[Array[Int]])
  extends Logging {
  ...
  protected val pythonExec: String = funcs.head.funcs.head.pythonExec
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  ...
  def compute(
      inputIterator: Iterator[IN],
      partitionIndex: Int,
      context: TaskContext): Iterator[OUT] = {
    ...
    val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
    ...
  }
  ...
}

Based on that, I'm afraid it does not currently seem possible to have separate configurations of the Python executable for the master and for the workers. Also see the third comment on issue SPARK-26404. Perhaps you should file an RFE with the Apache Spark project.

I'm not a Spark guru, though, and there might still be a way to do it, perhaps by setting PYSPARK_PYTHON to just "python" and then making sure the system PATH is configured accordingly, so that your Python executable comes first on every node.
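
A minimal sketch of that idea, assuming the bundled interpreter is what a bare "python" resolves to on every node (for example via PATH on the Windows driver and a symlink such as /usr/local/bin/python -> /opt/MyApp/python.exe on the Ubuntu worker; the symlink location is just an example):

import os

# Use a bare executable name so each node resolves it through its own PATH.
# This only helps if "python" points at the bundled interpreter on both the
# Windows driver and the Ubuntu workers.
os.environ['PYSPARK_PYTHON'] = 'python'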

