Skip to content

Commit d9bb983

Browse files
committed
Kind ofthe evaluating virtual memory for the Job is parameterized
1 parent 0ddcc8d commit d9bb983

File tree

2 files changed

+41
-29
lines changed

2 files changed

+41
-29
lines changed

README.md

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ _LIMIT_WORKERS_RAM = True
7070

7171
Job(name, workdir=None, args=(), timeout=0, ontimeout=False, task=None
7272
, startdelay=0, onstart=None, ondone=None, params=None, category=None, size=0
73-
, slowdown=1., omitafn=False, stdout=sys.stdout, stderr=sys.stderr):
73+
, slowdown=1., omitafn=False, vmemkind=1, stdout=sys.stdout, stderr=sys.stderr):
7474
"""Initialize job to be executed
7575
7676
Job is executed in a separate process via Popen or Process object and is
@@ -111,6 +111,11 @@ Job(name, workdir=None, args=(), timeout=0, ontimeout=False, task=None
111111
size - size of the processing data, >= 0; requires _LIMIT_WORKERS_RAM or _CHAINED_CONSTRAINTS
112112
0 means undefined size and prevents jobs chaining on constraints violation
113113
slowdown - execution slowdown ratio (inversely to the [estimated] execution speed), E (0, inf)
114+
vmemkind - kind of virtual memory to be evaluated:
115+
0 - vmem for the process itself omitting the spawned sub-processes (if any)
116+
1 - vmem for the heaviest process of the process tree spawned by the original process
117+
(including the origin itself)
118+
2 - vmem for the whole spawned process tree including the origin process
114119
115120
# Execution parameters, initialized automatically on execution
116121
tstart - start time is filled automatically on the execution start (before onstart). Default: None
@@ -319,18 +324,19 @@ In case the execution pool is required locally then it can be used in the follow
319324
# Limit of the virtual memory for the all worker processes with max(32 GB, RAM)
320325
# and provide latency of 1.5 sec for the jobs rescheduling
321326
with ExecPool(max(cpu_count()-1, 1), vmlimit=32, latency=1.5) as xpool:
322-
job = Job('jvmem_proc', args=(PYEXEC, '-c', TestProcMemTree.allocAndSpawnProg(
327+
job = Job('jvmem_proc', args=(PYEXEC, '-c', TestProcMemTree.allocAndSpawnProg(
323328
allocDelayProg(inBytes(amem), duration), allocDelayProg(inBytes(camem), duration)))
324-
, timeout=timeout, ondone=mock.MagicMock())
325-
jobtr = Job('jvmem_tree', args=(PYEXEC, '-c', TestProcMemTree.allocAndSpawnProg(
329+
, timeout=timeout, vmemkind=0, ondone=mock.MagicMock())
330+
jobx = Job('jvmem_max-subproc', args=(PYEXEC, '-c', TestProcMemTree.allocAndSpawnProg(
326331
allocDelayProg(inBytes(amem), duration), allocDelayProg(inBytes(camem), duration)))
327-
, timeout=timeout, ondone=mock.MagicMock())
332+
, timeout=timeout, vmemkind=1, ondone=mock.MagicMock())
328333
...
329334
xpool.execute(job)
330-
xpool.execute(jobtr)
335+
xpool.execute(jobx)
331336
...
332337
xpool.join(10) # Timeout for the execution of all jobs is 10 sec [+latency]
333338
```
339+
The code shown above is fetched from the `TestProcMemTree` unit test available in the end of the [source file](mpepool.py).
334340

335341
### Failsafe Termination
336342
To perform *graceful termination* of the Jobs in case of external termination of your program, signal handlers can be set:

mpepool.py

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ class Job(object):
219219
# NOTE: keyword-only arguments are specified after the *, supported only since Python 3
220220
def __init__(self, name, workdir=None, args=(), timeout=0, ontimeout=False, task=None #,*
221221
, startdelay=0, onstart=None, ondone=None, params=None, category=None, size=0, slowdown=1.
222-
, omitafn=False, stdout=sys.stdout, stderr=sys.stderr):
222+
, omitafn=False, vmemkind=1, stdout=sys.stdout, stderr=sys.stderr):
223223
"""Initialize job to be executed
224224
225225
# Main parameters
@@ -257,6 +257,11 @@ def __init__(self, name, workdir=None, args=(), timeout=0, ontimeout=False, task
257257
size - size of the processing data, >= 0; requires _LIMIT_WORKERS_RAM or _CHAINED_CONSTRAINTS
258258
0 means undefined size and prevents jobs chaining on constraints violation
259259
slowdown - execution slowdown ratio (inversely to the [estimated] execution speed), E (0, inf)
260+
vmemkind - kind of virtual memory to be evaluated:
261+
0 - vmem for the process itself omitting the spawned sub-processes (if any)
262+
1 - vmem for the heaviest process of the process tree spawned by the original process
263+
(including the origin itself)
264+
2 - vmem for the whole spawned process tree including the origin process
260265
261266
# Execution parameters, initialized automatically on execution
262267
tstart - start time is filled automatically on the execution start (before onstart). Default: None
@@ -267,7 +272,7 @@ def __init__(self, name, workdir=None, args=(), timeout=0, ontimeout=False, task
267272
inherited from the jobs of the same category having non-smaller size; requires _LIMIT_WORKERS_RAM
268273
"""
269274
assert isinstance(name, str) and timeout >= 0 and (task is None or isinstance(task, Task)
270-
) and size >= 0 and slowdown > 0, 'Parameters validaiton failed'
275+
) and size >= 0 and slowdown > 0 and 0 <= vmemkind <= 2, 'Parameters validaiton failed'
271276
#if not args:
272277
# args = ("false") # Create an empty process to schedule it's execution
273278

@@ -298,6 +303,7 @@ def __init__(self, name, workdir=None, args=(), timeout=0, ontimeout=False, task
298303
self._fstderr = None
299304
# Omit scheduler affinity policy (actual when some process is computed on all treads, etc.)
300305
self._omitafn = omitafn
306+
self.vmemkind = vmemkind
301307
if _LIMIT_WORKERS_RAM or _CHAINED_CONSTRAINTS:
302308
self.size = size # Size of the processing data
303309
# Consumed VM on execution in gigabytes or the least expected (inherited from the
@@ -310,20 +316,21 @@ def __init__(self, name, workdir=None, args=(), timeout=0, ontimeout=False, task
310316
self.wkslim = None # Worker processes limit (max number) on the job postponing if any
311317

312318

313-
def _updateVmem(self, tree=False, heaviest=False):
319+
def _updateVmem(self):
314320
"""Update virtual memory consumption using smooth max
315321
316322
Actual virtual memory (not the historical max) is retrieved and updated
317323
using:
318324
a) smoothing filter in case of the decreasing consumption and
319-
b) direct update in case of the increasing consumption
325+
b) direct update in case of the increasing consumption.
320326
321327
Prerequisites: job must have defined proc and psutil should be available
322328
323-
tree - evaluate virtual memory consumption for the whole process tree
324-
spawned by the current process
325-
maxchild - evaluate virtual memory consumption only for the heaviest in
326-
the process tree. Actual only for the enabled 'tree' option.
329+
self.vmemkind defines the kind of virtual memory to be evaluated:
330+
0 - vmem for the process itself omitting the spawned sub-processes (if any)
331+
1 - vmem for the heaviest process of the process tree spawned by the original process
332+
(including the origin)
333+
2 - vmem for the whole spawned process tree including the origin process
327334
328335
return - smooth max of job vmem
329336
"""
@@ -332,16 +339,15 @@ def _updateVmem(self, tree=False, heaviest=False):
332339
try:
333340
up = psutil.Process(self.proc.pid)
334341
curvmem = up.memory_info().vms
335-
if tree:
342+
if self.vmemkind:
336343
avmem = curvmem # Memory consumption of the whole process tree
337-
if heaviest:
338-
xvmem = curvmem # Memory consumption of the heaviest process in the tree
344+
xvmem = curvmem # Memory consumption of the heaviest process in the tree
339345
for ucp in up.children(recursive=True): # Note: fetches only children procs
340346
vmem = ucp.memory_info().vms # Mb; Resident Set Size
341347
avmem += vmem
342-
if heaviest and xvmem < vmem:
348+
if xvmem < vmem:
343349
xvmem = vmem
344-
curvmem = avmem if not heaviest else xvmem
350+
curvmem = avmem if self.vmemkind == 2 else xvmem
345351
# Check also
346352
except psutil.Error as err:
347353
# The process is finished and such pid does not exist
@@ -873,7 +879,7 @@ def __reviseWorkers(self):
873879
# NOTE: Evaluate memory consuption for the heaviest process in the process tree
874880
# of the origin job process to allow additional intermediate apps for the evaluations like:
875881
# ./exectime ./clsalg ca_prm1 ca_prm2
876-
job._updateVmem(True, True) # Consider vm consumption of past runs if any
882+
job._updateVmem() # Consider vm consumption of the past runs if any
877883
if job.vmem < self._vmlimit:
878884
vmtotal += job.vmem # Consider vm consumption of past runs if any
879885
#if _DEBUG_TRACE == 3:
@@ -1692,27 +1698,27 @@ def test_jobVmem(self):
16921698
amem = 0.02 # Direct allocating memory in the process
16931699
camem = 0.07 # Allocatinf memory in the child process
16941700
duration = worktime / 3 # Duration in sec
1695-
job = Job('jvmem_proc', args=(PYEXEC, '-c', TestProcMemTree.allocAndSpawnProg(
1701+
job = Job('jvmem_proc', args=(PYEXEC, '-c', TestProcMemTree.allocAndSpawnProg(
16961702
allocDelayProg(inBytes(amem), duration), allocDelayProg(inBytes(camem), duration)))
1697-
, timeout=timeout, ondone=mock.MagicMock())
1698-
jobtr = Job('jvmem_tree', args=(PYEXEC, '-c', TestProcMemTree.allocAndSpawnProg(
1703+
, timeout=timeout, vmemkind=0, ondone=mock.MagicMock())
1704+
jobx = Job('jvmem_max-subproc', args=(PYEXEC, '-c', TestProcMemTree.allocAndSpawnProg(
16991705
allocDelayProg(inBytes(amem), duration), allocDelayProg(inBytes(camem), duration)))
1700-
, timeout=timeout, ondone=mock.MagicMock())
1701-
jobx = Job('jvmem_max-subproc', args=(PYEXEC, '-c', TestProcMemTree.allocAndSpawnProg(
1706+
, timeout=timeout, vmemkind=1, ondone=mock.MagicMock())
1707+
jobtr = Job('jvmem_tree', args=(PYEXEC, '-c', TestProcMemTree.allocAndSpawnProg(
17021708
allocDelayProg(inBytes(amem), duration), allocDelayProg(inBytes(camem), duration)))
1703-
, timeout=timeout, ondone=mock.MagicMock())
1709+
, timeout=timeout, vmemkind=2, ondone=mock.MagicMock())
17041710

17051711
# Verify that non-started job raises exception on memory update request
17061712
self.assertRaises(AttributeError, job._updateVmem)
17071713

17081714
tstart = time.time()
17091715
xpool.execute(job)
1710-
xpool.execute(jobtr)
17111716
xpool.execute(jobx)
1717+
xpool.execute(jobtr)
17121718
time.sleep(duration*2)
17131719
pvmem = job._updateVmem()
1714-
tvmem = jobtr._updateVmem(True)
1715-
xvmem = jobx._updateVmem(True, True)
1720+
xvmem = jobx._updateVmem()
1721+
tvmem = jobtr._updateVmem()
17161722
# Verify memory consumption
17171723
self.assertTrue(pvmem < xvmem < tvmem)
17181724
print('Memory consumption in Mb, proc_vmem: {pvmem:.3g}, max_procInTree_vmem: {xvmem:.3g}, procTree_vmem: {tvmem:.3g}'

0 commit comments

Comments
 (0)