Description
Hi,
I wanted to document an unexpected behavior of the zcollection.Collection.map() function. I work in a distributed environment and rely on this function for part of my computation. Some tasks are memory-intensive, so my goal is to run them in parallel across workers, with only one task per worker at a time. However, I found that using delayed=True can cause several tasks to run concurrently on a single worker, even if it is mono-process and mono-thread.
Below is a simple configuration to reproduce the problem. I set up a SLURM cluster with only one worker (one process, one thread) and a minimal zcollection containing only timestamps. Then I map a simple callback that reads the dataset, sleeps for a short time, and returns nothing.
Setup code
import logging
import os
import time

import dask
import dask.distributed
import dask_jobqueue
import fsspec
import numpy as np

import zcollection as zc
import zcollection.tests.data as zc_dat

logger = logging.getLogger('mycode')
logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s')
logger.setLevel('INFO')
cluster = dask_jobqueue.SLURMCluster(
    cores=1,
    memory="2GiB",
    processes=1,
    log_directory=f"/work/scratch/data/{os.environ['USER']}/dask_stuff",
    walltime="00:30:00",
    interface="ib0",
    account="swotce",
    job_extra_directives=["--export=None"],  # Do not propagate environment
)
client = cluster.get_client()
client
# Set up logging
client.forward_logging(logger_name='mycode', level='INFO')
cluster.scale(jobs=1)
client.wait_for_workers(1)
client.run(logging.basicConfig, level='INFO')
# Set up collection
partition_handler = zc.partitioning.Date(('time', ), resolution='D')
zds = zc.Dataset([zc.Array('time', np.arange('2025-02-01', '2025-03-01', dtype='M8[h]'), ('num_lines',))])
collection = zc.create_collection('time', zds, partition_handler, f'/work/scratch/data/{os.environ["USER"]}/my_collection')
collection.insert(zds)
# Simple callback
def callback(zds):
    logger.info('Processing %s...', zds['time'].values[0])
    t = zds['time'].compute()
    time.sleep(1)
    logger.info('Processing %s... Done', zds['time'].values[0])

Using delayed=False, I get the expected behavior: the calls are not interleaved. The next partition is only processed once the previous task is done:
collection.map(callback, delayed=False).compute()
>> 2025-02-14 10:53:09,951 INFO Processing 2025-02-10T00...
>> 2025-02-14 10:53:10,951 INFO Processing 2025-02-10T00... Done
>> 2025-02-14 10:53:10,953 INFO Processing 2025-02-09T00...
>> 2025-02-14 10:53:11,953 INFO Processing 2025-02-09T00... Done
>> 2025-02-14 10:53:11,959 INFO Processing 2025-02-08T00...
>> 2025-02-14 10:53:12,960 INFO Processing 2025-02-08T00... Done
>> 2025-02-14 10:53:12,961 INFO Processing 2025-02-07T00...
>> 2025-02-14 10:53:13,961 INFO Processing 2025-02-07T00... Done
>> 2025-02-14 10:53:13,968 INFO Processing 2025-02-06T00...
>> 2025-02-14 10:53:14,968 INFO Processing 2025-02-06T00... Done
>> 2025-02-14 10:53:14,969 INFO Processing 2025-02-05T00...
>> 2025-02-14 10:53:15,969 INFO Processing 2025-02-05T00... Done
>> 2025-02-14 10:53:15,976 INFO Processing 2025-02-04T00...
>> 2025-02-14 10:53:16,976 INFO Processing 2025-02-04T00... Done
>> 2025-02-14 10:53:16,977 INFO Processing 2025-02-28T00...
>> 2025-02-14 10:53:17,977 INFO Processing 2025-02-28T00... Done
>> 2025-02-14 10:53:17,984 INFO Processing 2025-02-27T00...
>> 2025-02-14 10:53:18,984 INFO Processing 2025-02-27T00... Done
>> 2025-02-14 10:53:18,985 INFO Processing 2025-02-26T00...
>> 2025-02-14 10:53:19,985 INFO Processing 2025-02-26T00... Done
>> 2025-02-14 10:53:19,991 INFO Processing 2025-02-25T00...
>> 2025-02-14 10:53:20,991 INFO Processing 2025-02-25T00... Done
>> 2025-02-14 10:53:20,993 INFO Processing 2025-02-24T00...
>> 2025-02-14 10:53:21,993 INFO Processing 2025-02-24T00... Done
>> 2025-02-14 10:53:22,003 INFO Processing 2025-02-23T00...
>> 2025-02-14 10:53:23,003 INFO Processing 2025-02-23T00... Done
>> 2025-02-14 10:53:23,005 INFO Processing 2025-02-22T00...
>> 2025-02-14 10:53:24,005 INFO Processing 2025-02-22T00... Done
>> 2025-02-14 10:53:24,012 INFO Processing 2025-02-21T00...
>> 2025-02-14 10:53:25,012 INFO Processing 2025-02-21T00... Done
>> 2025-02-14 10:53:25,014 INFO Processing 2025-02-03T00...
>> 2025-02-14 10:53:26,014 INFO Processing 2025-02-03T00... Done
>> 2025-02-14 10:53:26,022 INFO Processing 2025-02-20T00...
>> 2025-02-14 10:53:27,022 INFO Processing 2025-02-20T00... Done
>> 2025-02-14 10:53:27,024 INFO Processing 2025-02-19T00...
>> 2025-02-14 10:53:28,024 INFO Processing 2025-02-19T00... Done
>> 2025-02-14 10:53:28,030 INFO Processing 2025-02-18T00...
>> 2025-02-14 10:53:29,030 INFO Processing 2025-02-18T00... Done
>> 2025-02-14 10:53:29,031 INFO Processing 2025-02-17T00...
>> 2025-02-14 10:53:30,032 INFO Processing 2025-02-17T00... Done
>> 2025-02-14 10:53:30,038 INFO Processing 2025-02-16T00...
>> 2025-02-14 10:53:31,038 INFO Processing 2025-02-16T00... Done
>> 2025-02-14 10:53:31,039 INFO Processing 2025-02-15T00...
>> 2025-02-14 10:53:32,039 INFO Processing 2025-02-15T00... Done
>> 2025-02-14 10:53:32,047 INFO Processing 2025-02-14T00...
>> 2025-02-14 10:53:33,047 INFO Processing 2025-02-14T00... Done
>> 2025-02-14 10:53:33,049 INFO Processing 2025-02-13T00...
>> 2025-02-14 10:53:34,049 INFO Processing 2025-02-13T00... Done
>> 2025-02-14 10:53:34,056 INFO Processing 2025-02-12T00...
>> 2025-02-14 10:53:35,056 INFO Processing 2025-02-12T00... Done
>> 2025-02-14 10:53:35,058 INFO Processing 2025-02-11T00...
>> 2025-02-14 10:53:36,058 INFO Processing 2025-02-11T00... Done
>> 2025-02-14 10:53:36,064 INFO Processing 2025-02-02T00...
>> 2025-02-14 10:53:37,064 INFO Processing 2025-02-02T00... Done
>> 2025-02-14 10:53:37,066 INFO Processing 2025-02-01T00...
>> 2025-02-14 10:53:38,066 INFO Processing 2025-02-01T00... Done

Using delayed=True, the logging messages indicate that the tasks are launched in parallel on the worker. This is in line with what I observed in a more complex case: more than one of my memory-intensive tasks are launched on a worker, eventually filling up the RAM and getting the worker killed. My cluster configuration may be problematic, but I don't think this is desirable behavior in general, as one would not expect such a difference in scheduling from a change in the delayed parameter.
The only noticeable difference I can see is that with delayed=True the callback receives delayed arrays and must compute the graph on the worker. So I think we might have stumbled upon an issue related to running dask graphs inside workers.
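One workaround I am considering (only a sketch, continuing the setup above, and not verified) is to force any compute issued inside the callback to use the local synchronous scheduler, on the assumption that the nested compute() is what allows another task to start on the mono-thread worker:
def callback_sync(zds):
    # Sketch of a workaround (not verified): force any compute() issued in this
    # callback to run on the local synchronous scheduler, assuming the nested
    # compute is what lets another task start on the mono-thread worker.
    with dask.config.set(scheduler='synchronous'):
        logger.info('Processing %s...', zds['time'].values[0])
        t = zds['time'].compute()
        time.sleep(1)
        logger.info('Processing %s... Done', zds['time'].values[0])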
collection.map(callback, delayed=True).compute()
>> 2025-02-14 10:57:21,696 INFO Processing 2025-02-10T00...
>> 2025-02-14 10:57:21,697 INFO Processing 2025-02-09T00...
>> 2025-02-14 10:57:21,704 INFO Processing 2025-02-08T00...
>> 2025-02-14 10:57:23,726 INFO Processing 2025-02-10T00... Done
>> 2025-02-14 10:57:23,726 INFO Processing 2025-02-06T00...
>> 2025-02-14 10:57:23,732 INFO Processing 2025-02-05T00...
>> 2025-02-14 10:57:23,734 INFO Processing 2025-02-09T00... Done
>> 2025-02-14 10:57:23,735 INFO Processing 2025-02-07T00...
>> 2025-02-14 10:57:25,744 INFO Processing 2025-02-08T00... Done
>> 2025-02-14 10:57:26,746 INFO Processing 2025-02-07T00... Done
>> 2025-02-14 10:57:27,753 INFO Processing 2025-02-04T00...
>> 2025-02-14 10:57:27,754 INFO Processing 2025-02-06T00... Done
>> 2025-02-14 10:57:27,755 INFO Processing 2025-02-05T00... Done
>> 2025-02-14 10:57:27,770 INFO Processing 2025-02-28T00...
>> 2025-02-14 10:57:28,778 INFO Processing 2025-02-27T00...
>> 2025-02-14 10:57:28,783 INFO Processing 2025-02-26T00...
>> 2025-02-14 10:57:29,786 INFO Processing 2025-02-04T00... Done
>> 2025-02-14 10:57:29,787 INFO Processing 2025-02-25T00...
>> 2025-02-14 10:57:31,800 INFO Processing 2025-02-28T00... Done
>> 2025-02-14 10:57:31,806 INFO Processing 2025-02-26T00... Done
>> 2025-02-14 10:57:31,809 INFO Processing 2025-02-27T00... Done
>> 2025-02-14 10:57:31,810 INFO Processing 2025-02-24T00...
>> 2025-02-14 10:57:32,813 INFO Processing 2025-02-23T00...
>> 2025-02-14 10:57:32,820 INFO Processing 2025-02-22T00...
>> 2025-02-14 10:57:33,822 INFO Processing 2025-02-25T00... Done
>> 2025-02-14 10:57:35,832 INFO Processing 2025-02-24T00... Done
>> 2025-02-14 10:57:35,837 INFO Processing 2025-02-22T00... Done
>> 2025-02-14 10:57:35,838 INFO Processing 2025-02-21T00...
>> 2025-02-14 10:57:35,839 INFO Processing 2025-02-23T00... Done
>> 2025-02-14 10:57:36,848 INFO Processing 2025-02-20T00...
>> 2025-02-14 10:57:36,853 INFO Processing 2025-02-21T00... Done
>> 2025-02-14 10:57:36,853 INFO Processing 2025-02-03T00...
>> 2025-02-14 10:57:38,880 INFO Processing 2025-02-18T00...
>> 2025-02-14 10:57:38,881 INFO Processing 2025-02-20T00... Done
>> 2025-02-14 10:57:38,882 INFO Processing 2025-02-03T00... Done
>> 2025-02-14 10:57:38,896 INFO Processing 2025-02-19T00...
>> 2025-02-14 10:57:38,900 INFO Processing 2025-02-17T00...
>> 2025-02-14 10:57:39,912 INFO Processing 2025-02-16T00...
>> 2025-02-14 10:57:40,920 INFO Processing 2025-02-15T00...
>> 2025-02-14 10:57:40,921 INFO Processing 2025-02-18T00... Done
>> 2025-02-14 10:57:41,931 INFO Processing 2025-02-17T00... Done
>> 2025-02-14 10:57:43,935 INFO Processing 2025-02-19T00... Done
>> 2025-02-14 10:57:43,940 INFO Processing 2025-02-15T00... Done
>> 2025-02-14 10:57:43,942 INFO Processing 2025-02-16T00... Done
>> 2025-02-14 10:57:43,942 INFO Processing 2025-02-14T00...
>> 2025-02-14 10:57:43,943 INFO Processing 2025-02-13T00...
>> 2025-02-14 10:57:44,955 INFO Processing 2025-02-12T00...
>> 2025-02-14 10:57:44,956 INFO Processing 2025-02-14T00... Done
>> 2025-02-14 10:57:46,969 INFO Processing 2025-02-11T00...
>> 2025-02-14 10:57:46,969 INFO Processing 2025-02-02T00...
>> 2025-02-14 10:57:46,970 INFO Processing 2025-02-13T00... Done
>> 2025-02-14 10:57:46,979 INFO Processing 2025-02-12T00... Done
>> 2025-02-14 10:57:46,979 INFO Processing 2025-02-01T00...
>> 2025-02-14 10:57:48,986 INFO Processing 2025-02-01T00... Done
>> 2025-02-14 10:57:49,991 INFO Processing 2025-02-02T00... Done
>> 2025-02-14 10:57:49,996 INFO Processing 2025-02-11T00... Done
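To help narrow down whether this is specific to zcollection or a general effect of computing dask graphs inside worker tasks, I plan to try a minimal reproduction without zcollection along these lines (a sketch only; the array sizes and function name are arbitrary, and I have not yet confirmed that it shows the same interleaving):
import time

import dask
import dask.array as da
import dask.distributed


@dask.delayed
def work(i):
    # Each task builds and computes a small dask graph *inside* the worker,
    # mimicking a callback that receives delayed arrays when delayed=True.
    print(f'task {i} start', flush=True)
    da.arange(1_000_000, chunks=100_000).sum().compute()
    time.sleep(1)
    print(f'task {i} done', flush=True)
    return i


if __name__ == '__main__':
    # One worker, one process, one thread: if tasks run strictly one at a time,
    # the start/done messages should never interleave.
    client = dask.distributed.Client(n_workers=1, threads_per_worker=1)
    print(dask.compute(*[work(i) for i in range(5)]))
If the same interleaving shows up there, the issue is probably on the dask/distributed side rather than in zcollection itself.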