I have a service running (a Twisted JSON-RPC server). When I make a call to "run_procs", the service looks at a bunch of objects and inspects their timestamp property to see if they should run. If they should, they get added to a thread_pool (a list), and then every item in the thread_pool has its start() method called.

I have used this setup for several other applications where I wanted to run a function within my class with threading. However, when I use a subprocess.Popen call in the function called by each thread, the calls run one at a time instead of concurrently, as I would expect.

Here is some sample code:

import os, subprocess, threading, time
# jsonrpc is the Twisted JSON-RPC module this service is built on
# (e.g. from txjsonrpc.web import jsonrpc)

class ProcService(jsonrpc.JSONRPC):

        def __init__(self):
                jsonrpc.JSONRPC.__init__(self)
                self.thread_pool = []
                self.running_threads = []
                self.lock = threading.Lock()

        def clean_pool(self, thread_pool, join=False):
                for th in [x for x in thread_pool if not x.isAlive()]:
                        if join: th.join()
                        thread_pool.remove(th)
                        del th
                return thread_pool

        def run_threads(self, parallel=10):
                while len(self.running_threads)+len(self.thread_pool) > 0:
                        self.clean_pool(self.running_threads, join=True)
                        n = min(max(parallel - len(self.running_threads), 0), len(self.thread_pool))
                        if n > 0:
                                for th in self.thread_pool[0:n]: th.start()
                                self.running_threads.extend(self.thread_pool[0:n])
                                del self.thread_pool[0:n]
                        time.sleep(.01)
                for th in self.running_threads+self.thread_pool: th.join()

        def jsonrpc_run_procs(self):
                for item in self.items:
                        if item.should_run():
                                self.thread_pool.append(threading.Thread(target=self.run_proc, args=(item,)))
                self.run_threads(5)

        def run_proc(self, proc):
                self.lock.acquire()
                print "\nSubprocess started"
                p = subprocess.Popen('%s/program_to_run.py %s' % (os.getcwd(), proc.data),
                                     shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
                # communicate() must be called on the Popen object (p), not on proc
                stdout_value = p.communicate('through stdin to stdout')[0]
                self.lock.release()

Any help/suggestions are appreciated.

* EDIT * OK, so now I want to read back the output from the stdout pipe. This works some of the time, but it also fails with select.error: (4, 'Interrupted system call'). I assume this is because sometimes the process has already terminated before I try to run the communicate method. The code in the run_proc method has been changed to:

def run_proc(self, proc):
    self.lock.acquire()
    p = subprocess.Popen( # etc
    self.running_procs.append([p, proc.data.id])
    self.lock.release()

After I call self.run_threads(5), I call self.check_procs().

The check_procs method iterates over the list of running_procs and checks whether poll() is not None. How can I get the output from the pipe? I have tried both of the following:

calling check_procs once:

def check_procs(self):
    # iterate over a copy, since calling remove() while iterating skips entries
    for proc_details in self.running_procs[:]:
        proc = proc_details[0]
        while proc.poll() is None:
            time.sleep(0.1)
        stdout_value = proc.communicate('through stdin to stdout')[0]
        self.running_procs.remove(proc_details)
        print proc_details[1], stdout_value

calling check_procs in a while loop, like:

while len(self.running_procs) > 0:
    self.check_procs()

def check_procs(self):
    # iterate over a copy, since calling remove() while iterating skips entries
    for proc_details in self.running_procs[:]:
        proc = proc_details[0]
        if proc.poll() is not None:
            stdout_value = proc.communicate('through stdin to stdout')[0]
            self.running_procs.remove(proc_details)
            print proc_details[1], stdout_value
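
As an aside on the select.error: (4, 'Interrupted system call') failure: in Python 2, subprocess.communicate() does not retry its internal select() when a signal interrupts it, so one common workaround is to retry the call when errno is EINTR. A minimal sketch; the helper name is mine, and it assumes the interrupt arrived before any data had actually been exchanged, so the call is safe to repeat:

    import errno
    import select

    def communicate_eintr_retry(p, input_data=None):
        # retry communicate() when the underlying select() is interrupted
        # by a signal (EINTR); Python 2 does not retry it for us
        while True:
            try:
                return p.communicate(input_data)
            except select.error as e:
                if e.args[0] != errno.EINTR:
                    raise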

1 Answer

I think the key code is:

    self.lock.acquire()
    print "\nSubprocess started"
    p = subprocess.Popen( # etc
    stdout_value = p.communicate('through stdin to stdout')[0]
    self.lock.release()

The explicit calls to acquire and release guarantee serialization: the lock is held for the entire lifetime of each subprocess, so only one thread at a time can be inside this block. Wouldn't you observe exactly the same serialization if you did other things in this block instead of the subprocess use?
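
In other words, the fix is to hold the lock only around shared-state work, never around the blocking Popen/communicate calls. A minimal sketch along those lines (the command line and proc.data are as in the question; the lock guards only the print, so output lines don't interleave):

    def run_proc(self, proc):
        # no lock around the blocking subprocess work, so threads overlap
        p = subprocess.Popen('%s/program_to_run.py %s' % (os.getcwd(), proc.data),
                             shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        stdout_value = p.communicate('through stdin to stdout')[0]
        # hold the lock only while touching shared resources
        self.lock.acquire()
        try:
            print proc.data, stdout_value
        finally:
            self.lock.release()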

Edit: all silence here, so I'll add the suggestion to remove the locking and instead put each stdout_value on a Queue.Queue() instance -- Queue is intrinsically threadsafe (it deals with its own locking), so you can get (or get_nowait, etc.) results from it once they're ready and have been put there. In general, Queue is the best way to arrange thread communication (and often synchronization too) in Python, any time it can feasibly be arranged to do things that way.

Specifically: add import Queue at the start; stop creating, acquiring and releasing self.lock (just delete those three lines); add self.q = Queue.Queue() to __init__; right after the call stdout_value = p.communicate(..., add one statement, self.q.put(stdout_value); now, e.g., finish the jsonrpc_run_procs method with

    while not self.q.empty():
        result = self.q.get()
        print 'One result is %r' % result

to confirm that all the results are there. (Normally the empty method of queues is not reliable, but in this case all threads putting to the queue are already finished, so you should be fine).
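
Putting those edits together, here is a minimal sketch of the queue-based version (run_threads, self.items and the jsonrpc base class are as in the question; Python 2, like the rest of the code here):

    import Queue

    class ProcService(jsonrpc.JSONRPC):

        def __init__(self):
            jsonrpc.JSONRPC.__init__(self)
            self.thread_pool = []
            self.running_threads = []
            self.q = Queue.Queue()   # replaces self.lock entirely

        def run_proc(self, proc):
            p = subprocess.Popen('%s/program_to_run.py %s' % (os.getcwd(), proc.data),
                                 shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
            stdout_value = p.communicate('through stdin to stdout')[0]
            self.q.put(stdout_value)   # Queue does its own locking

        def jsonrpc_run_procs(self):
            for item in self.items:
                if item.should_run():
                    self.thread_pool.append(
                        threading.Thread(target=self.run_proc, args=(item,)))
            self.run_threads(5)
            # all worker threads have been joined, so empty() is reliable here
            while not self.q.empty():
                result = self.q.get()
                print 'One result is %r' % result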

