How to pause processes in case they are consuming too much memory?



























Background: I process planetary imagery using a set of command-line utilities provided by the US Geological Survey. Some of them are RAM hogs to the extreme (tens of GB). USGS says that's just the way they run, and they have no plans to try to manage the RAM better. I built a Python wrapper that manipulates file lists and calls the different steps to process the data in parts (for example, all images taken in one color filter, then all taken in another, and so on). Because the steps are applied to multiple lists and multiple images, I thread it, using all the CPUs I can, to cut work that might otherwise take two months down to a week. At the moment I don't use native Python methods to thread; instead I use GNU Parallel (calling parallel and then the function through os.system("")) or Pysis, which is a Python way to call and multithread the USGS software.



Problem: As noted, some steps take a huge amount of RAM for some files, and there is no way of knowing ahead of time which ones. So I can get into a situation where, for some files, each process uses 200 MB and everything runs fine on a 16 GB RAM machine with 8 cores, but then it starts processing other files and I get RAM creep to several GB per process. With 8 processes on a 16 GB RAM machine, that means RAM gets compressed and swap space is used ... and that's if I'm lucky and the machine doesn't just lock up.



Solution? What I'm looking for is a way to monitor RAM usage by process name, say once a minute, and if I start to see RAM creep (e.g., 8 instances of a process each using over 2 GB of RAM), pause all but one of them, let that one finish, un-pause another, let that finish, and so on until those 8 are done, then continue with whatever else needs to run for that step. Obviously, all of this would be done in Python, not manually.



Is it possible to do that? If so, how?
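A rough sketch of the per-name monitoring described above, using psutil (which the question already uses elsewhere), might look like the following. The function names `rss_by_name` and `pids_to_pause` are hypothetical, and keeping the largest offender running is an assumption, since the question doesn't say which process should be allowed to finish. Note that a paused process still holds its memory: pausing only stops further growth, and the OS may then swap the paused pages out.

```python
import psutil


def rss_by_name(name):
    """Return {pid: rss_in_bytes} for every visible process called `name`."""
    usage = {}
    # process_iter(attrs=...) fills proc.info; attributes that cannot be read
    # (e.g. AccessDenied) come back as None instead of raising.
    for proc in psutil.process_iter(attrs=['name', 'memory_info']):
        info = proc.info
        if info['name'] == name and info['memory_info'] is not None:
            usage[proc.pid] = info['memory_info'].rss
    return usage


def pids_to_pause(usage, max_bytes):
    """Given {pid: rss}, return the pids that should be paused.

    If at most one process is over `max_bytes`, nothing is paused.
    Otherwise every offender except the largest one is returned
    (assumption: let the biggest one finish first).
    """
    hogs = sorted((pid for pid, rss in usage.items() if rss > max_bytes),
                  key=lambda pid: usage[pid], reverse=True)
    return hogs[1:]
```

Called from a loop once a minute, the returned pids could be passed to `psutil.Process(pid).suspend()` and later to `.resume()`.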



































  • stackoverflow.com/questions/276052/…

    – Vedant Kandoi
    Nov 27 '18 at 6:40











  • Been there, read that, and I already use psutil elsewhere in the code to make some decisions. But that doesn’t tell me how to monitor and pause processes based on usage by single processes, unless I’m missing something in the answers.

    – Stuart Robbins
    Nov 27 '18 at 6:42











    What about stackoverflow.com/questions/938733/…

    – Vedant Kandoi
    Nov 27 '18 at 6:52











  • So, to use that, it looks like I'd need to put process=psutil.Process(os.getpid()).memory_info().rss into each thread component, but how would I get that to update every ~minute, and then have some super-monitor that pauses things when RAM usage gets too high?

    – Stuart Robbins
    Nov 27 '18 at 13:32











  • @Stuart Robbins I'm not sure what you're trying here since you seem to refer to "threads" or "processes" interchangeably. Are you aware you generally cannot pause threads, only whole processes?

    – Darkonaut
    Nov 27 '18 at 14:27
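To illustrate Darkonaut's point: on POSIX systems a pause applies to a whole process, all of its threads at once, and is done with the SIGSTOP/SIGCONT signals (psutil's `Process.suspend()` and `resume()` send exactly these on UNIX). The sketch below uses only the standard library; `pause`, `resume` and `demo` are illustrative names, and it will not work on Windows, which has no SIGSTOP.

```python
import os
import signal
import subprocess
import sys


def pause(pid):
    """Stop every thread of process `pid`. SIGSTOP cannot be caught or ignored."""
    os.kill(pid, signal.SIGSTOP)


def resume(pid):
    """Let a stopped process continue where it left off."""
    os.kill(pid, signal.SIGCONT)


def demo():
    """Pause a short-lived child, confirm it stopped, then let it finish."""
    child = subprocess.Popen([sys.executable, '-c', 'import time; time.sleep(1)'])
    pause(child.pid)
    # WUNTRACED makes waitpid() report the stop instead of waiting for exit.
    _, status = os.waitpid(child.pid, os.WUNTRACED)
    stopped = os.WIFSTOPPED(status)
    resume(child.pid)
    return stopped, child.wait()
```

`demo()` should return `(True, 0)`: the child is reported as stopped, and after SIGCONT it finishes normally.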
















python multiprocessing python-multiprocessing ram monitor














edited Feb 7 at 22:33









Darkonaut











asked Nov 27 '18 at 6:30









Stuart Robbins


























2 Answers














You can use psutil.Process.suspend() to suspend execution of running processes which exceed a given memory threshold. The monitoring part is just repeatedly comparing psutil.Process().memory_info().rss ("Resident Set Size") of running processes with your given threshold. How you then schedule further processing is up to you.



In the example below I'm suspending culprit processes until the rest have finished, then resuming the suspended processes one by one. This is meant as a simplistic approach to show the general mechanism.



import time
import random
from threading import Thread
from multiprocessing import Process, active_children

import psutil


def format_mib(mem_bytes):
    """Format bytes into mebibyte-string."""
    return f'{mem_bytes / 2 ** 20:.2f} MiB'


def f(append_length):
    """Main function in child-process. Appends random floats to list."""
    p = psutil.Process()
    li = []
    for i in range(10):
        li.extend([random.random() for _ in range(append_length)])
        print(f'i: {i} | pid: {p.pid} | '
              f'{format_mib(p.memory_full_info().rss)}')
        time.sleep(2)


def monitored(running_processes, max_mib):
    """Monitor memory usage for running processes.
    Suspend execution for processes surpassing `max_mib` and complete
    one by one after behaving processes have finished.
    """
    running_processes = [psutil.Process(pid=p.pid) for p in running_processes]
    suspended_processes = []

    while running_processes:
        active_children()  # Joins all finished processes.
        # Without it, p.is_running() below on Unix would not return `False`
        # for finished processes.
        actual_processes = running_processes.copy()
        for p in actual_processes:
            if not p.is_running():
                running_processes.remove(p)
                print(f'removed finished process: {p}')
            else:
                if p.memory_info().rss / 2 ** 20 > max_mib:
                    print(f'suspending process: {p}')
                    p.suspend()
                    running_processes.remove(p)
                    suspended_processes.append(p)

        time.sleep(1)

    for p in suspended_processes:
        print(f'\nresuming process: {p}')
        p.resume()
        p.wait()


if __name__ == '__main__':

    MAX_MiB = 200

    append_lengths = [100000, 500000, 1000000, 2000000, 300000]
    processes = [Process(target=f, args=(append_length,))
                 for append_length in append_lengths]

    for p in processes:
        p.start()

    m = Thread(target=monitored, args=(processes, MAX_MiB))
    m.start()
    m.join()


Example output (shortened) with two processes getting suspended for exceeding the 200 MiB threshold and resumed after the behaving processes have finished:



i: 0 | pid: 17997 | 13.53 MiB
i: 0 | pid: 18001 | 19.70 MiB
i: 0 | pid: 17998 | 25.88 MiB
i: 0 | pid: 17999 | 41.32 MiB
i: 0 | pid: 18000 | 72.21 MiB
...
i: 2 | pid: 17997 | 20.84 MiB
i: 2 | pid: 18001 | 42.02 MiB
i: 2 | pid: 17998 | 60.56 MiB
i: 2 | pid: 17999 | 103.36 MiB
i: 2 | pid: 18000 | 215.70 MiB
suspending process: psutil.Process(pid=18000, name='python', started='18:20:09')
i: 3 | pid: 17997 | 23.93 MiB
i: 3 | pid: 18001 | 47.75 MiB
i: 3 | pid: 17998 | 76.00 MiB
i: 3 | pid: 17999 | 141.59 MiB
...
i: 5 | pid: 17997 | 30.11 MiB
i: 5 | pid: 18001 | 68.24 MiB
i: 5 | pid: 17998 | 107.23 MiB
i: 5 | pid: 17999 | 203.52 MiB
suspending process: psutil.Process(pid=17999, name='python', started='18:20:09')
i: 6 | pid: 17997 | 33.19 MiB
i: 6 | pid: 18001 | 77.49 MiB
i: 6 | pid: 17998 | 122.59 MiB
...
i: 9 | pid: 17997 | 42.47 MiB
i: 9 | pid: 18001 | 105.68 MiB
i: 9 | pid: 17998 | 168.96 MiB
removed finished process: psutil.Process(pid=17997, status='terminated')
removed finished process: psutil.Process(pid=17998, status='terminated')
removed finished process: psutil.Process(pid=18001, status='terminated')

resuming process: psutil.Process(pid=18000, name='python', started='18:20:09')
i: 3 | pid: 18000 | 277.46 MiB
i: 4 | pid: 18000 | 339.22 MiB
i: 5 | pid: 18000 | 400.84 MiB
...
i: 9 | pid: 18000 | 648.00 MiB

resuming process: psutil.Process(pid=17999, name='python', started='18:20:09')
i: 6 | pid: 17999 | 234.55 MiB
...
i: 9 | pid: 17999 | 327.31 MiB


Process finished with exit code 0
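The example above manages `multiprocessing.Process` children running a Python function. The question's workload is external command-line tools, so here is a rough adaptation of the same suspend-then-resume idea for commands started with `subprocess.Popen`. `run_with_memory_cap` and its parameters are hypothetical names, the threshold is in bytes, and `Popen.poll()` stands in for `active_children()` to reap finished processes.

```python
import subprocess
import sys
import time

import psutil


def run_with_memory_cap(commands, max_bytes, poll_interval=1.0):
    """Run all `commands` at once; suspend any whose RSS exceeds `max_bytes`,
    then resume the suspended ones one at a time after the rest finish.

    Returns ({index: returncode}, [indices that were suspended]).
    """
    popens = [subprocess.Popen(cmd) for cmd in commands]
    running = set(range(len(popens)))
    suspended = []

    while running:
        for i in sorted(running):
            if popens[i].poll() is not None:  # finished; poll() also reaps it
                running.discard(i)
                continue
            try:
                proc = psutil.Process(popens[i].pid)
                if proc.memory_info().rss > max_bytes:
                    proc.suspend()
                    running.discard(i)
                    suspended.append(i)
            except psutil.NoSuchProcess:
                pass  # exited between poll() and the psutil calls
        time.sleep(poll_interval)

    for i in suspended:
        psutil.Process(popens[i].pid).resume()
        popens[i].wait()

    return {i: p.returncode for i, p in enumerate(popens)}, suspended


if __name__ == '__main__':
    # Stand-ins for real tools: two well-behaved jobs and one ~100 MiB hog.
    hog = 'import time; x = b"x" * (100 * 2**20); time.sleep(1.5)'
    nice = 'import time; time.sleep(0.3)'
    cmds = [[sys.executable, '-c', nice],
            [sys.executable, '-c', hog],
            [sys.executable, '-c', nice]]
    print(run_with_memory_cap(cmds, max_bytes=50 * 2**20, poll_interval=0.2))
```

Swapping the stand-in commands for the real argument lists of the USGS tools should be the only change needed to try it on actual data.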




EDIT:




I think my only remaining question from working through this is, how can I get it to spawn only a certain number of threads [sic!] at a time, as stuff completes add remaining ones, and then do all the suspended ones at the end?




I expanded the code above so that new processes are started as old ones finish, with the number of running processes capped at the number of cores. I also refactored it into a class, since otherwise all the state that needs managing would start to get messy. In the code below I use the names "tasks" and "processes" interchangeably for brevity. Note the changed start method for processes and the accompanying comment in the code.



import time
import random
from threading import Thread
from collections import deque
from multiprocessing import Process, active_children, set_start_method

import psutil

# `def format_mib` and `def f` from above unchanged...

class TaskProcessor(Thread):
    """Processor class which monitors memory usage for running
    tasks (processes). Suspends execution for tasks surpassing
    `max_mib` and completes them one by one, after behaving
    tasks have finished.
    """
    def __init__(self, n_cores, max_mib, tasks):
        super().__init__()
        self.n_cores = n_cores
        self.max_mib = max_mib  # memory threshold
        self.tasks = deque(tasks)

        self._running_tasks = []
        self._suspended_tasks = []

    def run(self):
        """Main-function in new thread."""
        self._update_running_tasks()
        self._monitor_running_tasks()
        self._process_suspended_tasks()

    def _update_running_tasks(self):
        """Start new tasks if we have fewer running tasks than cores."""
        while len(self._running_tasks) < self.n_cores and len(self.tasks) > 0:
            p = self.tasks.popleft()
            p.start()
            # for further process-management we here just need the
            # psutil.Process wrapper
            self._running_tasks.append(psutil.Process(pid=p.pid))
            print(f'Started process: {self._running_tasks[-1]}')

    def _monitor_running_tasks(self):
        """Monitor running tasks. Replace completed tasks and suspend tasks
        which exceed the memory threshold `self.max_mib`.
        """
        # loop while we have running or non-started tasks
        while self._running_tasks or self.tasks:
            active_children()  # Joins all finished processes.
            # Without it, p.is_running() below on Unix would not return
            # `False` for finished processes.
            self._update_running_tasks()
            actual_tasks = self._running_tasks.copy()

            for p in actual_tasks:
                if not p.is_running():  # process has finished
                    self._running_tasks.remove(p)
                    print(f'Removed finished process: {p}')
                else:
                    if p.memory_info().rss / 2 ** 20 > self.max_mib:
                        p.suspend()
                        self._running_tasks.remove(p)
                        self._suspended_tasks.append(p)
                        print(f'Suspended process: {p}')

            time.sleep(1)

    def _process_suspended_tasks(self):
        """Resume processing of suspended tasks."""
        for p in self._suspended_tasks:
            print(f'\nResuming process: {p}')
            p.resume()
            p.wait()


if __name__ == '__main__':

    # Forking (default on Unix-y systems) an already multithreaded process is
    # error-prone. Since we intend to start processes after we are already
    # multithreaded, we switch to another start-method.
    set_start_method('spawn')  # or 'forkserver' (a bit faster start up) if available

    MAX_MiB = 200
    N_CORES = 2

    append_lengths = [100000, 500000, 1000000, 2000000, 300000]
    tasks = [Process(target=f, args=(append_length,))
             for append_length in append_lengths]

    tp = TaskProcessor(n_cores=N_CORES, max_mib=MAX_MiB, tasks=tasks)
    tp.start()
    tp.join()


Example Output (shortened):



Started process: psutil.Process(pid=9422, name='python', started='13:45:53')
Started process: psutil.Process(pid=9423, name='python', started='13:45:53')
i: 0 | pid: 9422 | 18.95 MiB
i: 0 | pid: 9423 | 31.45 MiB
...
i: 9 | pid: 9422 | 47.36 MiB
i: 9 | pid: 9423 | 175.41 MiB
Removed finished process: psutil.Process(pid=9422, status='terminated')
Removed finished process: psutil.Process(pid=9423, status='terminated')
Started process: psutil.Process(pid=9445, name='python', started='13:46:15')
Started process: psutil.Process(pid=9446, name='python', started='13:46:15')
i: 0 | pid: 9445 | 46.86 MiB
i: 0 | pid: 9446 | 77.74 MiB
...
i: 2 | pid: 9445 | 117.41 MiB
i: 2 | pid: 9446 | 220.99 MiB
Suspended process: psutil.Process(pid=9446, name='python', started='13:46:15')
Started process: psutil.Process(pid=9450, name='python', started='13:46:21')
i: 0 | pid: 9450 | 25.16 MiB
i: 3 | pid: 9445 | 148.29 MiB
i: 1 | pid: 9450 | 36.47 MiB
i: 4 | pid: 9445 | 179.17 MiB
i: 2 | pid: 9450 | 45.74 MiB
i: 5 | pid: 9445 | 211.14 MiB
Suspended process: psutil.Process(pid=9445, name='python', started='13:46:15')
i: 3 | pid: 9450 | 55.00 MiB
...
i: 9 | pid: 9450 | 110.62 MiB
Removed finished process: psutil.Process(pid=9450, status='terminated')

Resuming process: psutil.Process(pid=9446, name='python', started='13:46:15')
i: 3 | pid: 9446 | 282.75 MiB
...
i: 9 | pid: 9446 | 655.35 MiB

Resuming process: psutil.Process(pid=9445, name='python', started='13:46:15')
i: 6 | pid: 9445 | 242.12 MiB
...
i: 9 | pid: 9445 | 334.88 MiB

Process finished with exit code 0































  • This is almost exactly what I'm looking for. I spent some time working through it and adding a lot of comments so I can follow what you're doing. I think my only remaining question from working through this is, how can I get it to spawn only a certain number of threads at a time, as stuff completes add remaining ones, and then do all the suspended ones at the end? E.g., I have 8 cores, I have 20 things to run, I'll do 1 process per core. So I want to run 8 and whenever something completes, run the next, etc. Oh, one more issue: On macOS, how do I add in compressed RAM (CMPRS in top)?

    – Stuart Robbins
    Nov 27 '18 at 19:48











  • Okay, I was able to figure this out, though it's inelegant: at the beginning of the monitored function, I added a check for whether len(running_processes) is larger than the number of threads I want; if so, I suspend all threads beyond that, adding them to a new array, and when threads finish I check that list to restart them (copying your code for .append() and .remove()). If you have a more elegant method, let me know; otherwise, thanks! I'll wait a day or so before marking this as the answer in case you add to it.

    – Stuart Robbins
    Nov 27 '18 at 21:08











  • @Stuart Robbins Will work out a modified version. I'm not sure I can help you with CMPRS, though, since I'm not using macOS. When you type cat /proc/self/status into a terminal, does it also give you a field with cmprs?

    – Darkonaut
    Nov 27 '18 at 21:29











  • No, typing that into the terminal does nothing (no such file or directory), in neither Bash nor tcsh.

    – Stuart Robbins
    Nov 27 '18 at 23:16











  • @Stuart Robbins I updated my answer accordingly. Let me know if something remains unclear. Indeed macOS doesn't seem to have /proc like Linux (read here).

    – Darkonaut
    Nov 28 '18 at 0:06



































parallel --memfree is built for that situation:



parallel --memfree 1G doit ::: {1..100}


This will only spawn a new process if more than 1 GB of RAM is free. If less than half of that (0.5 × 1 GB) is free, it will kill the youngest job and put it back on the queue.



Only pausing/suspending the youngest job was considered, but experience showed that swapping that process out and back in was often much slower than simply restarting the job.
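If you want the same policy inside a Python wrapper rather than through GNU Parallel, the rule described above can be sketched with psutil. This illustrates the described behaviour and is not GNU Parallel's code; `memfree_decision`, `current_decision`, `youngest` and the returned strings are hypothetical, and using psutil's `available` figure as "free RAM" is an assumption.

```python
import psutil


def memfree_decision(available, memfree):
    """Apply the --memfree rule as described: start a job only if more than
    `memfree` bytes are available, kill the youngest job if less than half of
    `memfree` is available, otherwise wait."""
    if available > memfree:
        return 'start'
    if available < 0.5 * memfree:
        return 'kill_youngest'
    return 'wait'


def current_decision(memfree):
    """Same decision, using the system's currently available memory."""
    return memfree_decision(psutil.virtual_memory().available, memfree)


def youngest(procs):
    """The most recently started of the given psutil.Process objects."""
    return max(procs, key=lambda p: p.create_time())
```

On `'kill_youngest'`, a wrapper would terminate `youngest(running)` and put its command back on its own queue; per the comments below, that trades lost progress for avoiding swap.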






























  • Hm, did not know about that option in parallel. While it's applicable to some of my code, it's not applicable to all of it since I can't use parallel for some aspects. Also, it's not just an issue of cores being free, it's an issue of, if one process on one core is taking up too much RAM, pause it to let the others continue work and finish before proceeding and cannibalizing their RAM.

    – Stuart Robbins
    Nov 30 '18 at 22:41











  • Except for the pause thing this is exactly what GNU Parallel does. GNU Parallel kills instead of swapping out.

    – Ole Tange
    Dec 1 '18 at 0:34













  • I don't want it killed because it still needs to be run, and I don't want progress to be lost.

    – Stuart Robbins
    Dec 1 '18 at 3:05











  • @StuartRobbins I had the same feeling, but experiments while developing GNU Parallel showed that it is usually better to lose the progress than to wait for the swapping to happen. Do you have numbers to back up that it is faster to swap-out+in than to kill+restart?

    – Ole Tange
    Dec 1 '18 at 10:48











Your Answer






StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");

StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);

StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});

function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});


}
});














draft saved

draft discarded


















StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53493973%2fhow-to-pause-processes-in-case-they-are-consuming-too-much-memory%23new-answer', 'question_page');
}
);

Post as a guest















Required, but never shown

























2 Answers
2






active

oldest

votes








2 Answers
2






active

oldest

votes









active

oldest

votes






active

oldest

votes









2














You can use psutil.Process.suspend() to suspend execution of running processes which exceed a given memory threshold. The monitoring part is just repeatedly comparing psutil.Process().memory_info().rss ("Resident Set Size") of running processes with your given threshold. How you then schedule further processing is up to you.



In the example below I'm suspending culprit-processes until the rest is finished, then resume the once suspended processes one by one. This is meant to be a simplistic approach to show the general mechanism.



import time
import random
from threading import Thread
from multiprocessing import Process, active_children

import psutil


def format_mib(mem_bytes):
"""Format bytes into mebibyte-string."""
return f'{mem_bytes / 2 ** 20:.2f} MiB'


def f(append_length):
"""Main function in child-process. Appends random floats to list."""
p = psutil.Process()
li =
for i in range(10):
li.extend([random.random() for _ in range(append_length)])
print(f'i: {i} | pid: {p.pid} | '
f'{format_mib(p.memory_full_info().rss)}')
time.sleep(2)


def monitored(running_processes, max_mib):
"""Monitor memory usage for running processes.
Suspend execution for processes surpassing `max_mib` and complete
one by one after behaving processes have finished.
"""
running_processes = [psutil.Process(pid=p.pid) for p in running_processes]
suspended_processes =

while running_processes:
active_children() # Joins all finished processes.
# Without it, p.is_running() below on Unix would not return `False`
# for finished processes.
actual_processes = running_processes.copy()
for p in actual_processes:
if not p.is_running():
running_processes.remove(p)
print(f'removed finished process: {p}')
else:
if p.memory_info().rss / 2 ** 20 > max_mib:
print(f'suspending process: {p}')
p.suspend()
running_processes.remove(p)
suspended_processes.append(p)

time.sleep(1)

for p in suspended_processes:
print(f'nresuming process: {p}')
p.resume()
p.wait()


if __name__ == '__main__':

MAX_MiB = 200

append_lengths = [100000, 500000, 1000000, 2000000, 300000]
processes = [Process(target=f, args=(append_length,))
for append_length in append_lengths]

for p in processes:
p.start()

m = Thread(target=monitored, args=(processes, MAX_MiB))
m.start()
m.join()


Example output (shortened) with two processes getting suspended for exceeding the 200 MiB threshold and resumed after the behaving processes have finished:



i: 0 | pid: 17997 | 13.53 MiB
i: 0 | pid: 18001 | 19.70 MiB
i: 0 | pid: 17998 | 25.88 MiB
i: 0 | pid: 17999 | 41.32 MiB
i: 0 | pid: 18000 | 72.21 MiB
...
i: 2 | pid: 17997 | 20.84 MiB
i: 2 | pid: 18001 | 42.02 MiB
i: 2 | pid: 17998 | 60.56 MiB
i: 2 | pid: 17999 | 103.36 MiB
i: 2 | pid: 18000 | 215.70 MiB
suspending process: psutil.Process(pid=18000, name='python', started='18:20:09')
i: 3 | pid: 17997 | 23.93 MiB
i: 3 | pid: 18001 | 47.75 MiB
i: 3 | pid: 17998 | 76.00 MiB
i: 3 | pid: 17999 | 141.59 MiB
...
i: 5 | pid: 17997 | 30.11 MiB
i: 5 | pid: 18001 | 68.24 MiB
i: 5 | pid: 17998 | 107.23 MiB
i: 5 | pid: 17999 | 203.52 MiB
suspending process: psutil.Process(pid=17999, name='python', started='18:20:09')
i: 6 | pid: 17997 | 33.19 MiB
i: 6 | pid: 18001 | 77.49 MiB
i: 6 | pid: 17998 | 122.59 MiB
...
i: 9 | pid: 17997 | 42.47 MiB
i: 9 | pid: 18001 | 105.68 MiB
i: 9 | pid: 17998 | 168.96 MiB
removed finished process: psutil.Process(pid=17997, status='terminated')
removed finished process: psutil.Process(pid=17998, status='terminated')
removed finished process: psutil.Process(pid=18001, status='terminated')

resuming process: psutil.Process(pid=18000, name='python', started='18:20:09')
i: 3 | pid: 18000 | 277.46 MiB
i: 4 | pid: 18000 | 339.22 MiB
i: 5 | pid: 18000 | 400.84 MiB
...
i: 9 | pid: 18000 | 648.00 MiB

resuming process: psutil.Process(pid=17999, name='python', started='18:20:09')
i: 6 | pid: 17999 | 234.55 MiB
...
i: 9 | pid: 17999 | 327.31 MiB


Process finished with exit code 0
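The example above tracks Python functions that it started itself through `multiprocessing`. In the question, however, the USGS command-line tools are launched through GNU Parallel (via `os.system`) or Pysis, so the monitor never holds a `Process` object for them and has to find them by process name instead, as the question originally asked. A minimal sketch of that lookup with `psutil.process_iter()` follows. When given an attribute list, `process_iter()` fills `proc.info` and substitutes `None` for attributes it is not allowed to read. The helper names `rss_by_name` and `pause_all_but_one`, the placeholder tool name, and the policy of keeping the lowest PID running are illustrative assumptions, not part of the answer above.

```python
import psutil


def rss_by_name(name, min_mib=0.0):
    """Return {pid: rss_in_mib} for processes named `name` above `min_mib`.

    Hypothetical helper. Processes whose memory we may not read come back
    with None for the requested attribute and are skipped.
    """
    found = {}
    for proc in psutil.process_iter(["name", "memory_info"]):
        mem = proc.info["memory_info"]
        if proc.info["name"] != name or mem is None:
            continue
        rss_mib = mem.rss / 2 ** 20
        if rss_mib > min_mib:
            found[proc.pid] = rss_mib
    return found


def pause_all_but_one(name, max_mib):
    """Suspend every process named `name` above `max_mib` except one.

    Hypothetical policy: the lowest PID keeps running. Returns the suspended
    psutil.Process objects so the caller can call resume() on them later.
    """
    hogs = sorted(rss_by_name(name, max_mib))
    paused = []
    for pid in hogs[1:]:
        try:
            proc = psutil.Process(pid)
            proc.suspend()
            paused.append(proc)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass  # exited in the meantime, or owned by another user
    return paused


if __name__ == "__main__":
    # "some_isis_tool" is a placeholder; use the executable name shown by `ps`.
    paused = pause_all_but_one("some_isis_tool", max_mib=2048)
    print(f"paused: {[p.pid for p in paused]}")
```

Calling `pause_all_but_one()` from a loop that sleeps for a minute, and calling `resume()` on one paused process each time the running instance disappears from `rss_by_name()`, would give the one-at-a-time behaviour described in the question.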




EDIT:




I think my only remaining question from working through this is, how can I get it to spawn only a certain number of threads [sic!] at a time, as stuff completes add remaining ones, and then do all the suspended ones at the end?




I expanded the code above so that new processes are started as old ones finish, with the number of simultaneously running processes capped at the number of cores. I also refactored it into a class, since otherwise all the state that has to be managed would start to get messy. In the code below I use the names "tasks" and "processes" interchangeably for brevity. Note the changed start method for processes and the accompanying comment in the code.



import time
import random
from threading import Thread
from collections import deque
from multiprocessing import Process, active_children, set_start_method

import psutil

# `def format_mib` and `def f` from above unchanged...

class TaskProcessor(Thread):
    """Processor class which monitors memory usage for running
    tasks (processes). Suspends execution for tasks surpassing
    `max_mib` and completes them one by one, after behaving
    tasks have finished.
    """
    def __init__(self, n_cores, max_mib, tasks):
        super().__init__()
        self.n_cores = n_cores
        self.max_mib = max_mib  # memory threshold
        self.tasks = deque(tasks)

        self._running_tasks = []
        self._suspended_tasks = []

    def run(self):
        """Main-function in new thread."""
        self._update_running_tasks()
        self._monitor_running_tasks()
        self._process_suspended_tasks()

    def _update_running_tasks(self):
        """Start new tasks if we have fewer running tasks than cores."""
        while len(self._running_tasks) < self.n_cores and len(self.tasks) > 0:
            p = self.tasks.popleft()
            p.start()
            # for further process-management we here just need the
            # psutil.Process wrapper
            self._running_tasks.append(psutil.Process(pid=p.pid))
            print(f'Started process: {self._running_tasks[-1]}')

    def _monitor_running_tasks(self):
        """Monitor running tasks. Replace completed tasks and suspend tasks
        which exceed the memory threshold `self.max_mib`.
        """
        # loop while we have running or non-started tasks
        while self._running_tasks or self.tasks:
            active_children()  # Joins all finished processes.
            # Without it, p.is_running() below on Unix would not return
            # `False` for finished processes.
            self._update_running_tasks()
            actual_tasks = self._running_tasks.copy()

            for p in actual_tasks:
                if not p.is_running():  # process has finished
                    self._running_tasks.remove(p)
                    print(f'Removed finished process: {p}')
                else:
                    if p.memory_info().rss / 2 ** 20 > self.max_mib:
                        p.suspend()
                        self._running_tasks.remove(p)
                        self._suspended_tasks.append(p)
                        print(f'Suspended process: {p}')

            time.sleep(1)

    def _process_suspended_tasks(self):
        """Resume processing of suspended tasks."""
        for p in self._suspended_tasks:
            print(f'\nResuming process: {p}')
            p.resume()
            p.wait()


if __name__ == '__main__':

    # Forking (long the default on Linux; macOS has defaulted to 'spawn'
    # since Python 3.8) an already multithreaded process is error-prone.
    # Since we intend to start processes after we are already
    # multithreaded, we switch to another start method.
    set_start_method('spawn')  # or 'forkserver' (a bit faster start up) if available

    MAX_MiB = 200
    N_CORES = 2

    append_lengths = [100000, 500000, 1000000, 2000000, 300000]
    tasks = [Process(target=f, args=(append_length,))
             for append_length in append_lengths]

    tp = TaskProcessor(n_cores=N_CORES, max_mib=MAX_MiB, tasks=tasks)
    tp.start()
    tp.join()


Example Output (shortened):



Started process: psutil.Process(pid=9422, name='python', started='13:45:53')
Started process: psutil.Process(pid=9423, name='python', started='13:45:53')
i: 0 | pid: 9422 | 18.95 MiB
i: 0 | pid: 9423 | 31.45 MiB
...
i: 9 | pid: 9422 | 47.36 MiB
i: 9 | pid: 9423 | 175.41 MiB
Removed finished process: psutil.Process(pid=9422, status='terminated')
Removed finished process: psutil.Process(pid=9423, status='terminated')
Started process: psutil.Process(pid=9445, name='python', started='13:46:15')
Started process: psutil.Process(pid=9446, name='python', started='13:46:15')
i: 0 | pid: 9445 | 46.86 MiB
i: 0 | pid: 9446 | 77.74 MiB
...
i: 2 | pid: 9445 | 117.41 MiB
i: 2 | pid: 9446 | 220.99 MiB
Suspended process: psutil.Process(pid=9446, name='python', started='13:46:15')
Started process: psutil.Process(pid=9450, name='python', started='13:46:21')
i: 0 | pid: 9450 | 25.16 MiB
i: 3 | pid: 9445 | 148.29 MiB
i: 1 | pid: 9450 | 36.47 MiB
i: 4 | pid: 9445 | 179.17 MiB
i: 2 | pid: 9450 | 45.74 MiB
i: 5 | pid: 9445 | 211.14 MiB
Suspended process: psutil.Process(pid=9445, name='python', started='13:46:15')
i: 3 | pid: 9450 | 55.00 MiB
...
i: 9 | pid: 9450 | 110.62 MiB
Removed finished process: psutil.Process(pid=9450, status='terminated')

Resuming process: psutil.Process(pid=9446, name='python', started='13:46:15')
i: 3 | pid: 9446 | 282.75 MiB
...
i: 9 | pid: 9446 | 655.35 MiB

Resuming process: psutil.Process(pid=9445, name='python', started='13:46:15')
i: 6 | pid: 9445 | 242.12 MiB
...
i: 9 | pid: 9445 | 334.88 MiB

Process finished with exit code 0
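Because the real workload consists of external executables rather than Python functions, the same refill-and-suspend scheduling can also be sketched with `subprocess.Popen`. Here the child immediately execs a new program instead of running Python code, which should largely avoid the fork-after-threads concern mentioned in the comment above. The sketch is POSIX-only: it pauses a command with `SIGSTOP` and continues it with `SIGCONT`, which the psutil documentation describes as what `suspend()` and `resume()` do on Unix. `run_bounded` and its `rss_mib` callback are hypothetical names; the callback could, for example, wrap `psutil.Process(pid).memory_info().rss`. Signals sent to `proc.pid` pause only that process, not any children it may have spawned.

```python
import os
import signal
import subprocess
import sys
import time
from collections import deque


def run_bounded(commands, n_slots, max_mib, rss_mib, poll_s=1.0):
    """Run `commands` (argument lists) with at most `n_slots` active at once.

    Hypothetical helper (POSIX only). `rss_mib(pid)` is supplied by the caller,
    returns the resident memory of `pid` in MiB, and should return 0.0 if it
    cannot be read. A command that grows past `max_mib` is paused with SIGSTOP
    and frees its slot; paused commands are continued with SIGCONT one at a
    time after everything else has finished.
    Returns the exit codes in the order of `commands`.
    """
    pending = deque(commands)
    started, running, suspended = [], [], []

    while pending or running:
        # Refill free slots, like TaskProcessor._update_running_tasks().
        while pending and len(running) < n_slots:
            proc = subprocess.Popen(pending.popleft())
            started.append(proc)
            running.append(proc)

        for proc in running.copy():
            if proc.poll() is not None:  # finished; poll() also reaps it
                running.remove(proc)
            elif rss_mib(proc.pid) > max_mib:
                os.kill(proc.pid, signal.SIGSTOP)  # pause the memory hog
                running.remove(proc)
                suspended.append(proc)

        time.sleep(poll_s)

    # Continue the memory hogs strictly one after another.
    for proc in suspended:
        os.kill(proc.pid, signal.SIGCONT)
        proc.wait()

    return [proc.returncode for proc in started]


if __name__ == "__main__":
    # Three tiny stand-in jobs; real use would pass the ISIS command lines.
    jobs = [[sys.executable, "-c", f"import sys; sys.exit({n})"] for n in range(3)]
    print(run_bounded(jobs, n_slots=2, max_mib=2048,
                      rss_mib=lambda pid: 0.0, poll_s=0.1))  # prints [0, 1, 2]
```

As in `TaskProcessor`, a paused command does not occupy a slot, so a run of memory-hungry inputs could all end up paused at once; capping the number of paused commands would be one possible refinement.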





  • This is almost exactly what I'm looking for. I spent some time working through it and adding a lot of comments so I can follow what you're doing. I think my only remaining question from working through this is, how can I get it to spawn only a certain number of threads at a time, as stuff completes add remaining ones, and then do all the suspended ones at the end? E.g., I have 8 cores, I have 20 things to run, I'll do 1 process per core. So I want to run 8 and whenever something completes, run the next, etc. Oh, one more issue: On macOS, how do I add in compressed RAM (CMPRS in top)?

    – Stuart Robbins
    Nov 27 '18 at 19:48











  • Okay, I was able to figure this out, though it's inelegant: at the beginning of the monitored function, I put in a check for whether len(running_processes) is larger than the number of threads I want; if so, I suspend all threads beyond that, adding them to a new list, and when threads finish I check that list to restart them (copying your code for .append() and .remove()). If you have a more elegant method, let me know; otherwise, thanks! I'll wait a day or so before marking this as the answer in case you add to it.

    – Stuart Robbins
    Nov 27 '18 at 21:08











  • @Stuart Robbins Will work out a modified version. I'm not sure I can help you with CMPRS, though, since I'm not using macOS. When you type cat /proc/self/status into a terminal, does it also give you a field with cmprs?

    – Darkonaut
    Nov 27 '18 at 21:29











  • No, typing that into the terminal does nothing (no such file or directory), neither in Bash nor in tcsh.

    – Stuart Robbins
    Nov 27 '18 at 23:16











  • @Stuart Robbins I updated my answer accordingly. Let me know if something remains unclear. Indeed macOS doesn't seem to have /proc like Linux (read here).

    – Darkonaut
    Nov 28 '18 at 0:06
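As the comments note, `/proc` exists on Linux but not on macOS, so psutil remains the portable route (this does not answer the macOS CMPRS question). On a Linux processing machine, though, the same numbers can be read without any third-party package. A minimal sketch, assuming a Linux kernel new enough to report `MemAvailable` (added in 3.14): `vm_rss_mib()` parses a process's resident set size from `/proc/<pid>/status`, and `mem_available_mib()` reads the kernel's system-wide estimate of memory available without swapping, which could serve as a global trigger instead of a fixed per-process threshold. Both function names are hypothetical.

```python
import sys


def _read_kib(path, key):
    """Return the value of `key` (reported in kB, i.e. KiB) from a /proc file, or None."""
    with open(path) as fh:
        for line in fh:
            if line.startswith(key + ":"):
                return int(line.split()[1])
    return None


def vm_rss_mib(pid="self"):
    """Resident set size of `pid` in MiB from /proc/<pid>/status (Linux only).

    Zombies and kernel threads have no VmRSS line and count as 0. Raises
    FileNotFoundError if the process no longer exists.
    """
    kib = _read_kib(f"/proc/{pid}/status", "VmRSS")
    return (kib or 0) / 1024


def mem_available_mib():
    """System-wide MemAvailable from /proc/meminfo in MiB (Linux >= 3.14)."""
    return _read_kib("/proc/meminfo", "MemAvailable") / 1024


if __name__ == "__main__":
    if sys.platform.startswith("linux"):
        print(f"this process: {vm_rss_mib():.2f} MiB")
        print(f"available:    {mem_available_mib():.2f} MiB")
```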


















2














You can use psutil.Process.suspend() to suspend execution of running processes which exceed a given memory threshold. The monitoring part is just repeatedly comparing psutil.Process().memory_info().rss ("Resident Set Size") of running processes with your given threshold. How you then schedule further processing is up to you.



In the example below I'm suspending culprit-processes until the rest is finished, then resume the once suspended processes one by one. This is meant to be a simplistic approach to show the general mechanism.



import time
import random
from threading import Thread
from multiprocessing import Process, active_children

import psutil


def format_mib(mem_bytes):
"""Format bytes into mebibyte-string."""
return f'{mem_bytes / 2 ** 20:.2f} MiB'


def f(append_length):
"""Main function in child-process. Appends random floats to list."""
p = psutil.Process()
li =
for i in range(10):
li.extend([random.random() for _ in range(append_length)])
print(f'i: {i} | pid: {p.pid} | '
f'{format_mib(p.memory_full_info().rss)}')
time.sleep(2)


def monitored(running_processes, max_mib):
"""Monitor memory usage for running processes.
Suspend execution for processes surpassing `max_mib` and complete
one by one after behaving processes have finished.
"""
running_processes = [psutil.Process(pid=p.pid) for p in running_processes]
suspended_processes =

while running_processes:
active_children() # Joins all finished processes.
# Without it, p.is_running() below on Unix would not return `False`
# for finished processes.
actual_processes = running_processes.copy()
for p in actual_processes:
if not p.is_running():
running_processes.remove(p)
print(f'removed finished process: {p}')
else:
if p.memory_info().rss / 2 ** 20 > max_mib:
print(f'suspending process: {p}')
p.suspend()
running_processes.remove(p)
suspended_processes.append(p)

time.sleep(1)

for p in suspended_processes:
print(f'nresuming process: {p}')
p.resume()
p.wait()


if __name__ == '__main__':

MAX_MiB = 200

append_lengths = [100000, 500000, 1000000, 2000000, 300000]
processes = [Process(target=f, args=(append_length,))
for append_length in append_lengths]

for p in processes:
p.start()

m = Thread(target=monitored, args=(processes, MAX_MiB))
m.start()
m.join()


Example output (shortened) with two processes getting suspended for exceeding the 200 MiB threshold and resumed after the behaving processes have finished:



i: 0 | pid: 17997 | 13.53 MiB
i: 0 | pid: 18001 | 19.70 MiB
i: 0 | pid: 17998 | 25.88 MiB
i: 0 | pid: 17999 | 41.32 MiB
i: 0 | pid: 18000 | 72.21 MiB
...
i: 2 | pid: 17997 | 20.84 MiB
i: 2 | pid: 18001 | 42.02 MiB
i: 2 | pid: 17998 | 60.56 MiB
i: 2 | pid: 17999 | 103.36 MiB
i: 2 | pid: 18000 | 215.70 MiB
suspending process: psutil.Process(pid=18000, name='python', started='18:20:09')
i: 3 | pid: 17997 | 23.93 MiB
i: 3 | pid: 18001 | 47.75 MiB
i: 3 | pid: 17998 | 76.00 MiB
i: 3 | pid: 17999 | 141.59 MiB
...
i: 5 | pid: 17997 | 30.11 MiB
i: 5 | pid: 18001 | 68.24 MiB
i: 5 | pid: 17998 | 107.23 MiB
i: 5 | pid: 17999 | 203.52 MiB
suspending process: psutil.Process(pid=17999, name='python', started='18:20:09')
i: 6 | pid: 17997 | 33.19 MiB
i: 6 | pid: 18001 | 77.49 MiB
i: 6 | pid: 17998 | 122.59 MiB
...
i: 9 | pid: 17997 | 42.47 MiB
i: 9 | pid: 18001 | 105.68 MiB
i: 9 | pid: 17998 | 168.96 MiB
removed finished process: psutil.Process(pid=17997, status='terminated')
removed finished process: psutil.Process(pid=17998, status='terminated')
removed finished process: psutil.Process(pid=18001, status='terminated')

resuming process: psutil.Process(pid=18000, name='python', started='18:20:09')
i: 3 | pid: 18000 | 277.46 MiB
i: 4 | pid: 18000 | 339.22 MiB
i: 5 | pid: 18000 | 400.84 MiB
...
i: 9 | pid: 18000 | 648.00 MiB

resuming process: psutil.Process(pid=17999, name='python', started='18:20:09')
i: 6 | pid: 17999 | 234.55 MiB
...
i: 9 | pid: 17999 | 327.31 MiB


Process finished with exit code 0




EDIT:




I think my only remaining question from working through this is, how can I get it to spawn only a certain number of threads [sic!] at a time, as stuff completes add remaining ones, and then do all the suspended ones at the end?




I expanded the code above to enable starting new processes as old ones finish with a maximum for running processes set to the number of cores. I also refactored it into a class since it otherwise would start to get messy with all that necessary state to manage. In the code below I'm using the name "tasks" and "processes" interchangeably for brevity. Note the changed start method for processes and the accompanying comment in the code.



import time
import random
from threading import Thread
from collections import deque
from multiprocessing import Process, active_children, set_start_method

import psutil

# `def format_mib` and `def f` from above unchanged...

class TaskProcessor(Thread):
"""Processor class which monitors memory usage for running
tasks (processes). Suspends execution for tasks surpassing
`max_mib` and completes them one by one, after behaving
tasks have finished.
"""
def __init__(self, n_cores, max_mib, tasks):
super().__init__()
self.n_cores = n_cores
self.max_mib = max_mib # memory threshold
self.tasks = deque(tasks)

self._running_tasks =
self._suspended_tasks =

def run(self):
"""Main-function in new thread."""
self._update_running_tasks()
self._monitor_running_tasks()
self._process_suspended_tasks()

def _update_running_tasks(self):
"""Start new tasks if we have less running tasks than cores."""
while len(self._running_tasks) < self.n_cores and len(self.tasks) > 0:
p = self.tasks.popleft()
p.start()
# for further process-management we here just need the
# psutil.Process wrapper
self._running_tasks.append(psutil.Process(pid=p.pid))
print(f'Started process: {self._running_tasks[-1]}')

def _monitor_running_tasks(self):
"""Monitor running tasks. Replace completed tasks and suspend tasks
which exceed the memory threshold `self.max_mib`.
"""
# loop while we have running or non-started tasks
while self._running_tasks or self.tasks:
active_children() # Joins all finished processes.
# Without it, p.is_running() below on Unix would not return
# `False` for finished processes.
self._update_running_tasks()
actual_tasks = self._running_tasks.copy()

for p in actual_tasks:
if not p.is_running(): # process has finished
self._running_tasks.remove(p)
print(f'Removed finished process: {p}')
else:
if p.memory_info().rss / 2 ** 20 > self.max_mib:
p.suspend()
self._running_tasks.remove(p)
self._suspended_tasks.append(p)
print(f'Suspended process: {p}')

time.sleep(1)

def _process_suspended_tasks(self):
"""Resume processing of suspended tasks."""
for p in self._suspended_tasks:
print(f'nResuming process: {p}')
p.resume()
p.wait()


if __name__ == '__main__':

# Forking (default on Unix-y systems) an already multithreaded process is
# error-prone. Since we intend to start processes after we are already
# multithreaded, we switch to another start-method.
set_start_method('spawn') # or 'forkserver' (a bit faster start up) if available

MAX_MiB = 200
N_CORES = 2

append_lengths = [100000, 500000, 1000000, 2000000, 300000]
tasks = [Process(target=f, args=(append_length,))
for append_length in append_lengths]

tp = TaskProcessor(n_cores=N_CORES, max_mib=MAX_MiB, tasks=tasks)
tp.start()
tp.join()


Example Output (shortened):



Started process: psutil.Process(pid=9422, name='python', started='13:45:53')
Started process: psutil.Process(pid=9423, name='python', started='13:45:53')
i: 0 | pid: 9422 | 18.95 MiB
i: 0 | pid: 9423 | 31.45 MiB
...
i: 9 | pid: 9422 | 47.36 MiB
i: 9 | pid: 9423 | 175.41 MiB
Removed finished process: psutil.Process(pid=9422, status='terminated')
Removed finished process: psutil.Process(pid=9423, status='terminated')
Started process: psutil.Process(pid=9445, name='python', started='13:46:15')
Started process: psutil.Process(pid=9446, name='python', started='13:46:15')
i: 0 | pid: 9445 | 46.86 MiB
i: 0 | pid: 9446 | 77.74 MiB
...
i: 2 | pid: 9445 | 117.41 MiB
i: 2 | pid: 9446 | 220.99 MiB
Suspended process: psutil.Process(pid=9446, name='python', started='13:46:15')
Started process: psutil.Process(pid=9450, name='python', started='13:46:21')
i: 0 | pid: 9450 | 25.16 MiB
i: 3 | pid: 9445 | 148.29 MiB
i: 1 | pid: 9450 | 36.47 MiB
i: 4 | pid: 9445 | 179.17 MiB
i: 2 | pid: 9450 | 45.74 MiB
i: 5 | pid: 9445 | 211.14 MiB
Suspended process: psutil.Process(pid=9445, name='python', started='13:46:15')
i: 3 | pid: 9450 | 55.00 MiB
...
i: 9 | pid: 9450 | 110.62 MiB
Removed finished process: psutil.Process(pid=9450, status='terminated')

Resuming process: psutil.Process(pid=9446, name='python', started='13:46:15')
i: 3 | pid: 9446 | 282.75 MiB
...
i: 9 | pid: 9446 | 655.35 MiB

Resuming process: psutil.Process(pid=9445, name='python', started='13:46:15')
i: 6 | pid: 9445 | 242.12 MiB
...
i: 9 | pid: 9445 | 334.88 MiB

Process finished with exit code 0





share|improve this answer


























  • This is almost exactly what I'm looking for. I spent some time working through it and adding a lot of comments so I can follow what you're doing. I think my only remaining question from working through this is, how can I get it to spawn only a certain number of threads at a time, as stuff completes add remaining ones, and then do all the suspended ones at the end? E.g., I have 8 cores, I have 20 things to run, I'll do 1 process per core. So I want to run 8 and whenever something completes, run the next, etc. Oh, one more issue: On macOS, how do I add in compressed RAM (CMPRS in top)?

    – Stuart Robbins
    Nov 27 '18 at 19:48











  • Okay, I was able to figure this out, though it's inelegant: In the monitored function, at the beginning, I put in a catch for if len(running_processes) is larger than the number of threads I want, then suspend all threads > than that, adding them to a new array, and then when threads finish check that list to restart them (copying your code for .append() and .remove(). If you have a more elegant method, let me know; otherwise, thanks! I'll wait a day or so before marking this as the answer in case you add to it.

    – Stuart Robbins
    Nov 27 '18 at 21:08











  • @ Stuart Robbins Will work out a modified version. I'm not sure if I can help you with CMPRS, though. I'm not using macOS, when you type cat /proc/self/status into terminal, will it give you also a field with cmprs?

    – Darkonaut
    Nov 27 '18 at 21:29











  • No, typing that into the terminal does nothing (no such file or directory). In BASH nor TCSH.

    – Stuart Robbins
    Nov 27 '18 at 23:16











  • @Stuart Robbins I updated my answer accordingly. Let me know if something remains unclear. Indeed macOS doesn't seem to have /proc like Linux (read here).

    – Darkonaut
    Nov 28 '18 at 0:06
















2












2








2







You can use psutil.Process.suspend() to suspend execution of running processes which exceed a given memory threshold. The monitoring part is just repeatedly comparing psutil.Process().memory_info().rss ("Resident Set Size") of running processes with your given threshold. How you then schedule further processing is up to you.



In the example below I'm suspending culprit-processes until the rest is finished, then resume the once suspended processes one by one. This is meant to be a simplistic approach to show the general mechanism.



import time
import random
from threading import Thread
from multiprocessing import Process, active_children

import psutil


def format_mib(mem_bytes):
"""Format bytes into mebibyte-string."""
return f'{mem_bytes / 2 ** 20:.2f} MiB'


def f(append_length):
"""Main function in child-process. Appends random floats to list."""
p = psutil.Process()
li =
for i in range(10):
li.extend([random.random() for _ in range(append_length)])
print(f'i: {i} | pid: {p.pid} | '
f'{format_mib(p.memory_full_info().rss)}')
time.sleep(2)


def monitored(running_processes, max_mib):
"""Monitor memory usage for running processes.
Suspend execution for processes surpassing `max_mib` and complete
one by one after behaving processes have finished.
"""
running_processes = [psutil.Process(pid=p.pid) for p in running_processes]
suspended_processes =

while running_processes:
active_children() # Joins all finished processes.
# Without it, p.is_running() below on Unix would not return `False`
# for finished processes.
actual_processes = running_processes.copy()
for p in actual_processes:
if not p.is_running():
running_processes.remove(p)
print(f'removed finished process: {p}')
else:
if p.memory_info().rss / 2 ** 20 > max_mib:
print(f'suspending process: {p}')
p.suspend()
running_processes.remove(p)
suspended_processes.append(p)

time.sleep(1)

for p in suspended_processes:
print(f'nresuming process: {p}')
p.resume()
p.wait()


if __name__ == '__main__':

MAX_MiB = 200

append_lengths = [100000, 500000, 1000000, 2000000, 300000]
processes = [Process(target=f, args=(append_length,))
for append_length in append_lengths]

for p in processes:
p.start()

m = Thread(target=monitored, args=(processes, MAX_MiB))
m.start()
m.join()


Example output (shortened) with two processes getting suspended for exceeding the 200 MiB threshold and resumed after the behaving processes have finished:



i: 0 | pid: 17997 | 13.53 MiB
i: 0 | pid: 18001 | 19.70 MiB
i: 0 | pid: 17998 | 25.88 MiB
i: 0 | pid: 17999 | 41.32 MiB
i: 0 | pid: 18000 | 72.21 MiB
...
i: 2 | pid: 17997 | 20.84 MiB
i: 2 | pid: 18001 | 42.02 MiB
i: 2 | pid: 17998 | 60.56 MiB
i: 2 | pid: 17999 | 103.36 MiB
i: 2 | pid: 18000 | 215.70 MiB
suspending process: psutil.Process(pid=18000, name='python', started='18:20:09')
i: 3 | pid: 17997 | 23.93 MiB
i: 3 | pid: 18001 | 47.75 MiB
i: 3 | pid: 17998 | 76.00 MiB
i: 3 | pid: 17999 | 141.59 MiB
...
i: 5 | pid: 17997 | 30.11 MiB
i: 5 | pid: 18001 | 68.24 MiB
i: 5 | pid: 17998 | 107.23 MiB
i: 5 | pid: 17999 | 203.52 MiB
suspending process: psutil.Process(pid=17999, name='python', started='18:20:09')
i: 6 | pid: 17997 | 33.19 MiB
i: 6 | pid: 18001 | 77.49 MiB
i: 6 | pid: 17998 | 122.59 MiB
...
i: 9 | pid: 17997 | 42.47 MiB
i: 9 | pid: 18001 | 105.68 MiB
i: 9 | pid: 17998 | 168.96 MiB
removed finished process: psutil.Process(pid=17997, status='terminated')
removed finished process: psutil.Process(pid=17998, status='terminated')
removed finished process: psutil.Process(pid=18001, status='terminated')

resuming process: psutil.Process(pid=18000, name='python', started='18:20:09')
i: 3 | pid: 18000 | 277.46 MiB
i: 4 | pid: 18000 | 339.22 MiB
i: 5 | pid: 18000 | 400.84 MiB
...
i: 9 | pid: 18000 | 648.00 MiB

resuming process: psutil.Process(pid=17999, name='python', started='18:20:09')
i: 6 | pid: 17999 | 234.55 MiB
...
i: 9 | pid: 17999 | 327.31 MiB


Process finished with exit code 0




EDIT:




I think my only remaining question from working through this is, how can I get it to spawn only a certain number of threads [sic!] at a time, as stuff completes add remaining ones, and then do all the suspended ones at the end?




I expanded the code above to enable starting new processes as old ones finish with a maximum for running processes set to the number of cores. I also refactored it into a class since it otherwise would start to get messy with all that necessary state to manage. In the code below I'm using the name "tasks" and "processes" interchangeably for brevity. Note the changed start method for processes and the accompanying comment in the code.



import time
import random
from threading import Thread
from collections import deque
from multiprocessing import Process, active_children, set_start_method

import psutil

# `def format_mib` and `def f` from above unchanged...

class TaskProcessor(Thread):
"""Processor class which monitors memory usage for running
tasks (processes). Suspends execution for tasks surpassing
`max_mib` and completes them one by one, after behaving
tasks have finished.
"""
def __init__(self, n_cores, max_mib, tasks):
super().__init__()
self.n_cores = n_cores
self.max_mib = max_mib # memory threshold
self.tasks = deque(tasks)

self._running_tasks =
self._suspended_tasks =

def run(self):
"""Main-function in new thread."""
self._update_running_tasks()
self._monitor_running_tasks()
self._process_suspended_tasks()

def _update_running_tasks(self):
"""Start new tasks if we have less running tasks than cores."""
while len(self._running_tasks) < self.n_cores and len(self.tasks) > 0:
p = self.tasks.popleft()
p.start()
# for further process-management we here just need the
# psutil.Process wrapper
self._running_tasks.append(psutil.Process(pid=p.pid))
print(f'Started process: {self._running_tasks[-1]}')

def _monitor_running_tasks(self):
"""Monitor running tasks. Replace completed tasks and suspend tasks
which exceed the memory threshold `self.max_mib`.
"""
# loop while we have running or non-started tasks
while self._running_tasks or self.tasks:
active_children() # Joins all finished processes.
# Without it, p.is_running() below on Unix would not return
# `False` for finished processes.
self._update_running_tasks()
actual_tasks = self._running_tasks.copy()

for p in actual_tasks:
if not p.is_running(): # process has finished
self._running_tasks.remove(p)
print(f'Removed finished process: {p}')
else:
if p.memory_info().rss / 2 ** 20 > self.max_mib:
p.suspend()
self._running_tasks.remove(p)
self._suspended_tasks.append(p)
print(f'Suspended process: {p}')

time.sleep(1)

def _process_suspended_tasks(self):
"""Resume processing of suspended tasks."""
for p in self._suspended_tasks:
print(f'nResuming process: {p}')
p.resume()
p.wait()


if __name__ == '__main__':

# Forking (default on Unix-y systems) an already multithreaded process is
# error-prone. Since we intend to start processes after we are already
# multithreaded, we switch to another start-method.
set_start_method('spawn') # or 'forkserver' (a bit faster start up) if available

MAX_MiB = 200
N_CORES = 2

append_lengths = [100000, 500000, 1000000, 2000000, 300000]
tasks = [Process(target=f, args=(append_length,))
for append_length in append_lengths]

tp = TaskProcessor(n_cores=N_CORES, max_mib=MAX_MiB, tasks=tasks)
tp.start()
tp.join()


Example Output (shortened):



Started process: psutil.Process(pid=9422, name='python', started='13:45:53')
Started process: psutil.Process(pid=9423, name='python', started='13:45:53')
i: 0 | pid: 9422 | 18.95 MiB
i: 0 | pid: 9423 | 31.45 MiB
...
i: 9 | pid: 9422 | 47.36 MiB
i: 9 | pid: 9423 | 175.41 MiB
Removed finished process: psutil.Process(pid=9422, status='terminated')
Removed finished process: psutil.Process(pid=9423, status='terminated')
Started process: psutil.Process(pid=9445, name='python', started='13:46:15')
Started process: psutil.Process(pid=9446, name='python', started='13:46:15')
i: 0 | pid: 9445 | 46.86 MiB
i: 0 | pid: 9446 | 77.74 MiB
...
i: 2 | pid: 9445 | 117.41 MiB
i: 2 | pid: 9446 | 220.99 MiB
Suspended process: psutil.Process(pid=9446, name='python', started='13:46:15')
Started process: psutil.Process(pid=9450, name='python', started='13:46:21')
i: 0 | pid: 9450 | 25.16 MiB
i: 3 | pid: 9445 | 148.29 MiB
i: 1 | pid: 9450 | 36.47 MiB
i: 4 | pid: 9445 | 179.17 MiB
i: 2 | pid: 9450 | 45.74 MiB
i: 5 | pid: 9445 | 211.14 MiB
Suspended process: psutil.Process(pid=9445, name='python', started='13:46:15')
i: 3 | pid: 9450 | 55.00 MiB
...
i: 9 | pid: 9450 | 110.62 MiB
Removed finished process: psutil.Process(pid=9450, status='terminated')

Resuming process: psutil.Process(pid=9446, name='python', started='13:46:15')
i: 3 | pid: 9446 | 282.75 MiB
...
i: 9 | pid: 9446 | 655.35 MiB

Resuming process: psutil.Process(pid=9445, name='python', started='13:46:15')
i: 6 | pid: 9445 | 242.12 MiB
...
i: 9 | pid: 9445 | 334.88 MiB

Process finished with exit code 0





share|improve this answer















You can use psutil.Process.suspend() to suspend execution of running processes which exceed a given memory threshold. The monitoring part is just repeatedly comparing psutil.Process().memory_info().rss ("Resident Set Size") of running processes with your given threshold. How you then schedule further processing is up to you.



In the example below I'm suspending culprit-processes until the rest is finished, then resume the once suspended processes one by one. This is meant to be a simplistic approach to show the general mechanism.



import time
import random
from threading import Thread
from multiprocessing import Process, active_children

import psutil


def format_mib(mem_bytes):
"""Format bytes into mebibyte-string."""
return f'{mem_bytes / 2 ** 20:.2f} MiB'


def f(append_length):
"""Main function in child-process. Appends random floats to list."""
p = psutil.Process()
li =
for i in range(10):
li.extend([random.random() for _ in range(append_length)])
print(f'i: {i} | pid: {p.pid} | '
f'{format_mib(p.memory_full_info().rss)}')
time.sleep(2)


def monitored(running_processes, max_mib):
"""Monitor memory usage for running processes.
Suspend execution for processes surpassing `max_mib` and complete
one by one after behaving processes have finished.
"""
running_processes = [psutil.Process(pid=p.pid) for p in running_processes]
suspended_processes =

while running_processes:
active_children() # Joins all finished processes.
# Without it, p.is_running() below on Unix would not return `False`
# for finished processes.
actual_processes = running_processes.copy()
for p in actual_processes:
if not p.is_running():
running_processes.remove(p)
print(f'removed finished process: {p}')
else:
if p.memory_info().rss / 2 ** 20 > max_mib:
print(f'suspending process: {p}')
p.suspend()
running_processes.remove(p)
suspended_processes.append(p)

time.sleep(1)

for p in suspended_processes:
print(f'nresuming process: {p}')
p.resume()
p.wait()


if __name__ == '__main__':

MAX_MiB = 200

append_lengths = [100000, 500000, 1000000, 2000000, 300000]
processes = [Process(target=f, args=(append_length,))
for append_length in append_lengths]

for p in processes:
p.start()

m = Thread(target=monitored, args=(processes, MAX_MiB))
m.start()
m.join()


Example output (shortened) with two processes getting suspended for exceeding the 200 MiB threshold and resumed after the behaving processes have finished:



i: 0 | pid: 17997 | 13.53 MiB
i: 0 | pid: 18001 | 19.70 MiB
i: 0 | pid: 17998 | 25.88 MiB
i: 0 | pid: 17999 | 41.32 MiB
i: 0 | pid: 18000 | 72.21 MiB
...
i: 2 | pid: 17997 | 20.84 MiB
i: 2 | pid: 18001 | 42.02 MiB
i: 2 | pid: 17998 | 60.56 MiB
i: 2 | pid: 17999 | 103.36 MiB
i: 2 | pid: 18000 | 215.70 MiB
suspending process: psutil.Process(pid=18000, name='python', started='18:20:09')
i: 3 | pid: 17997 | 23.93 MiB
i: 3 | pid: 18001 | 47.75 MiB
i: 3 | pid: 17998 | 76.00 MiB
i: 3 | pid: 17999 | 141.59 MiB
...
i: 5 | pid: 17997 | 30.11 MiB
i: 5 | pid: 18001 | 68.24 MiB
i: 5 | pid: 17998 | 107.23 MiB
i: 5 | pid: 17999 | 203.52 MiB
suspending process: psutil.Process(pid=17999, name='python', started='18:20:09')
i: 6 | pid: 17997 | 33.19 MiB
i: 6 | pid: 18001 | 77.49 MiB
i: 6 | pid: 17998 | 122.59 MiB
...
i: 9 | pid: 17997 | 42.47 MiB
i: 9 | pid: 18001 | 105.68 MiB
i: 9 | pid: 17998 | 168.96 MiB
removed finished process: psutil.Process(pid=17997, status='terminated')
removed finished process: psutil.Process(pid=17998, status='terminated')
removed finished process: psutil.Process(pid=18001, status='terminated')

resuming process: psutil.Process(pid=18000, name='python', started='18:20:09')
i: 3 | pid: 18000 | 277.46 MiB
i: 4 | pid: 18000 | 339.22 MiB
i: 5 | pid: 18000 | 400.84 MiB
...
i: 9 | pid: 18000 | 648.00 MiB

resuming process: psutil.Process(pid=17999, name='python', started='18:20:09')
i: 6 | pid: 17999 | 234.55 MiB
...
i: 9 | pid: 17999 | 327.31 MiB


Process finished with exit code 0




EDIT:




I think my only remaining question from working through this is, how can I get it to spawn only a certain number of threads [sic!] at a time, as stuff completes add remaining ones, and then do all the suspended ones at the end?




I expanded the code above so that new processes are started as old ones finish, with the number of simultaneously running processes capped at the number of cores. I also refactored it into a class, since otherwise all the state that needs managing would start to get messy. In the code below I use the terms "tasks" and "processes" interchangeably for brevity. Note the changed start method for processes and the accompanying comment in the code.



import time
import random
from threading import Thread
from collections import deque
from multiprocessing import Process, active_children, set_start_method

import psutil

# `def format_mib` and `def f` from above unchanged...

class TaskProcessor(Thread):
    """Processor class which monitors memory usage for running
    tasks (processes). Suspends execution for tasks surpassing
    `max_mib` and completes them one by one, after behaving
    tasks have finished.
    """
    def __init__(self, n_cores, max_mib, tasks):
        super().__init__()
        self.n_cores = n_cores
        self.max_mib = max_mib  # memory threshold
        self.tasks = deque(tasks)

        self._running_tasks = []
        self._suspended_tasks = []

    def run(self):
        """Main-function in new thread."""
        self._update_running_tasks()
        self._monitor_running_tasks()
        self._process_suspended_tasks()

    def _update_running_tasks(self):
        """Start new tasks if we have fewer running tasks than cores."""
        while len(self._running_tasks) < self.n_cores and len(self.tasks) > 0:
            p = self.tasks.popleft()
            p.start()
            # for further process management we only need the
            # psutil.Process wrapper
            self._running_tasks.append(psutil.Process(pid=p.pid))
            print(f'Started process: {self._running_tasks[-1]}')

    def _monitor_running_tasks(self):
        """Monitor running tasks. Replace completed tasks and suspend tasks
        which exceed the memory threshold `self.max_mib`.
        """
        # loop while we have running or non-started tasks
        while self._running_tasks or self.tasks:
            active_children()  # Joins all finished processes.
            # Without it, p.is_running() below on Unix would not return
            # `False` for finished processes.
            self._update_running_tasks()
            actual_tasks = self._running_tasks.copy()

            for p in actual_tasks:
                if not p.is_running():  # process has finished
                    self._running_tasks.remove(p)
                    print(f'Removed finished process: {p}')
                else:
                    if p.memory_info().rss / 2 ** 20 > self.max_mib:
                        p.suspend()
                        self._running_tasks.remove(p)
                        self._suspended_tasks.append(p)
                        print(f'Suspended process: {p}')

            time.sleep(1)

    def _process_suspended_tasks(self):
        """Resume processing of suspended tasks, one at a time."""
        for p in self._suspended_tasks:
            print(f'\nResuming process: {p}')
            p.resume()
            p.wait()


if __name__ == '__main__':

    # Forking (default on Unix-y systems) an already multithreaded process is
    # error-prone. Since we intend to start processes after we are already
    # multithreaded, we switch to another start method.
    set_start_method('spawn')  # or 'forkserver' (a bit faster start-up) if available

    MAX_MiB = 200
    N_CORES = 2

    append_lengths = [100000, 500000, 1000000, 2000000, 300000]
    tasks = [Process(target=f, args=(append_length,))
             for append_length in append_lengths]

    tp = TaskProcessor(n_cores=N_CORES, max_mib=MAX_MiB, tasks=tasks)
    tp.start()
    tp.join()


Example Output (shortened):



Started process: psutil.Process(pid=9422, name='python', started='13:45:53')
Started process: psutil.Process(pid=9423, name='python', started='13:45:53')
i: 0 | pid: 9422 | 18.95 MiB
i: 0 | pid: 9423 | 31.45 MiB
...
i: 9 | pid: 9422 | 47.36 MiB
i: 9 | pid: 9423 | 175.41 MiB
Removed finished process: psutil.Process(pid=9422, status='terminated')
Removed finished process: psutil.Process(pid=9423, status='terminated')
Started process: psutil.Process(pid=9445, name='python', started='13:46:15')
Started process: psutil.Process(pid=9446, name='python', started='13:46:15')
i: 0 | pid: 9445 | 46.86 MiB
i: 0 | pid: 9446 | 77.74 MiB
...
i: 2 | pid: 9445 | 117.41 MiB
i: 2 | pid: 9446 | 220.99 MiB
Suspended process: psutil.Process(pid=9446, name='python', started='13:46:15')
Started process: psutil.Process(pid=9450, name='python', started='13:46:21')
i: 0 | pid: 9450 | 25.16 MiB
i: 3 | pid: 9445 | 148.29 MiB
i: 1 | pid: 9450 | 36.47 MiB
i: 4 | pid: 9445 | 179.17 MiB
i: 2 | pid: 9450 | 45.74 MiB
i: 5 | pid: 9445 | 211.14 MiB
Suspended process: psutil.Process(pid=9445, name='python', started='13:46:15')
i: 3 | pid: 9450 | 55.00 MiB
...
i: 9 | pid: 9450 | 110.62 MiB
Removed finished process: psutil.Process(pid=9450, status='terminated')

Resuming process: psutil.Process(pid=9446, name='python', started='13:46:15')
i: 3 | pid: 9446 | 282.75 MiB
...
i: 9 | pid: 9446 | 655.35 MiB

Resuming process: psutil.Process(pid=9445, name='python', started='13:46:15')
i: 6 | pid: 9445 | 242.12 MiB
...
i: 9 | pid: 9445 | 334.88 MiB

Process finished with exit code 0
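Since the question is about launching command-line tools rather than Python functions, the same idea can be sketched with `subprocess.Popen`. This is a hypothetical adaptation, not part of the code above: `run_commands` and its parameters are illustrative names. Commands are given as argument lists, and at most `n_cores` run at once. Any command whose RSS exceeds `max_mib` is suspended, and the suspended commands are resumed one at a time at the end. `Popen.poll()` both detects and reaps a finished child, which takes the place of the `active_children()` call needed with `multiprocessing`.

```python
import subprocess
import time

import psutil


def run_commands(cmds, n_cores, max_mib, poll_interval=1.0):
    """Run external commands (argument lists) with at most `n_cores` at once.

    Commands whose resident memory exceeds `max_mib` MiB are suspended and
    resumed one at a time after all other commands have finished.
    Returns ({index: returncode}, [indices that were suspended]).
    """
    pending = list(enumerate(cmds))  # (index, argv) not started yet
    running = {}                     # index -> (Popen, psutil.Process)
    suspended = []                   # (index, Popen, psutil.Process)
    returncodes = {}

    while pending or running:
        # Fill free slots with not-yet-started commands.
        while pending and len(running) < n_cores:
            idx, argv = pending.pop(0)
            popen = subprocess.Popen(argv)
            running[idx] = (popen, psutil.Process(popen.pid))

        for idx, (popen, proc) in list(running.items()):
            rc = popen.poll()  # detects and reaps a finished child
            if rc is not None:
                returncodes[idx] = rc
                del running[idx]
                continue
            try:
                rss_mib = proc.memory_info().rss / 2 ** 20
            except psutil.Error:  # exited between poll() and this call
                continue
            if rss_mib > max_mib:
                proc.suspend()  # SIGSTOP on POSIX
                suspended.append((idx, popen, proc))
                del running[idx]

        time.sleep(poll_interval)

    # Resume the memory hogs one by one so only one of them runs at a time.
    for idx, popen, proc in suspended:
        proc.resume()  # SIGCONT on POSIX
        returncodes[idx] = popen.wait()

    return returncodes, [idx for idx, _, _ in suspended]
```

For the 8-core scenario from the question, a call might look like `run_commands([[tool, arg] for arg in file_list], n_cores=8, max_mib=2048)`, where `tool` and `file_list` are placeholders. One limitation of this sketch is that RSS is read only for the launched process itself, so memory used by any children the tool spawns is not counted.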






edited Nov 28 '18 at 12:58

answered Nov 27 '18 at 17:51

Darkonaut

  • This is almost exactly what I'm looking for. I spent some time working through it and adding a lot of comments so I can follow what you're doing. I think my only remaining question from working through this is, how can I get it to spawn only a certain number of threads at a time, as stuff completes add remaining ones, and then do all the suspended ones at the end? E.g., I have 8 cores, I have 20 things to run, I'll do 1 process per core. So I want to run 8 and whenever something completes, run the next, etc. Oh, one more issue: On macOS, how do I add in compressed RAM (CMPRS in top)?

    – Stuart Robbins
    Nov 27 '18 at 19:48











  • Okay, I was able to figure this out, though it's inelegant: at the beginning of the monitored function, I added a check for whether len(running_processes) is larger than the number of threads I want. If so, I suspend all threads beyond that number, adding them to a new list, and when threads finish I check that list to restart them (copying your code for .append() and .remove()). If you have a more elegant method, let me know; otherwise, thanks! I'll wait a day or so before marking this as the answer in case you add to it.

    – Stuart Robbins
    Nov 27 '18 at 21:08











  • @Stuart Robbins I will work out a modified version. I'm not sure I can help you with CMPRS, though, since I'm not using macOS. When you type cat /proc/self/status into a terminal, does it also give you a field with cmprs?

    – Darkonaut
    Nov 27 '18 at 21:29











  • No, typing that into the terminal does nothing ("no such file or directory"), in either bash or tcsh.

    – Stuart Robbins
    Nov 27 '18 at 23:16











  • @Stuart Robbins I updated my answer accordingly. Let me know if something remains unclear. Indeed macOS doesn't seem to have /proc like Linux (read here).

    – Darkonaut
    Nov 28 '18 at 0:06





















GNU Parallel's --memfree option is built for this situation:



parallel --memfree 1G doit ::: {1..100}


This will only spawn a new job if more than 1 GB of RAM is free. If less than 0.5 * 1 GB is free, it will kill the youngest job and put it back on the queue.



Only pausing/suspending the youngest job was considered, but experience showed that swapping that process out and back in was often much slower than simply restarting the job.
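To make that policy concrete, here is a rough Python approximation of the behaviour described above. It is not GNU Parallel's code. `run_with_memfree` is a hypothetical helper, and several details are assumptions: free memory is read from `psutil.virtual_memory().available`, at most one job is started per polling tick, and the last running job is never killed, so progress is always possible. The `available` parameter exists only so the policy can be tested with fake readings.

```python
import subprocess
import time
from collections import deque

import psutil


def run_with_memfree(cmds, memfree_bytes, max_jobs, poll_interval=0.5,
                     available=lambda: psutil.virtual_memory().available):
    """Start a job only if more than `memfree_bytes` are available; if less
    than half of that is available, kill the youngest job and requeue it.
    Returns ({index: returncode}, number_of_kills)."""
    queue = deque(enumerate(cmds))  # (index, argv) waiting to run
    running = []                    # (index, Popen), oldest first
    returncodes = {}
    kills = 0

    while queue or running:
        # Reap finished jobs and record their exit codes.
        for idx, popen in list(running):
            rc = popen.poll()
            if rc is not None:
                returncodes[idx] = rc
                running.remove((idx, popen))

        free = available()
        if len(running) > 1 and free < 0.5 * memfree_bytes:
            # Kill the youngest job: its progress is lost, it restarts later.
            idx, popen = running.pop()
            popen.kill()
            popen.wait()
            queue.appendleft((idx, cmds[idx]))
            kills += 1
        elif queue and len(running) < max_jobs and free > memfree_bytes:
            idx, argv = queue.popleft()
            running.append((idx, subprocess.Popen(argv)))

        time.sleep(poll_interval)

    return returncodes, kills
```

The difference from the suspend-based approach is visible in the kill branch: a killed job starts over from the beginning, which is the trade-off debated in the comments below.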






answered Nov 30 '18 at 6:01









Ole Tange

  • Hm, did not know about that option in parallel. While it's applicable to some of my code, it's not applicable to all of it since I can't use parallel for some aspects. Also, it's not just an issue of cores being free, it's an issue of, if one process on one core is taking up too much RAM, pause it to let the others continue work and finish before proceeding and cannibalizing their RAM.

    – Stuart Robbins
    Nov 30 '18 at 22:41











  • Except for the pause thing, this is exactly what GNU Parallel does. GNU Parallel kills instead of swapping out.

    – Ole Tange
    Dec 1 '18 at 0:34













  • I don't want it killed because it still needs to be run, and I don't want progress to be lost.

    – Stuart Robbins
    Dec 1 '18 at 3:05











  • @StuartRobbins I had the same feeling, but experiments while developing GNU Parallel showed that it is usually better to lose the progress than to wait for the swapping to happen. Do you have numbers to back up that it is faster to swap-out+in than to kill+restart?

    – Ole Tange
    Dec 1 '18 at 10:48


















