Commit 02f670c

draft
1 parent 345081d commit 02f670c

9 files changed: +337 -21 lines changed


pyproject.toml

Lines changed: 9 additions & 5 deletions
@@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
 
 # Metadata (see https://peps.python.org/pep-0621/)
 [project]
-name = "cloudcasting-app"
+name = "cloudcasting-mlops"
 dynamic = ["version"] # Set automatically using git: https://setuptools-git-versioning.readthedocs.io/en/stable/
 description = "An app for running the OCF cloudcasting model in production"
 readme = {file = "README.md", content-type = "text/markdown"}
@@ -20,14 +20,15 @@ classifiers = [
     "License :: OSI Approved :: MIT License",
 ]
 dependencies = [
-    "fsspec==2025.7.0",
+    "fsspec==2025.9.0",
     "huggingface-hub==0.28.1",
     "hydra-core==1.3.2",
+    "icechunk==1.1.5",
     "loguru == 0.7.3",
     "numpy==2.1.2",
     "ocf-data-sampler==0.5.27",
     "pandas==2.2.3",
-    "s3fs==2025.7.0",
+    "s3fs==2025.9.0",
     "safetensors==0.5.2",
     "sat_pred @ git+https://github.com/openclimatefix/sat_pred.git@main",
     # Since torch distributes CPU-only packages as wheels, we have to specify the target platform in order to pull the wheel compiled for that specific platform
@@ -57,6 +58,9 @@ cloudcasting-app = "cloudcasting_app.app:main"
 [project.urls]
 repository = "https://github.com/openclimatefix/cloudcasting-app"
 
+[tool.setuptools.packages.find]
+where = ["src"]
+
 [tool.setuptools]
 include-package-data = false
 
@@ -125,8 +129,8 @@ docstring-code-line-length = 100
 # --- DOCUMENTATION CONFIGURATION --- #
 
 [tool.pydoctor]
-add-package = ["src/cloudcasting_app"]
-project-base-dir = "src/cloudcasting_app"
+add-package = ["cloudcasting_app", "cloudcasting_metrics"]
+project-base-dir = "src"
 docformat = "google"
 html-output = "docs"
 theme = "classic"
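
The new [tool.setuptools.packages.find] table points setuptools at the src/ layout so that both cloudcasting_app and cloudcasting_metrics are discovered at build time. As a quick sanity check (a sketch, assuming it is run from the repo root with this layout):

# Hypothetical check: list the packages setuptools would now discover under src/
from setuptools import find_packages

print(find_packages(where="src"))  # expected to include 'cloudcasting_app' and 'cloudcasting_metrics'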
File renamed without changes.

src/cloudcasting_metrics/__init__.py

Whitespace-only changes.

src/cloudcasting_metrics/app.py

Lines changed: 196 additions & 0 deletions
"""Runs metric calculations on cloudcasting for a given input day and appends to zarr store

This app expects these environment variables to be available:
- SATELLITE_ICECHUNK_ARCHIVE (str): Path at which ground truth satellite data can be found
- CLOUDCASTING_PREDICTION_DIRECTORY (str): The directory where the cloudcasting forecasts are
  saved
- METRIC_ZARR_PATH (str): The path where the metric values will be saved

If the SATELLITE_ICECHUNK_ARCHIVE is an s3 path, then the environment variables
AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION must also be set.
"""

import os
import re

import fsspec
import numpy as np
import pandas as pd
from tqdm import tqdm

import xarray as xr
import icechunk
from loguru import logger

# ---------------------------------------------------------------------------

# The forecast produces these horizon steps
FORECAST_STEPS = pd.timedelta_range(start="15min", end="180min", freq="15min")
# The forecast is run at this frequency
FORECAST_FREQ = pd.Timedelta("30min")


def open_icechunk(path: str) -> xr.Dataset:
    """Open an icechunk store to xarray Dataset

    Args:
        path: The path to the local or s3 icechunk store
    """
    if path.startswith("s3://"):
        bucket, _, path = path.removeprefix("s3://").partition("/")
        store = icechunk.s3_storage(
            bucket=bucket,
            prefix=path,
            access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
            secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
            region=os.environ["AWS_REGION"],
        )
    else:
        store = icechunk.local_filesystem_storage(path=path)

    repo = icechunk.Repository.open(store)
    session = repo.readonly_session("main")
    return xr.open_zarr(session.store)


def app(date: pd.Timestamp | None = None) -> None:
    """Runs metric calculations on cloudcasting for a given input day and appends to zarr store

    Args:
        date: The day for which the cloudcasting predictions will be scored.
    """
    # Unpack environment variables
    sat_path = os.environ["SATELLITE_ICECHUNK_ARCHIVE"]
    prediction_dir = os.environ["CLOUDCASTING_PREDICTION_DIRECTORY"]
    metric_zarr_path = os.environ["METRIC_ZARR_PATH"]

    now = pd.Timestamp.now(tz="UTC").replace(tzinfo=None)

    # Default to yesterday
    if date is None:
        date = now.floor("1D") - pd.Timedelta("1D")

    start_dt = date.floor("1D")
    end_dt = date.floor("1D") + pd.Timedelta("1D")

    if now <= end_dt + FORECAST_STEPS.max():
        raise Exception(
            f"We cannot score forecasts for {start_dt:%Y-%m-%d} until after the last valid-time."
        )

    # Open the satellite data store
    ds_sat = open_icechunk(path=sat_path)

    # Slice to only the timesteps we need for scoring
    ds_sat = ds_sat.sel(time=slice(start_dt, end_dt + FORECAST_STEPS.max()))

    # It is better to preload if we have the RAM space
    # - This eliminates any costs of repeatedly streaming data from the bucket
    # - It's also faster
    ds_sat = ds_sat.compute()

    # Find recent forecasts
    date_string = start_dt.strftime("%Y-%m-%d")
    remote_path = f"{prediction_dir}/{date_string}*.zarr"
    fs, path = fsspec.core.url_to_fs(remote_path)

    file_list = fs.glob(path)

    # Filter forecasts
    # - We only score forecasts we have the satellite data for
    # - If we are missing one satellite image we will skip scoring all forecasts that require it
    forecasts_to_score = []

    for file in file_list:
        # Find the datetime of this forecast
        match = re.search(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}", file)
        assert match

        # Check the satellite data required to score it is present
        init_time = pd.Timestamp(match.group(0))
        if np.isin(init_time + FORECAST_STEPS, ds_sat.time).all():
            forecasts_to_score.append(file)
        else:
            logger.warning(f"Cannot score {file} due to missing satellite data")

    ds_mae_list = []

    for file in tqdm(forecasts_to_score):
        ds_forecast = xr.open_zarr(fs.get_mapper(file)).compute()

        valid_times = pd.Timestamp(ds_forecast.init_time.item()) + ds_forecast.step

        ds_forecast = (
            ds_forecast
            .assign_coords(time=valid_times)
            .swap_dims({"step": "time"})
        )

        ds_sat_sel = ds_sat.sel(
            time=ds_forecast.time,
            x_geostationary=ds_forecast.x_geostationary,
            y_geostationary=ds_forecast.y_geostationary,
            variable=ds_forecast.variable,
        )

        da_mae = np.abs(
            (ds_sat_sel.data - ds_forecast.sat_pred)
            .swap_dims({"time": "step"})
            .drop_vars("time")
        )

        # Create reductions of the full MAE matrix
        da_mae_step = da_mae.mean(dim=("x_geostationary", "y_geostationary", "variable"))
        da_mae_variable = da_mae.mean(dim=("x_geostationary", "y_geostationary", "step"))
        da_mae_spatial = da_mae.mean(dim=("step", "variable"))

        ds_mae_reductions = xr.Dataset(
            {
                "mae_step": da_mae_step,
                "mae_variable": da_mae_variable,
                "mae_spatial": da_mae_spatial,
            }
        )

        ds_mae_list.append(ds_mae_reductions)

    # Concat all the MAE scores and in-fill missing init-times with NaNs
    # - Filling with NaNs makes the chunking easier
    ds_all_maes = xr.concat(ds_mae_list, dim="init_time")
    expected_init_times = pd.date_range(start_dt, end_dt, freq=FORECAST_FREQ, inclusive="left")
    ds_all_maes = ds_all_maes.reindex(init_time=expected_init_times, method=None)

    # Chunk the data ready for saving
    ds_all_maes = ds_all_maes.chunk(
        {
            "x_geostationary": -1,
            "y_geostationary": -1,
            "step": -1,
            "variable": -1,
            "init_time": 48,
        }
    )

    # If it exists, open the archive of MAE values and check the coordinates against them
    fs, stripped = fsspec.core.url_to_fs(metric_zarr_path)
    if fs.exists(stripped):
        ds_maes_archive = xr.open_zarr(metric_zarr_path)

        if np.isin(ds_all_maes.init_time, ds_maes_archive.init_time).any():
            raise Exception("init-times in new MAEs already exist in MAE store")

        for coord in ["variable", "step", "x_geostationary", "y_geostationary"]:
            if not ds_maes_archive[coord].identical(ds_all_maes[coord]):
                raise Exception(f"Found differences in coord: {coord}")

        ds_all_maes.to_zarr(metric_zarr_path, mode="a-", append_dim="init_time")

    else:
        ds_all_maes.to_zarr(metric_zarr_path, mode="w")


if __name__ == "__main__":
    app()
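
As a usage sketch (not part of this commit), the app can be driven locally as below; the paths are placeholders, not real locations:

# Hypothetical local run of the metrics app; all paths are illustrative
import os
import pandas as pd

os.environ["SATELLITE_ICECHUNK_ARCHIVE"] = "/data/sat.icechunk"
os.environ["CLOUDCASTING_PREDICTION_DIRECTORY"] = "/data/predictions"
os.environ["METRIC_ZARR_PATH"] = "/data/cloudcasting_maes.zarr"

from cloudcasting_metrics.app import app

# Score a specific day; omit the argument to score yesterday
app(date=pd.Timestamp("2025-01-01"))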

tests/conftest.py

Lines changed: 65 additions & 13 deletions
@@ -1,35 +1,31 @@
-import os
-
-import fsspec
+from pathlib import Path
 import numpy as np
 import pandas as pd
 import pytest
 import xarray as xr
 import zarr
+from cloudcasting_metrics.app import FORECAST_STEPS, FORECAST_FREQ
+import icechunk
+from icechunk.xarray import to_icechunk
 
 xr.set_options(keep_attrs=True)
 
 @pytest.fixture()
-def test_t0():
+def init_time():
     return pd.Timestamp.now(tz="UTC").replace(tzinfo=None).floor("30min")
 
 
-def make_sat_data(test_t0, freq_mins):
+def make_sat_data(times: pd.DatetimeIndex):
 
     # Load dataset which only contains coordinates, but no data
-    shell_path = f"{os.path.dirname(os.path.abspath(__file__))}/test_data/non_hrv_shell.zarr.zip"
+    shell_path = f"{Path(__file__).parent}/test_data/non_hrv_shell.zarr.zip"
     with zarr.storage.ZipStore(shell_path, mode="r") as store:
         ds = xr.open_zarr(store)
 
     # Remove original time dim
     ds = ds.drop_vars("time")
 
     # Add new times so they lead up to present
-    times = pd.date_range(
-        test_t0 - pd.Timedelta("3h"),
-        test_t0,
-        freq=f"{freq_mins}min",
-    )
     ds = ds.expand_dims(time=times)
 
     # Add data to dataset
@@ -51,5 +47,61 @@ def make_sat_data(test_t0, freq_mins):
 
 
 @pytest.fixture()
-def sat_5_data(test_t0):
-    return make_sat_data(test_t0, freq_mins=5)
+def sat_5_data(init_time):
+    times = pd.date_range(
+        init_time - pd.Timedelta("3h"),
+        init_time,
+        freq="5min",
+    )
+    return make_sat_data(times)
+
+
+@pytest.fixture()
+def today():
+    return pd.Timestamp.now(tz="UTC").replace(tzinfo=None).floor("1D")
+
+
+@pytest.fixture()
+def init_times_tuple(today) -> tuple[pd.DatetimeIndex, pd.DatetimeIndex]:
+    init_times_day1 = pd.date_range(today - pd.Timedelta("2D"), freq=FORECAST_FREQ, periods=3)
+    init_times_day2 = pd.date_range(today - pd.Timedelta("1D"), freq=FORECAST_FREQ, periods=3)
+    return init_times_day1, init_times_day2
+
+
+@pytest.fixture()
+def forecast_directory(tmp_path_factory, init_times_tuple) -> str:
+    pred_dir = str(tmp_path_factory.mktemp("pred_dir"))
+
+    all_init_times = [t for ts in init_times_tuple for t in ts]
+
+    for init_time in all_init_times:
+        ds_pred = make_sat_data(times=init_time + FORECAST_STEPS)
+        ds_pred = ds_pred.assign_coords(step=("time", FORECAST_STEPS))
+        ds_pred = ds_pred.swap_dims({"time": "step"}).drop_vars("time")
+        ds_pred = ds_pred.expand_dims({"init_time": [init_time]})
+        ds_pred = ds_pred.rename({"data": "sat_pred"})
+
+        zarr_path = init_time.strftime(f"{pred_dir}/%Y-%m-%dT%H:%M.zarr")
+        ds_pred.to_zarr(zarr_path)
+
+    yield pred_dir
+
+@pytest.fixture()
+def sat_icechunk_path(tmp_path, init_times_tuple) -> str:
+    sat_icechunk_path = str(tmp_path / "sat.icechunk")
+
+    all_init_times = [t for ts in init_times_tuple for t in ts]
+
+    sat_times = set([t for init_time in all_init_times for t in (init_time + FORECAST_STEPS)])
+    sat_times = pd.to_datetime(sorted(sat_times))
+
+    ds_sat = make_sat_data(sat_times)
+
+    store = icechunk.local_filesystem_storage(sat_icechunk_path)
+    repo = icechunk.Repository.create(store)
+    session = repo.writable_session(branch="main")
+
+    to_icechunk(ds_sat, session)
+    session.commit("Commit test data")
+
+    yield sat_icechunk_path
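
The new fixtures line up with cloudcasting_metrics.app: forecast_directory writes forecasts for two days, and sat_icechunk_path holds the satellite truth for every corresponding valid-time. A sketch (not in this commit) of how they could drive an end-to-end test:

# Hypothetical end-to-end test built on the fixtures above
import pandas as pd
import xarray as xr

from cloudcasting_metrics.app import app


def test_metrics_app(sat_icechunk_path, forecast_directory, today, tmp_path, monkeypatch):
    metric_zarr_path = f"{tmp_path}/maes.zarr"
    monkeypatch.setenv("SATELLITE_ICECHUNK_ARCHIVE", sat_icechunk_path)
    monkeypatch.setenv("CLOUDCASTING_PREDICTION_DIRECTORY", forecast_directory)
    monkeypatch.setenv("METRIC_ZARR_PATH", metric_zarr_path)

    # Score the forecasts made two days ago, which the satellite fixture fully covers
    app(date=today - pd.Timedelta("2D"))

    ds = xr.open_zarr(metric_zarr_path)
    assert "mae_step" in ds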

tests/test_cloudcasting_app/__init__.py

Whitespace-only changes.
Lines changed: 3 additions & 3 deletions
@@ -7,7 +7,7 @@
 from cloudcasting_app.app import app
 
 
-def test_app(sat_5_data, tmp_path, test_t0):
+def test_app(sat_5_data, tmp_path, init_time):
 
     os.chdir(tmp_path)
 
@@ -23,7 +23,7 @@ def test_app(sat_5_data, tmp_path, test_t0):
 
     # Check the two output files have been created
     latest_zarr_path = f"{tmp_path}/latest.zarr"
-    t0_string_zarr_path = test_t0.strftime(f"{tmp_path}/%Y-%m-%dT%H:%M.zarr")
+    t0_string_zarr_path = init_time.strftime(f"{tmp_path}/%Y-%m-%dT%H:%M.zarr")
     assert os.path.exists(latest_zarr_path)
     assert os.path.exists(t0_string_zarr_path)
 
@@ -37,7 +37,7 @@ def test_app(sat_5_data, tmp_path, test_t0):
     )
 
     # Make sure all the coords are correct
-    assert ds_y_hat.init_time == test_t0
+    assert ds_y_hat.init_time == init_time
     assert len(ds_y_hat.step) == 12
     assert (ds_y_hat.x_geostationary == sat_5_data.x_geostationary).all()
    assert (ds_y_hat.y_geostationary == sat_5_data.y_geostationary).all()

tests/test_cloudcasting_metrics/__init__.py

Whitespace-only changes.
