diff --git a/doc/api.rst b/doc/api.rst index 6011aa81..6035dfe8 100644 --- a/doc/api.rst +++ b/doc/api.rst @@ -18,6 +18,8 @@ Creating a model model.Model.add_variables model.Model.add_constraints model.Model.add_objective + model.Model.add_piecewise_constraints + model.Model.add_disjunctive_piecewise_constraints model.Model.linexpr model.Model.remove_constraints diff --git a/doc/index.rst b/doc/index.rst index bff9fa65..6980c36c 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -112,6 +112,8 @@ This package is published under MIT license. creating-expressions creating-constraints sos-constraints + piecewise-linear-constraints + piecewise-linear-constraints-tutorial manipulating-models testing-framework transport-tutorial diff --git a/doc/piecewise-linear-constraints-tutorial.nblink b/doc/piecewise-linear-constraints-tutorial.nblink new file mode 100644 index 00000000..ea48e11f --- /dev/null +++ b/doc/piecewise-linear-constraints-tutorial.nblink @@ -0,0 +1,3 @@ +{ + "path": "../examples/piecewise-linear-constraints.ipynb" +} diff --git a/doc/piecewise-linear-constraints.rst b/doc/piecewise-linear-constraints.rst new file mode 100644 index 00000000..9e9ae6a4 --- /dev/null +++ b/doc/piecewise-linear-constraints.rst @@ -0,0 +1,317 @@ +.. _piecewise-linear-constraints: + +Piecewise Linear Constraints +============================ + +Piecewise linear (PWL) constraints approximate nonlinear functions as connected +linear segments, allowing you to model cost curves, efficiency curves, or +production functions within a linear programming framework. + +Linopy provides two methods: + +- :py:meth:`~linopy.model.Model.add_piecewise_constraints` -- for + **continuous** piecewise linear functions (segments connected end-to-end). +- :py:meth:`~linopy.model.Model.add_disjunctive_piecewise_constraints` -- for + **disconnected** segments (with gaps between them). + +.. 
contents:: + :local: + :depth: 2 + +Formulations +------------ + +SOS2 (Convex Combination) +~~~~~~~~~~~~~~~~~~~~~~~~~ + +Given breakpoints :math:`b_0, b_1, \ldots, b_n`, the SOS2 formulation +introduces interpolation variables :math:`\lambda_i` such that: + +.. math:: + + \lambda_i \in [0, 1], \quad + \sum_{i=0}^{n} \lambda_i = 1, \quad + x = \sum_{i=0}^{n} \lambda_i \, b_i + +The SOS2 constraint ensures that **at most two adjacent** :math:`\lambda_i` can +be non-zero, so :math:`x` is interpolated within one segment. + +**Dict (multi-variable) case.** When multiple variables share the same lambdas, +breakpoints carry an extra *link* dimension :math:`v \in V` and linking becomes +:math:`x_v = \sum_i \lambda_i \, b_{v,i}` for all :math:`v`. + +.. note:: + + SOS2 is a combinatorial constraint handled via branch-and-bound, similar to + integer variables. It cannot be reformulated as a pure LP. Prefer the + incremental method (``method="incremental"`` or ``method="auto"``) when + breakpoints are strictly monotonic. + +Incremental (Delta) Formulation +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +For **strictly monotonic** breakpoints :math:`b_0 < b_1 < \cdots < b_n`, the +incremental formulation is a **pure LP** (no SOS2 or binary variables): + +.. math:: + + \delta_i \in [0, 1], \quad + \delta_{i+1} \le \delta_i, \quad + x = b_0 + \sum_{i=1}^{n} \delta_i \, (b_i - b_{i-1}) + +The filling-order constraints enforce that segment :math:`i+1` cannot be +partially filled unless segment :math:`i` is completely filled. + +**Limitation:** Breakpoints must be strictly monotonic for every linked +variable. In the dict case, each variable is checked independently -- e.g. +power increasing while fuel decreases is fine, but a curve that rises then +falls is not. For non-monotonic curves, use SOS2. 
+ +Disjunctive (Disaggregated Convex Combination) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +For **disconnected segments** (with gaps), the disjunctive formulation selects +exactly one segment via binary indicators and applies SOS2 within it. No big-M +constants are needed, giving a tight LP relaxation. + +Given :math:`K` segments, each with breakpoints :math:`b_{k,0}, \ldots, b_{k,n_k}`: + +.. math:: + + y_k \in \{0, 1\}, \quad \sum_{k} y_k = 1 + + \lambda_{k,i} \in [0, 1], \quad + \sum_{i} \lambda_{k,i} = y_k, \quad + x = \sum_{k} \sum_{i} \lambda_{k,i} \, b_{k,i} + +.. _choosing-a-formulation: + +Choosing a Formulation +~~~~~~~~~~~~~~~~~~~~~~ + +The incremental method is the fastest to solve (pure LP), but requires strictly +monotonic breakpoints. Pass ``method="auto"`` to use it automatically when +applicable, falling back to SOS2 otherwise. + +.. list-table:: + :header-rows: 1 + :widths: 25 25 25 25 + + * - Property + - SOS2 + - Incremental + - Disjunctive + * - Segments + - Connected + - Connected + - Disconnected (gaps allowed) + * - Breakpoint order + - Any + - Strictly monotonic + - Any (per segment) + * - Variable types + - Continuous + SOS2 + - Continuous only (pure LP) + - Binary + SOS2 + * - Solver support + - Solvers with SOS2 support + - **Any LP solver** + - Solvers with SOS2 + MIP support + +Basic Usage +----------- + +Single variable +~~~~~~~~~~~~~~~ + +.. code-block:: python + + import linopy + import xarray as xr + + m = linopy.Model() + x = m.add_variables(name="x") + + breakpoints = xr.DataArray([0, 10, 50, 100], dims=["bp"]) + m.add_piecewise_constraints(x, breakpoints, dim="bp") + +Dict of variables +~~~~~~~~~~~~~~~~~~ + +Link multiple variables through shared interpolation weights. For example, a +turbine where power input determines power output (via a nonlinear efficiency +factor): + +.. 
code-block:: python + + m = linopy.Model() + + power_in = m.add_variables(name="power_in") + power_out = m.add_variables(name="power_out") + + # At 50 MW input the turbine produces 47.5 MW output (95% eff), + # at 100 MW input only 90 MW output (90% eff) + breakpoints = xr.DataArray( + [[0, 50, 100], [0, 47.5, 90]], + coords={"var": ["power_in", "power_out"], "bp": [0, 1, 2]}, + ) + + m.add_piecewise_constraints( + {"power_in": power_in, "power_out": power_out}, + breakpoints, + link_dim="var", + dim="bp", + ) + +Incremental method +~~~~~~~~~~~~~~~~~~~ + +.. code-block:: python + + m.add_piecewise_constraints(x, breakpoints, dim="bp", method="incremental") + +Pass ``method="auto"`` to automatically select incremental when breakpoints are +strictly monotonic, falling back to SOS2 otherwise. + +Disjunctive (disconnected segments) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. code-block:: python + + m = linopy.Model() + x = m.add_variables(name="x") + + # Two disconnected segments: [0, 10] and [50, 100] + breakpoints = xr.DataArray( + [[0, 10], [50, 100]], + dims=["segment", "breakpoint"], + coords={"segment": [0, 1], "breakpoint": [0, 1]}, + ) + + m.add_disjunctive_piecewise_constraints(x, breakpoints) + +Method Signatures +----------------- + +``add_piecewise_constraints`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. code-block:: python + + Model.add_piecewise_constraints( + expr, + breakpoints, + link_dim=None, + dim="breakpoint", + mask=None, + name=None, + skip_nan_check=False, + method="sos2", + ) + +- ``expr`` -- ``Variable``, ``LinearExpression``, or ``dict`` of these. +- ``breakpoints`` -- ``xr.DataArray`` with breakpoint values. Must have ``dim`` + as a dimension. For the dict case, must also have ``link_dim``. +- ``link_dim`` -- ``str``, optional. Dimension linking to different expressions. +- ``dim`` -- ``str``, default ``"breakpoint"``. Breakpoint-index dimension. +- ``mask`` -- ``xr.DataArray``, optional. Boolean mask for valid constraints. 
+- ``name`` -- ``str``, optional. Base name for generated variables/constraints. +- ``skip_nan_check`` -- ``bool``, default ``False``. +- ``method`` -- ``"sos2"`` (default), ``"incremental"``, or ``"auto"``. + +``add_disjunctive_piecewise_constraints`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. code-block:: python + + Model.add_disjunctive_piecewise_constraints( + expr, + breakpoints, + link_dim=None, + dim="breakpoint", + segment_dim="segment", + mask=None, + name=None, + skip_nan_check=False, + ) + +Same as above, plus: + +- ``segment_dim`` -- ``str``, default ``"segment"``. Dimension indexing + segments. Use NaN in breakpoints to pad segments with fewer breakpoints. + +Generated Variables and Constraints +------------------------------------ + +Given base name ``name``, the following objects are created: + +**SOS2 method:** + +.. list-table:: + :header-rows: 1 + :widths: 30 15 55 + + * - Name + - Type + - Description + * - ``{name}_lambda`` + - Variable + - Interpolation weights :math:`\lambda_i \in [0, 1]` (SOS2). + * - ``{name}_convex`` + - Constraint + - :math:`\sum_i \lambda_i = 1`. + * - ``{name}_link`` + - Constraint + - :math:`x = \sum_i \lambda_i \, b_i`. + +**Incremental method:** + +.. list-table:: + :header-rows: 1 + :widths: 30 15 55 + + * - Name + - Type + - Description + * - ``{name}_delta`` + - Variable + - Fill-fraction variables :math:`\delta_i \in [0, 1]`. + * - ``{name}_fill`` + - Constraint + - :math:`\delta_{i+1} \le \delta_i` (only if 3+ breakpoints). + * - ``{name}_link`` + - Constraint + - :math:`x = b_0 + \sum_i \delta_i \, s_i`. + +**Disjunctive method:** + +.. list-table:: + :header-rows: 1 + :widths: 30 15 55 + + * - Name + - Type + - Description + * - ``{name}_binary`` + - Variable + - Segment indicators :math:`y_k \in \{0, 1\}`. + * - ``{name}_select`` + - Constraint + - :math:`\sum_k y_k = 1`. + * - ``{name}_lambda`` + - Variable + - Per-segment interpolation weights (SOS2). 
+ * - ``{name}_convex`` + - Constraint + - :math:`\sum_i \lambda_{k,i} = y_k`. + * - ``{name}_link`` + - Constraint + - :math:`x = \sum_k \sum_i \lambda_{k,i} \, b_{k,i}`. + +See Also +-------- + +- :doc:`piecewise-linear-constraints-tutorial` -- Worked examples with all three formulations +- :doc:`sos-constraints` -- Low-level SOS1/SOS2 constraint API +- :doc:`creating-constraints` -- General constraint creation +- :doc:`user-guide` -- Overall linopy usage patterns diff --git a/doc/release_notes.rst b/doc/release_notes.rst index 2d7a9fcb..30e63737 100644 --- a/doc/release_notes.rst +++ b/doc/release_notes.rst @@ -4,6 +4,10 @@ Release Notes Upcoming Version ---------------- +* Add ``add_piecewise_constraints()`` for piecewise linear constraints with SOS2 and incremental (pure LP) formulations. +* Add ``add_disjunctive_piecewise_constraints()`` for disconnected piecewise linear segments (e.g. forbidden operating zones). +* Add the ``sphinx-copybutton`` extension to the documentation. + Version 0.6.2 -------------- diff --git a/doc/sos-constraints.rst b/doc/sos-constraints.rst index 37dd72d2..5d31f2d6 100644 --- a/doc/sos-constraints.rst +++ b/doc/sos-constraints.rst @@ -260,6 +260,11 @@ Common Patterns Piecewise Linear Cost Function ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +.. note:: + + For a higher-level API that handles all the SOS2 bookkeeping automatically, + see :doc:`piecewise-linear-constraints`. + .. code-block:: python def add_piecewise_cost(model, variable, breakpoints, costs): diff --git a/examples/piecewise-linear-constraints.ipynb b/examples/piecewise-linear-constraints.ipynb new file mode 100644 index 00000000..afaae811 --- /dev/null +++ b/examples/piecewise-linear-constraints.ipynb @@ -0,0 +1,552 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "intro", + "metadata": {}, + "source": [ + "# Piecewise Linear Constraints\n", + "\n", + "This notebook demonstrates linopy's three PWL formulations. 
Each example\n", + "builds a separate dispatch model where a single power plant must meet\n", + "a time-varying demand.\n", + "\n", + "| Example | Plant | Limitation | Formulation |\n", + "|---------|-------|------------|-------------|\n", + "| 1 | Gas turbine (0–100 MW) | Convex heat rate | SOS2 |\n", + "| 2 | Coal plant (0–150 MW) | Monotonic heat rate | Incremental |\n", + "| 3 | Diesel generator (off or 50–80 MW) | Forbidden zone | Disjunctive |" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "imports", + "metadata": { + "ExecuteTime": { + "end_time": "2026-02-09T19:21:33.511970Z", + "start_time": "2026-02-09T19:21:33.501473Z" + }, + "execution": { + "iopub.execute_input": "2026-02-09T19:21:41.350637Z", + "iopub.status.busy": "2026-02-09T19:21:41.350440Z", + "iopub.status.idle": "2026-02-09T19:21:42.583457Z", + "shell.execute_reply": "2026-02-09T19:21:42.583146Z" + } + }, + "outputs": [], + "source": [ + "import matplotlib.pyplot as plt\n", + "import pandas as pd\n", + "import xarray as xr\n", + "\n", + "import linopy\n", + "\n", + "time = pd.Index([1, 2, 3], name=\"time\")\n", + "\n", + "\n", + "def plot_pwl_results(model, breakpoints, demand, color=\"C0\", fuel_rate=None):\n", + " \"\"\"Plot PWL curve with operating points and dispatch vs demand.\"\"\"\n", + " sol = model.solution\n", + " bp = breakpoints.to_pandas()\n", + " fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 3.5))\n", + "\n", + " # Left: PWL curve with operating points\n", + " if \"var\" in breakpoints.dims:\n", + " # Connected: power-fuel curve from var dimension\n", + " ax1.plot(\n", + " bp.loc[\"power\"], bp.loc[\"fuel\"], \"o-\", color=color, label=\"Breakpoints\"\n", + " )\n", + " for t in time:\n", + " ax1.plot(\n", + " sol[\"power\"].sel(time=t),\n", + " sol[\"fuel\"].sel(time=t),\n", + " \"s\",\n", + " ms=10,\n", + " label=f\"t={t}\",\n", + " )\n", + " ax1.set(xlabel=\"Power (MW)\", ylabel=\"Fuel (MWh)\", title=\"Heat rate curve\")\n", + " else:\n", + " # 
Disconnected: segments with linear cost\n", + " for seg in bp.index:\n", + " lo, hi = bp.loc[seg]\n", + " pw = [lo, hi] if lo != hi else [lo]\n", + " ax1.plot(\n", + " pw,\n", + " [fuel_rate * p for p in pw],\n", + " \"o-\",\n", + " color=color,\n", + " label=\"Breakpoints\" if seg == 0 else None,\n", + " )\n", + " ax1.axvspan(\n", + " bp.iloc[0, 1] + 0.5,\n", + " bp.iloc[1, 0] - 0.5,\n", + " color=\"red\",\n", + " alpha=0.1,\n", + " label=\"Forbidden zone\",\n", + " )\n", + " for t in time:\n", + " p = float(sol[\"power\"].sel(time=t))\n", + " ax1.plot(p, fuel_rate * p, \"s\", ms=10, label=f\"t={t}\")\n", + " ax1.set(xlabel=\"Power (MW)\", ylabel=\"Cost\", title=\"Cost curve\")\n", + " ax1.legend()\n", + "\n", + " # Right: dispatch vs demand\n", + " x = list(range(len(time)))\n", + " power_vals = sol[\"power\"].values\n", + " ax2.bar(x, power_vals, color=color, label=\"Power\")\n", + " if \"backup\" in sol:\n", + " ax2.bar(\n", + " x,\n", + " sol[\"backup\"].values,\n", + " bottom=power_vals,\n", + " color=\"C3\",\n", + " alpha=0.5,\n", + " label=\"Backup\",\n", + " )\n", + " ax2.step(\n", + " [v - 0.5 for v in x] + [x[-1] + 0.5],\n", + " list(demand.values) + [demand.values[-1]],\n", + " where=\"post\",\n", + " color=\"black\",\n", + " lw=2,\n", + " label=\"Demand\",\n", + " )\n", + " ax2.set(\n", + " xlabel=\"Time\", ylabel=\"MW\", title=\"Dispatch\", xticks=x, xticklabels=time.values\n", + " )\n", + " ax2.legend()\n", + " plt.tight_layout()" + ] + }, + { + "cell_type": "markdown", + "id": "sos2-md", + "metadata": {}, + "source": [ + "## 1. SOS2 formulation — Gas turbine\n", + "\n", + "The gas turbine has a **convex** heat rate: efficient at moderate load,\n", + "increasingly fuel-hungry at high output. We use the **SOS2** formulation\n", + "to link power output and fuel consumption." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "sos2-setup", + "metadata": { + "ExecuteTime": { + "end_time": "2026-02-09T19:21:33.525641Z", + "start_time": "2026-02-09T19:21:33.516874Z" + }, + "execution": { + "iopub.execute_input": "2026-02-09T19:21:42.585470Z", + "iopub.status.busy": "2026-02-09T19:21:42.585263Z", + "iopub.status.idle": "2026-02-09T19:21:42.639106Z", + "shell.execute_reply": "2026-02-09T19:21:42.638745Z" + } + }, + "outputs": [], + "source": [ + "breakpoints = xr.DataArray(\n", + " [[0, 30, 60, 100], [0, 36, 84, 170]],\n", + " coords={\"var\": [\"power\", \"fuel\"], \"bp\": [0, 1, 2, 3]},\n", + ")\n", + "breakpoints.to_pandas()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "df198d44e962132f", + "metadata": { + "ExecuteTime": { + "end_time": "2026-02-09T19:21:33.584017Z", + "start_time": "2026-02-09T19:21:33.548479Z" + }, + "execution": { + "iopub.execute_input": "2026-02-09T19:21:42.640305Z", + "iopub.status.busy": "2026-02-09T19:21:42.640145Z", + "iopub.status.idle": "2026-02-09T19:21:42.676689Z", + "shell.execute_reply": "2026-02-09T19:21:42.676404Z" + } + }, + "outputs": [], + "source": [ + "m1 = linopy.Model()\n", + "\n", + "power = m1.add_variables(name=\"power\", lower=0, upper=100, coords=[time])\n", + "fuel = m1.add_variables(name=\"fuel\", lower=0, coords=[time])\n", + "\n", + "m1.add_piecewise_constraints(\n", + " {\"power\": power, \"fuel\": fuel},\n", + " breakpoints.expand_dims(time=time),\n", + " link_dim=\"var\",\n", + " dim=\"bp\",\n", + " name=\"pwl\",\n", + " method=\"sos2\",\n", + ")\n", + "\n", + "demand1 = xr.DataArray([50, 80, 30], coords=[time])\n", + "m1.add_constraints(power >= demand1, name=\"demand\")\n", + "m1.add_objective(fuel.sum())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "sos2-solve", + "metadata": { + "ExecuteTime": { + "end_time": "2026-02-09T19:21:33.646228Z", + "start_time": "2026-02-09T19:21:33.602890Z" + }, + "execution": { + 
"iopub.execute_input": "2026-02-09T19:21:42.678723Z", + "iopub.status.busy": "2026-02-09T19:21:42.678455Z", + "iopub.status.idle": "2026-02-09T19:21:42.729810Z", + "shell.execute_reply": "2026-02-09T19:21:42.729268Z" + } + }, + "outputs": [], + "source": [ + "m1.solve()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "sos2-results", + "metadata": { + "ExecuteTime": { + "end_time": "2026-02-09T19:21:33.671517Z", + "start_time": "2026-02-09T19:21:33.665702Z" + }, + "execution": { + "iopub.execute_input": "2026-02-09T19:21:42.732333Z", + "iopub.status.busy": "2026-02-09T19:21:42.732173Z", + "iopub.status.idle": "2026-02-09T19:21:42.737877Z", + "shell.execute_reply": "2026-02-09T19:21:42.737648Z" + } + }, + "outputs": [], + "source": [ + "m1.solution[[\"power\", \"fuel\"]].to_pandas()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "hcqytsfoaa", + "metadata": { + "ExecuteTime": { + "end_time": "2026-02-09T19:21:33.802613Z", + "start_time": "2026-02-09T19:21:33.695925Z" + }, + "execution": { + "iopub.execute_input": "2026-02-09T19:21:42.739144Z", + "iopub.status.busy": "2026-02-09T19:21:42.738977Z", + "iopub.status.idle": "2026-02-09T19:21:42.983660Z", + "shell.execute_reply": "2026-02-09T19:21:42.982758Z" + } + }, + "outputs": [], + "source": [ + "plot_pwl_results(m1, breakpoints, demand1, color=\"C0\")" + ] + }, + { + "cell_type": "markdown", + "id": "incremental-md", + "metadata": {}, + "source": [ + "## 2. Incremental formulation — Coal plant\n", + "\n", + "The coal plant has a **monotonically increasing** heat rate. Since all\n", + "breakpoints are strictly monotonic, we can use the **incremental**\n", + "formulation — a pure LP with no SOS2 or binary variables." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "incremental-setup", + "metadata": { + "ExecuteTime": { + "end_time": "2026-02-09T19:21:33.829667Z", + "start_time": "2026-02-09T19:21:33.825683Z" + }, + "execution": { + "iopub.execute_input": "2026-02-09T19:21:42.987305Z", + "iopub.status.busy": "2026-02-09T19:21:42.986204Z", + "iopub.status.idle": "2026-02-09T19:21:43.003874Z", + "shell.execute_reply": "2026-02-09T19:21:42.998265Z" + } + }, + "outputs": [], + "source": [ + "breakpoints = xr.DataArray(\n", + " [[0, 50, 100, 150], [0, 55, 130, 225]],\n", + " coords={\"var\": [\"power\", \"fuel\"], \"bp\": [0, 1, 2, 3]},\n", + ")\n", + "breakpoints.to_pandas()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8nq1zqvq9re", + "metadata": { + "ExecuteTime": { + "end_time": "2026-02-09T19:21:33.913679Z", + "start_time": "2026-02-09T19:21:33.855910Z" + }, + "execution": { + "iopub.execute_input": "2026-02-09T19:21:43.009748Z", + "iopub.status.busy": "2026-02-09T19:21:43.009216Z", + "iopub.status.idle": "2026-02-09T19:21:43.067070Z", + "shell.execute_reply": "2026-02-09T19:21:43.066402Z" + } + }, + "outputs": [], + "source": [ + "m2 = linopy.Model()\n", + "\n", + "power = m2.add_variables(name=\"power\", lower=0, upper=150, coords=[time])\n", + "fuel = m2.add_variables(name=\"fuel\", lower=0, coords=[time])\n", + "\n", + "m2.add_piecewise_constraints(\n", + " {\"power\": power, \"fuel\": fuel},\n", + " breakpoints.expand_dims(time=time),\n", + " link_dim=\"var\",\n", + " dim=\"bp\",\n", + " name=\"pwl\",\n", + " method=\"incremental\",\n", + ")\n", + "\n", + "demand2 = xr.DataArray([80, 120, 50], coords=[time])\n", + "m2.add_constraints(power >= demand2, name=\"demand\")\n", + "m2.add_objective(fuel.sum())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "incremental-solve", + "metadata": { + "ExecuteTime": { + "end_time": "2026-02-09T19:21:33.981694Z", + "start_time": "2026-02-09T19:21:33.933519Z" + }, + 
"execution": { + "iopub.execute_input": "2026-02-09T19:21:43.070384Z", + "iopub.status.busy": "2026-02-09T19:21:43.070023Z", + "iopub.status.idle": "2026-02-09T19:21:43.124118Z", + "shell.execute_reply": "2026-02-09T19:21:43.123883Z" + } + }, + "outputs": [], + "source": [ + "m2.solve();" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "incremental-results", + "metadata": { + "ExecuteTime": { + "end_time": "2026-02-09T19:21:33.991781Z", + "start_time": "2026-02-09T19:21:33.986137Z" + }, + "execution": { + "iopub.execute_input": "2026-02-09T19:21:43.125356Z", + "iopub.status.busy": "2026-02-09T19:21:43.125291Z", + "iopub.status.idle": "2026-02-09T19:21:43.129072Z", + "shell.execute_reply": "2026-02-09T19:21:43.128850Z" + } + }, + "outputs": [], + "source": [ + "m2.solution[[\"power\", \"fuel\"]].to_pandas()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fua98r986pl", + "metadata": { + "ExecuteTime": { + "end_time": "2026-02-09T19:21:34.116658Z", + "start_time": "2026-02-09T19:21:34.021992Z" + }, + "execution": { + "iopub.execute_input": "2026-02-09T19:21:43.130293Z", + "iopub.status.busy": "2026-02-09T19:21:43.130221Z", + "iopub.status.idle": "2026-02-09T19:21:43.281657Z", + "shell.execute_reply": "2026-02-09T19:21:43.281256Z" + } + }, + "outputs": [], + "source": [ + "plot_pwl_results(m2, breakpoints, demand2, color=\"C1\")" + ] + }, + { + "cell_type": "markdown", + "id": "disjunctive-md", + "metadata": {}, + "source": [ + "## 3. Disjunctive formulation — Diesel generator\n", + "\n", + "The diesel generator has a **forbidden operating zone**: it must either\n", + "be off (0 MW) or run between 50–80 MW. Because of this gap, we add a\n", + "high-cost **backup** source to cover demand when the diesel is off or at\n", + "its maximum." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "disjunctive-setup", + "metadata": { + "ExecuteTime": { + "end_time": "2026-02-09T19:21:34.147920Z", + "start_time": "2026-02-09T19:21:34.142740Z" + }, + "execution": { + "iopub.execute_input": "2026-02-09T19:21:43.283679Z", + "iopub.status.busy": "2026-02-09T19:21:43.283490Z", + "iopub.status.idle": "2026-02-09T19:21:43.290429Z", + "shell.execute_reply": "2026-02-09T19:21:43.289665Z" + } + }, + "outputs": [], + "source": [ + "breakpoints = xr.DataArray(\n", + " [[0, 0], [50, 80]],\n", + " dims=[\"segment\", \"breakpoint\"],\n", + " coords={\"segment\": [0, 1], \"breakpoint\": [0, 1]},\n", + ")\n", + "breakpoints.to_pandas()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "reevc7ood3", + "metadata": { + "ExecuteTime": { + "end_time": "2026-02-09T19:21:34.234326Z", + "start_time": "2026-02-09T19:21:34.188461Z" + }, + "execution": { + "iopub.execute_input": "2026-02-09T19:21:43.293229Z", + "iopub.status.busy": "2026-02-09T19:21:43.292936Z", + "iopub.status.idle": "2026-02-09T19:21:43.363049Z", + "shell.execute_reply": "2026-02-09T19:21:43.362442Z" + } + }, + "outputs": [], + "source": [ + "m3 = linopy.Model()\n", + "\n", + "power = m3.add_variables(name=\"power\", lower=0, upper=80, coords=[time])\n", + "backup = m3.add_variables(name=\"backup\", lower=0, coords=[time])\n", + "\n", + "m3.add_disjunctive_piecewise_constraints(\n", + " power, breakpoints.expand_dims(time=time), name=\"pwl\"\n", + ")\n", + "\n", + "demand3 = xr.DataArray([10, 70, 90], coords=[time])\n", + "m3.add_constraints(power + backup >= demand3, name=\"demand\")\n", + "m3.add_objective((2.5 * power + 10 * backup).sum())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "disjunctive-solve", + "metadata": { + "ExecuteTime": { + "end_time": "2026-02-09T19:21:34.322383Z", + "start_time": "2026-02-09T19:21:34.260066Z" + }, + "execution": { + "iopub.execute_input": 
"2026-02-09T19:21:43.366552Z", + "iopub.status.busy": "2026-02-09T19:21:43.366148Z", + "iopub.status.idle": "2026-02-09T19:21:43.457707Z", + "shell.execute_reply": "2026-02-09T19:21:43.457113Z" + } + }, + "outputs": [], + "source": [ + "m3.solve()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "disjunctive-results", + "metadata": { + "ExecuteTime": { + "end_time": "2026-02-09T19:21:34.333489Z", + "start_time": "2026-02-09T19:21:34.327107Z" + }, + "execution": { + "iopub.execute_input": "2026-02-09T19:21:43.459934Z", + "iopub.status.busy": "2026-02-09T19:21:43.459654Z", + "iopub.status.idle": "2026-02-09T19:21:43.468110Z", + "shell.execute_reply": "2026-02-09T19:21:43.465566Z" + } + }, + "outputs": [], + "source": [ + "m3.solution[[\"power\", \"backup\"]].to_pandas()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "g32vxea6jwe", + "metadata": { + "ExecuteTime": { + "end_time": "2026-02-09T19:21:34.545650Z", + "start_time": "2026-02-09T19:21:34.425456Z" + }, + "execution": { + "iopub.execute_input": "2026-02-09T19:21:43.475302Z", + "iopub.status.busy": "2026-02-09T19:21:43.475060Z", + "iopub.status.idle": "2026-02-09T19:21:43.697893Z", + "shell.execute_reply": "2026-02-09T19:21:43.697398Z" + } + }, + "outputs": [], + "source": [ + "plot_pwl_results(m3, breakpoints, demand3, color=\"C2\", fuel_rate=2.5)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.11" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/linopy/constants.py b/linopy/constants.py index 021a9a10..cb1eb181 100644 --- a/linopy/constants.py +++ b/linopy/constants.py @@ -35,6 +35,17 @@ TERM_DIM = "_term" 
STACKED_TERM_DIM = "_stacked_term" + +# Piecewise linear constraint constants +PWL_LAMBDA_SUFFIX = "_lambda" +PWL_CONVEX_SUFFIX = "_convex" +PWL_LINK_SUFFIX = "_link" +PWL_DELTA_SUFFIX = "_delta" +PWL_FILL_SUFFIX = "_fill" +PWL_BINARY_SUFFIX = "_binary" +PWL_SELECT_SUFFIX = "_select" +DEFAULT_BREAKPOINT_DIM = "breakpoint" +DEFAULT_SEGMENT_DIM = "segment" GROUPED_TERM_DIM = "_grouped_term" GROUP_DIM = "_group" FACTOR_DIM = "_factor" diff --git a/linopy/model.py b/linopy/model.py index a2fa8e4e..5bbfa303 100644 --- a/linopy/model.py +++ b/linopy/model.py @@ -35,9 +35,18 @@ to_path, ) from linopy.constants import ( + DEFAULT_BREAKPOINT_DIM, + DEFAULT_SEGMENT_DIM, GREATER_EQUAL, HELPER_DIMS, LESS_EQUAL, + PWL_BINARY_SUFFIX, + PWL_CONVEX_SUFFIX, + PWL_DELTA_SUFFIX, + PWL_FILL_SUFFIX, + PWL_LAMBDA_SUFFIX, + PWL_LINK_SUFFIX, + PWL_SELECT_SUFFIX, TERM_DIM, ModelStatus, TerminationCondition, @@ -130,6 +139,7 @@ class Model: "_cCounter", "_varnameCounter", "_connameCounter", + "_pwlCounter", "_blocks", # TODO: check if these should not be mutable "_chunk", @@ -186,6 +196,7 @@ def __init__( self._cCounter: int = 0 self._varnameCounter: int = 0 self._connameCounter: int = 0 + self._pwlCounter: int = 0 self._blocks: DataArray | None = None self._chunk: T_Chunks = chunk @@ -625,6 +636,657 @@ def add_sos_constraints( variable.attrs.update(sos_type=sos_type, sos_dim=sos_dim) + def add_piecewise_constraints( + self, + expr: Variable | LinearExpression | dict[str, Variable | LinearExpression], + breakpoints: DataArray, + link_dim: str | None = None, + dim: str = DEFAULT_BREAKPOINT_DIM, + mask: DataArray | None = None, + name: str | None = None, + skip_nan_check: bool = False, + method: Literal["sos2", "incremental", "auto"] = "sos2", + ) -> Constraint: + """ + Add a piecewise linear constraint using SOS2 or incremental formulation. + + This method creates a piecewise linear constraint that links one or more + variables/expressions together via a set of breakpoints. 
It supports two + formulations: + + - **SOS2** (default): Uses SOS2 (Special Ordered Set of type 2) with lambda + (interpolation) variables. Works for any breakpoints. + - **Incremental**: Uses delta variables with filling-order constraints. + Pure LP formulation (no SOS2 or binary variables), but requires strictly + monotonic breakpoints. + + Parameters + ---------- + expr : Variable, LinearExpression, or dict of these + The variable(s) or expression(s) to be linked by the piecewise constraint. + - If a single Variable/LinearExpression is passed, the breakpoints + directly specify the piecewise points for that expression. + - If a dict is passed, the keys must match coordinates in `link_dim` + of the breakpoints, allowing multiple expressions to be linked. + breakpoints : xr.DataArray + The breakpoint values defining the piecewise linear function. + Must have `dim` as one of its dimensions. If `expr` is a dict, + must also have `link_dim` dimension with coordinates matching the + dict keys. + link_dim : str, optional + The dimension in breakpoints that links to different expressions. + Required when `expr` is a dict. If None and `expr` is a dict, + will attempt to auto-detect from breakpoints dimensions. + dim : str, default "breakpoint" + The dimension in breakpoints that represents the breakpoint index. + This dimension's coordinates must be numeric (used as SOS2 weights + for the SOS2 method). + mask : xr.DataArray, optional + Boolean mask indicating which piecewise constraints are valid. + If None, auto-detected from NaN values in breakpoints (unless + skip_nan_check is True). + name : str, optional + Base name for the generated variables and constraints. + If None, auto-generates names like "pwl0", "pwl1", etc. + skip_nan_check : bool, default False + If True, skip automatic NaN detection in breakpoints. Use this + when you know breakpoints contain no NaN values for better performance. + method : str, default "sos2" + Formulation method. 
One of: + - ``"sos2"``: SOS2 formulation with lambda variables (default). + - ``"incremental"``: Incremental (delta) formulation. Requires strictly + monotonic breakpoints. Pure LP, no SOS2 or binary variables. + - ``"auto"``: Automatically selects ``"incremental"`` if breakpoints are + strictly monotonic, otherwise falls back to ``"sos2"``. + + Returns + ------- + Constraint + For SOS2: the convexity constraint (sum of lambda = 1). + For incremental: the first filling-order constraint (or the link + constraint if only 2 breakpoints). + + Raises + ------ + ValueError + If expr is not a Variable, LinearExpression, or dict of these. + If breakpoints doesn't have the required dim dimension. + If link_dim cannot be auto-detected when expr is a dict. + If link_dim coordinates don't match dict keys. + If dim coordinates are not numeric (SOS2 method only). + If breakpoints are not strictly monotonic (incremental method). + If method is not one of 'sos2', 'incremental', 'auto'. + + Examples + -------- + Single variable piecewise constraint: + + >>> m = Model() + >>> x = m.add_variables(name="x") + >>> breakpoints = xr.DataArray([0, 10, 50, 100], dims=["bp"]) + >>> _ = m.add_piecewise_constraints(x, breakpoints, dim="bp") + + Using an expression: + + >>> m = Model() + >>> x = m.add_variables(name="x") + >>> y = m.add_variables(name="y") + >>> breakpoints = xr.DataArray([0, 10, 50, 100], dims=["bp"]) + >>> _ = m.add_piecewise_constraints(x + y, breakpoints, dim="bp") + + Multiple linked variables (e.g., power-efficiency curve): + + >>> m = Model() + >>> generators = ["gen1", "gen2"] + >>> power = m.add_variables(coords=[generators], name="power") + >>> efficiency = m.add_variables(coords=[generators], name="efficiency") + >>> breakpoints = xr.DataArray( + ... [[0, 50, 100], [0.8, 0.95, 0.9]], + ... coords={"var": ["power", "efficiency"], "bp": [0, 1, 2]}, + ... ) + >>> _ = m.add_piecewise_constraints( + ... {"power": power, "efficiency": efficiency}, + ... breakpoints, + ... 
link_dim="var", + ... dim="bp", + ... ) + + Incremental formulation (no SOS2, pure LP): + + >>> m = Model() + >>> x = m.add_variables(name="x") + >>> breakpoints = xr.DataArray([0, 10, 50, 100], dims=["bp"]) + >>> _ = m.add_piecewise_constraints( + ... x, breakpoints, dim="bp", method="incremental" + ... ) + + Notes + ----- + **SOS2 formulation:** + + 1. Lambda variables λ_i with bounds [0, 1] are created for each breakpoint + 2. SOS2 constraint ensures at most two adjacent λ_i can be non-zero + 3. Convexity constraint: Σ λ_i = 1 + 4. Linking constraints: expr = Σ λ_i × breakpoint_i (for each expression) + + **Incremental formulation** (for strictly monotonic breakpoints bp₀ < bp₁ < ... < bpₙ): + + 1. Delta variables δᵢ ∈ [0, 1] for i = 1, ..., n (one per segment) + 2. Filling-order constraints: δᵢ₊₁ ≤ δᵢ for i = 1, ..., n-1 + 3. Linking constraint: expr = bp₀ + Σᵢ δᵢ × (bpᵢ - bpᵢ₋₁) + """ + if method not in ("sos2", "incremental", "auto"): + raise ValueError( + f"method must be 'sos2', 'incremental', or 'auto', got '{method}'" + ) + + # --- Input validation --- + if dim not in breakpoints.dims: + raise ValueError( + f"breakpoints must have dimension '{dim}', " + f"but only has dimensions {list(breakpoints.dims)}" + ) + + # Resolve method for 'auto' + if method == "auto": + if self._check_strict_monotonicity(breakpoints, dim): + method = "incremental" + else: + method = "sos2" + + # Numeric coordinates only required for SOS2 + if method == "sos2" and not pd.api.types.is_numeric_dtype( + breakpoints.coords[dim] + ): + raise ValueError( + f"Breakpoint dimension '{dim}' must have numeric coordinates " + f"for SOS2 weights, but got {breakpoints.coords[dim].dtype}" + ) + + # --- Generate names using counter --- + if name is None: + name = f"pwl{self._pwlCounter}" + self._pwlCounter += 1 + + # --- Determine target expression and related info --- + is_single = isinstance(expr, Variable | LinearExpression) + is_dict = isinstance(expr, dict) + + if not is_single and not 
is_dict: + raise ValueError( + f"'expr' must be a Variable, LinearExpression, or dict of these, " + f"got {type(expr)}" + ) + + if is_single: + assert isinstance(expr, Variable | LinearExpression) + target_expr = self._to_linexpr(expr) + resolved_link_dim = None + computed_mask = self._compute_pwl_mask(mask, breakpoints, skip_nan_check) + lambda_mask = computed_mask + else: + assert isinstance(expr, dict) + expr_dict: dict[str, Variable | LinearExpression] = expr + expr_keys = set(expr_dict.keys()) + resolved_link_dim = self._resolve_pwl_link_dim( + link_dim, breakpoints, dim, expr_keys + ) + computed_mask = self._compute_pwl_mask(mask, breakpoints, skip_nan_check) + lambda_mask = ( + computed_mask.any(dim=resolved_link_dim) + if computed_mask is not None + else None + ) + target_expr = self._build_stacked_expr( + expr_dict, breakpoints, resolved_link_dim + ) + + # Build coordinate lists excluding special dimensions + exclude_dims = {dim, resolved_link_dim} - {None} + extra_coords = [ + pd.Index(breakpoints.coords[d].values, name=d) + for d in breakpoints.dims + if d not in exclude_dims + ] + lambda_coords = extra_coords + [ + pd.Index(breakpoints.coords[dim].values, name=dim) + ] + + # --- Dispatch to formulation --- + if method == "sos2": + return self._add_pwl_sos2( + name, breakpoints, dim, target_expr, lambda_coords, lambda_mask + ) + else: + return self._add_pwl_incremental( + name, + breakpoints, + dim, + target_expr, + extra_coords, + computed_mask, + resolved_link_dim, + ) + + def add_disjunctive_piecewise_constraints( + self, + expr: Variable | LinearExpression | dict[str, Variable | LinearExpression], + breakpoints: DataArray, + link_dim: str | None = None, + dim: str = DEFAULT_BREAKPOINT_DIM, + segment_dim: str = DEFAULT_SEGMENT_DIM, + mask: DataArray | None = None, + name: str | None = None, + skip_nan_check: bool = False, + ) -> Constraint: + """ + Add a disjunctive piecewise linear constraint for disconnected segments. 
+ + Unlike ``add_piecewise_constraints``, which models continuous piecewise + linear functions (all segments connected end-to-end), this method handles + **disconnected segments** (with gaps between them). The variable must lie + on exactly one segment, selected by binary indicator variables. + + Uses the disaggregated convex combination formulation (no big-M needed, + tight LP relaxation): + + 1. Binary ``y_k ∈ {0,1}`` per segment, ``Σ y_k = 1`` + 2. Lambda ``λ_{k,i} ∈ [0,1]`` per breakpoint in each segment + 3. Convexity: ``Σ_i λ_{k,i} = y_k`` + 4. SOS2 within each segment (along breakpoint dim) + 5. Linking: ``expr = Σ_k Σ_i λ_{k,i} × bp_{k,i}`` + + Parameters + ---------- + expr : Variable, LinearExpression, or dict of these + The variable(s) or expression(s) to be linked by the piecewise + constraint. + breakpoints : xr.DataArray + Breakpoint values with at least ``dim`` and ``segment_dim`` + dimensions. Each slice along ``segment_dim`` defines one segment. + Use NaN to pad segments with fewer breakpoints. + link_dim : str, optional + Dimension in breakpoints linking to different expressions (dict + case). Auto-detected if None. + dim : str, default "breakpoint" + Dimension for breakpoint indices within each segment. + Must have numeric coordinates. + segment_dim : str, default "segment" + Dimension indexing the segments. + mask : xr.DataArray, optional + Boolean mask. If None, auto-detected from NaN values. + name : str, optional + Base name for generated variables/constraints. Auto-generated + if None using the shared ``_pwlCounter``. + skip_nan_check : bool, default False + If True, skip NaN detection in breakpoints. + + Returns + ------- + Constraint + The selection constraint (``Σ y_k = 1``). + + Raises + ------ + ValueError + If ``dim`` or ``segment_dim`` not in breakpoints dimensions. + If ``dim == segment_dim``. + If ``dim`` coordinates are not numeric. + If ``expr`` is not a Variable, LinearExpression, or dict. 
+ + Examples + -------- + Two disconnected segments [0,10] and [50,100]: + + >>> m = Model() + >>> x = m.add_variables(name="x") + >>> breakpoints = xr.DataArray( + ... [[0, 10], [50, 100]], + ... dims=["segment", "breakpoint"], + ... coords={"segment": [0, 1], "breakpoint": [0, 1]}, + ... ) + >>> _ = m.add_disjunctive_piecewise_constraints(x, breakpoints) + """ + # --- Input validation --- + if dim not in breakpoints.dims: + raise ValueError( + f"breakpoints must have dimension '{dim}', " + f"but only has dimensions {list(breakpoints.dims)}" + ) + if segment_dim not in breakpoints.dims: + raise ValueError( + f"breakpoints must have dimension '{segment_dim}', " + f"but only has dimensions {list(breakpoints.dims)}" + ) + if dim == segment_dim: + raise ValueError(f"dim and segment_dim must be different, both are '{dim}'") + if not pd.api.types.is_numeric_dtype(breakpoints.coords[dim]): + raise ValueError( + f"Breakpoint dimension '{dim}' must have numeric coordinates " + f"for SOS2 weights, but got {breakpoints.coords[dim].dtype}" + ) + + # --- Generate name using shared counter --- + if name is None: + name = f"pwl{self._pwlCounter}" + self._pwlCounter += 1 + + # --- Determine target expression --- + is_single = isinstance(expr, Variable | LinearExpression) + is_dict = isinstance(expr, dict) + + if not is_single and not is_dict: + raise ValueError( + f"'expr' must be a Variable, LinearExpression, or dict of these, " + f"got {type(expr)}" + ) + + if is_single: + assert isinstance(expr, Variable | LinearExpression) + target_expr = self._to_linexpr(expr) + resolved_link_dim = None + computed_mask = self._compute_pwl_mask(mask, breakpoints, skip_nan_check) + lambda_mask = computed_mask + else: + assert isinstance(expr, dict) + expr_dict: dict[str, Variable | LinearExpression] = expr + expr_keys = set(expr_dict.keys()) + resolved_link_dim = self._resolve_pwl_link_dim( + link_dim, + breakpoints, + dim, + expr_keys, + exclude_dims={dim, segment_dim}, + ) + computed_mask = 
self._compute_pwl_mask(mask, breakpoints, skip_nan_check) + lambda_mask = ( + computed_mask.any(dim=resolved_link_dim) + if computed_mask is not None + else None + ) + target_expr = self._build_stacked_expr( + expr_dict, breakpoints, resolved_link_dim + ) + + # Build coordinate lists excluding special dimensions + exclude_dims_set = {dim, segment_dim, resolved_link_dim} - {None} + extra_coords = [ + pd.Index(breakpoints.coords[d].values, name=d) + for d in breakpoints.dims + if d not in exclude_dims_set + ] + lambda_coords = extra_coords + [ + pd.Index(breakpoints.coords[segment_dim].values, name=segment_dim), + pd.Index(breakpoints.coords[dim].values, name=dim), + ] + binary_coords = extra_coords + [ + pd.Index(breakpoints.coords[segment_dim].values, name=segment_dim), + ] + + # Binary mask: valid if any breakpoint in segment is valid + binary_mask = lambda_mask.any(dim=dim) if lambda_mask is not None else None + + return self._add_dpwl_sos2( + name, + breakpoints, + dim, + segment_dim, + target_expr, + lambda_coords, + lambda_mask, + binary_coords, + binary_mask, + ) + + def _add_pwl_sos2( + self, + name: str, + breakpoints: DataArray, + dim: str, + target_expr: LinearExpression, + lambda_coords: list[pd.Index], + lambda_mask: DataArray | None, + ) -> Constraint: + """Create piecewise linear constraint using SOS2 formulation.""" + lambda_name = f"{name}{PWL_LAMBDA_SUFFIX}" + convex_name = f"{name}{PWL_CONVEX_SUFFIX}" + link_name = f"{name}{PWL_LINK_SUFFIX}" + + lambda_var = self.add_variables( + lower=0, upper=1, coords=lambda_coords, name=lambda_name, mask=lambda_mask + ) + + self.add_sos_constraints(lambda_var, sos_type=2, sos_dim=dim) + + convex_con = self.add_constraints( + lambda_var.sum(dim=dim) == 1, name=convex_name + ) + + weighted_sum = (lambda_var * breakpoints).sum(dim=dim) + self.add_constraints(target_expr == weighted_sum, name=link_name) + + return convex_con + + def _add_pwl_incremental( + self, + name: str, + breakpoints: DataArray, + dim: str, + 
target_expr: LinearExpression, + extra_coords: list[pd.Index], + mask: DataArray | None, + link_dim: str | None, + ) -> Constraint: + """ + Create piecewise linear constraint using incremental formulation. + + For strictly monotonic breakpoints bp₀, bp₁, ..., bpₙ: + - Delta variables δᵢ ∈ [0, 1] for i = 1, ..., n + - Filling-order: δᵢ₊₁ ≤ δᵢ + - Link: expr = bp₀ + Σᵢ δᵢ × (bpᵢ - bpᵢ₋₁) + """ + if not self._check_strict_monotonicity(breakpoints, dim): + raise ValueError( + "Incremental method requires strictly monotonic breakpoints " + "along the breakpoint dimension." + ) + + delta_name = f"{name}{PWL_DELTA_SUFFIX}" + fill_name = f"{name}{PWL_FILL_SUFFIX}" + link_name = f"{name}{PWL_LINK_SUFFIX}" + + n_segments = breakpoints.sizes[dim] - 1 + seg_dim = f"{dim}_seg" + seg_index = pd.Index(range(n_segments), name=seg_dim) + delta_coords = extra_coords + [seg_index] + + # Compute step sizes: bp[i+1] - bp[i] for each segment + steps = breakpoints.diff(dim).rename({dim: seg_dim}) + steps[seg_dim] = seg_index + + # Compute delta mask from breakpoints mask + if mask is not None: + bp_mask = mask + if link_dim is not None: + bp_mask = bp_mask.all(dim=link_dim) + # Segment valid if both adjacent breakpoints valid + mask_lo = bp_mask.isel({dim: slice(None, -1)}).rename({dim: seg_dim}) + mask_hi = bp_mask.isel({dim: slice(1, None)}).rename({dim: seg_dim}) + mask_lo[seg_dim] = seg_index + mask_hi[seg_dim] = seg_index + delta_mask: DataArray | None = mask_lo & mask_hi + else: + delta_mask = None + + # Create delta variables δᵢ ∈ [0, 1] + delta_var = self.add_variables( + lower=0, upper=1, coords=delta_coords, name=delta_name, mask=delta_mask + ) + + # Filling-order constraints: δ[i+1] ≤ δ[i] (vectorized) + fill_con: Constraint | None = None + if n_segments >= 2: + # Slice adjacent pairs and drop coords so they align + delta_lo = delta_var.isel({seg_dim: slice(None, -1)}, drop=True) + delta_hi = delta_var.isel({seg_dim: slice(1, None)}, drop=True) + fill_con = 
self.add_constraints(delta_hi <= delta_lo, name=fill_name) + + # Linking constraint: expr = bp₀ + Σᵢ δᵢ × step_i + bp0 = breakpoints.isel({dim: 0}) + weighted_sum = (delta_var * steps).sum(dim=seg_dim) + bp0 + link_con = self.add_constraints(target_expr == weighted_sum, name=link_name) + + return fill_con if fill_con is not None else link_con + + def _add_dpwl_sos2( + self, + name: str, + breakpoints: DataArray, + dim: str, + segment_dim: str, + target_expr: LinearExpression, + lambda_coords: list[pd.Index], + lambda_mask: DataArray | None, + binary_coords: list[pd.Index], + binary_mask: DataArray | None, + ) -> Constraint: + """ + Create disjunctive piecewise linear constraint using disaggregated + convex combination. + + Each segment gets a binary indicator y_k and per-breakpoint lambdas. + When y_k=1, standard SOS2 interpolation applies within that segment. + When y_k=0, all lambdas for that segment are forced to zero. + """ + binary_name = f"{name}{PWL_BINARY_SUFFIX}" + select_name = f"{name}{PWL_SELECT_SUFFIX}" + lambda_name = f"{name}{PWL_LAMBDA_SUFFIX}" + convex_name = f"{name}{PWL_CONVEX_SUFFIX}" + link_name = f"{name}{PWL_LINK_SUFFIX}" + + # 1. Binary variables y_k ∈ {0,1}, one per segment + binary_var = self.add_variables( + binary=True, coords=binary_coords, name=binary_name, mask=binary_mask + ) + + # 2. Selection constraint: Σ y_k = 1 + select_con = self.add_constraints( + binary_var.sum(dim=segment_dim) == 1, name=select_name + ) + + # 3. Lambda variables λ_{k,i} ∈ [0,1], per segment per breakpoint + lambda_var = self.add_variables( + lower=0, upper=1, coords=lambda_coords, name=lambda_name, mask=lambda_mask + ) + + # 4. SOS2 within each segment (along breakpoint dim) + self.add_sos_constraints(lambda_var, sos_type=2, sos_dim=dim) + + # 5. Convexity: Σ_i λ_{k,i} = y_k (lambdas sum to binary indicator) + self.add_constraints(lambda_var.sum(dim=dim) == binary_var, name=convex_name) + + # 6. 
Linking: expr = Σ_k Σ_i λ_{k,i} × bp_{k,i} + weighted_sum = (lambda_var * breakpoints).sum(dim=[segment_dim, dim]) + self.add_constraints(target_expr == weighted_sum, name=link_name) + + return select_con + + @staticmethod + def _check_strict_monotonicity(breakpoints: DataArray, dim: str) -> bool: + """ + Check if breakpoints are strictly monotonic along dim. + + Each slice along non-dim dimensions is checked independently, + allowing different slices to have opposite directions (e.g., one + increasing and another decreasing). NaN values are ignored. + """ + diffs = breakpoints.diff(dim) + # Per-slice along dim: all non-NaN diffs positive, or all negative + pos = (diffs > 0) | diffs.isnull() + neg = (diffs < 0) | diffs.isnull() + all_pos_per_slice = pos.all(dim) + all_neg_per_slice = neg.all(dim) + # Each slice must be one or the other (and have at least one non-NaN) + has_non_nan = (~diffs.isnull()).any(dim) + monotonic = (all_pos_per_slice | all_neg_per_slice) & has_non_nan + return bool(monotonic.all()) + + def _to_linexpr(self, expr: Variable | LinearExpression) -> LinearExpression: + """Convert Variable or LinearExpression to LinearExpression.""" + if isinstance(expr, LinearExpression): + return expr + return expr.to_linexpr() + + def _compute_pwl_mask( + self, + mask: DataArray | None, + breakpoints: DataArray, + skip_nan_check: bool, + ) -> DataArray | None: + """Compute mask for piecewise constraint, optionally skipping NaN check.""" + if mask is not None: + return mask + if skip_nan_check: + return None + return ~breakpoints.isnull() + + def _resolve_pwl_link_dim( + self, + link_dim: str | None, + breakpoints: DataArray, + dim: str, + expr_keys: set[str], + exclude_dims: set[str] | None = None, + ) -> str: + """Auto-detect or validate link_dim for dict case.""" + if exclude_dims is None: + exclude_dims = {dim} + if link_dim is None: + for d in breakpoints.dims: + if d in exclude_dims: + continue + coords_set = set(str(c) for c in 
breakpoints.coords[d].values) + if coords_set == expr_keys: + return str(d) + raise ValueError( + "Could not auto-detect link_dim. Please specify it explicitly. " + f"Breakpoint dimensions: {list(breakpoints.dims)}, " + f"expression keys: {list(expr_keys)}" + ) + + if link_dim not in breakpoints.dims: + raise ValueError( + f"link_dim '{link_dim}' not found in breakpoints dimensions " + f"{list(breakpoints.dims)}" + ) + coords_set = set(str(c) for c in breakpoints.coords[link_dim].values) + if coords_set != expr_keys: + raise ValueError( + f"link_dim '{link_dim}' coordinates {coords_set} " + f"don't match expression keys {expr_keys}" + ) + return link_dim + + def _build_stacked_expr( + self, + expr_dict: dict[str, Variable | LinearExpression], + breakpoints: DataArray, + link_dim: str, + ) -> LinearExpression: + """Build a stacked LinearExpression from a dict of Variables/Expressions.""" + link_coords = list(breakpoints.coords[link_dim].values) + + # Collect expression data and stack + expr_data_list = [] + for k in link_coords: + e = expr_dict[str(k)] + linexpr = self._to_linexpr(e) + expr_data_list.append(linexpr.data.expand_dims({link_dim: [k]})) + + # Concatenate along link_dim + stacked_data = xr.concat(expr_data_list, dim=link_dim) + return LinearExpression(stacked_data, self) + def add_constraints( self, lhs: VariableLike diff --git a/test/test_piecewise_constraints.py b/test/test_piecewise_constraints.py new file mode 100644 index 00000000..e3a3a838 --- /dev/null +++ b/test/test_piecewise_constraints.py @@ -0,0 +1,1633 @@ +"""Tests for piecewise linear constraints.""" + +from __future__ import annotations + +from pathlib import Path + +import numpy as np +import pandas as pd +import pytest +import xarray as xr + +from linopy import Model, available_solvers +from linopy.constants import ( + PWL_BINARY_SUFFIX, + PWL_CONVEX_SUFFIX, + PWL_DELTA_SUFFIX, + PWL_FILL_SUFFIX, + PWL_LAMBDA_SUFFIX, + PWL_LINK_SUFFIX, + PWL_SELECT_SUFFIX, +) + + +class 
TestBasicSingleVariable: + """Tests for single variable piecewise constraints.""" + + def test_basic_single_variable(self) -> None: + """Test basic piecewise constraint with a single variable.""" + m = Model() + x = m.add_variables(name="x") + + breakpoints = xr.DataArray( + [0, 10, 50, 100], dims=["bp"], coords={"bp": [0, 1, 2, 3]} + ) + + m.add_piecewise_constraints(x, breakpoints, dim="bp") + + # Check lambda variables were created + assert f"pwl0{PWL_LAMBDA_SUFFIX}" in m.variables + + # Check constraints were created + assert f"pwl0{PWL_CONVEX_SUFFIX}" in m.constraints + assert f"pwl0{PWL_LINK_SUFFIX}" in m.constraints + + # Check SOS2 constraint was added + lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"] + assert lambda_var.attrs.get("sos_type") == 2 + assert lambda_var.attrs.get("sos_dim") == "bp" + + def test_single_variable_with_coords(self) -> None: + """Test piecewise constraint with a variable that has coordinates.""" + m = Model() + generators = pd.Index(["gen1", "gen2"], name="generator") + x = m.add_variables(coords=[generators], name="x") + + bp_coords = [0, 1, 2] + breakpoints = xr.DataArray( + [[0, 50, 100], [0, 30, 80]], + dims=["generator", "bp"], + coords={"generator": generators, "bp": bp_coords}, + ) + + m.add_piecewise_constraints(x, breakpoints, dim="bp") + + # Lambda should have both generator and bp dimensions + lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"] + assert "generator" in lambda_var.dims + assert "bp" in lambda_var.dims + + +class TestDictOfVariables: + """Tests for dict of variables (multiple linked variables).""" + + def test_dict_of_variables(self) -> None: + """Test piecewise constraint with multiple linked variables.""" + m = Model() + power = m.add_variables(name="power") + efficiency = m.add_variables(name="efficiency") + + breakpoints = xr.DataArray( + [[0, 50, 100], [0.8, 0.95, 0.9]], + dims=["var", "bp"], + coords={"var": ["power", "efficiency"], "bp": [0, 1, 2]}, + ) + + m.add_piecewise_constraints( + 
{"power": power, "efficiency": efficiency}, + breakpoints, + link_dim="var", + dim="bp", + ) + + # Check single linking constraint was created for all variables + assert f"pwl0{PWL_LINK_SUFFIX}" in m.constraints + + def test_dict_with_coordinates(self) -> None: + """Test dict of variables with additional coordinates.""" + m = Model() + generators = pd.Index(["gen1", "gen2"], name="generator") + power = m.add_variables(coords=[generators], name="power") + efficiency = m.add_variables(coords=[generators], name="efficiency") + + breakpoints = xr.DataArray( + [[[0, 50, 100], [0.8, 0.95, 0.9]], [[0, 30, 80], [0.75, 0.9, 0.85]]], + dims=["generator", "var", "bp"], + coords={ + "generator": generators, + "var": ["power", "efficiency"], + "bp": [0, 1, 2], + }, + ) + + m.add_piecewise_constraints( + {"power": power, "efficiency": efficiency}, + breakpoints, + link_dim="var", + dim="bp", + ) + + # Lambda should have generator and bp dimensions (not var) + lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"] + assert "generator" in lambda_var.dims + assert "bp" in lambda_var.dims + assert "var" not in lambda_var.dims + + +class TestAutoDetectLinkDim: + """Tests for auto-detection of link_dim.""" + + def test_auto_detect_link_dim(self) -> None: + """Test that link_dim is auto-detected from breakpoints.""" + m = Model() + power = m.add_variables(name="power") + efficiency = m.add_variables(name="efficiency") + + breakpoints = xr.DataArray( + [[0, 50, 100], [0.8, 0.95, 0.9]], + dims=["var", "bp"], + coords={"var": ["power", "efficiency"], "bp": [0, 1, 2]}, + ) + + # Should auto-detect link_dim="var" + m.add_piecewise_constraints( + {"power": power, "efficiency": efficiency}, + breakpoints, + dim="bp", + ) + + assert f"pwl0{PWL_LINK_SUFFIX}" in m.constraints + + def test_auto_detect_fails_with_no_match(self) -> None: + """Test that auto-detection fails when no dimension matches keys.""" + m = Model() + power = m.add_variables(name="power") + efficiency = 
m.add_variables(name="efficiency") + + # Dimension 'wrong' doesn't match variable keys + breakpoints = xr.DataArray( + [[0, 50, 100], [0.8, 0.95, 0.9]], + dims=["wrong", "bp"], + coords={"wrong": ["a", "b"], "bp": [0, 1, 2]}, + ) + + with pytest.raises(ValueError, match="Could not auto-detect link_dim"): + m.add_piecewise_constraints( + {"power": power, "efficiency": efficiency}, + breakpoints, + dim="bp", + ) + + +class TestMasking: + """Tests for masking functionality.""" + + def test_nan_masking(self) -> None: + """Test that NaN values in breakpoints create masked constraints.""" + m = Model() + x = m.add_variables(name="x") + + # Third breakpoint is NaN + breakpoints = xr.DataArray( + [0, 10, np.nan, 100], + dims=["bp"], + coords={"bp": [0, 1, 2, 3]}, + ) + + m.add_piecewise_constraints(x, breakpoints, dim="bp") + + # Lambda for NaN breakpoint should be masked + lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"] + # Check that at least some labels are valid + assert (lambda_var.labels != -1).any() + + def test_explicit_mask(self) -> None: + """Test user-provided mask.""" + m = Model() + generators = pd.Index(["gen1", "gen2"], name="generator") + x = m.add_variables(coords=[generators], name="x") + + breakpoints = xr.DataArray( + [[0, 50, 100], [0, 30, 80]], + dims=["generator", "bp"], + coords={"generator": generators, "bp": [0, 1, 2]}, + ) + + # Mask out gen2 + mask = xr.DataArray( + [[True, True, True], [False, False, False]], + dims=["generator", "bp"], + coords={"generator": generators, "bp": [0, 1, 2]}, + ) + + m.add_piecewise_constraints(x, breakpoints, dim="bp", mask=mask) + + # Should still create variables and constraints + assert f"pwl0{PWL_LAMBDA_SUFFIX}" in m.variables + + def test_skip_nan_check(self) -> None: + """Test skip_nan_check parameter for performance.""" + m = Model() + x = m.add_variables(name="x") + + # Breakpoints with no NaNs + breakpoints = xr.DataArray([0, 10, 50], dims=["bp"], coords={"bp": [0, 1, 2]}) + + # Should work with 
skip_nan_check=True + m.add_piecewise_constraints(x, breakpoints, dim="bp", skip_nan_check=True) + + # All lambda variables should be valid (no masking) + lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"] + assert (lambda_var.labels != -1).all() + + +class TestMultiDimensional: + """Tests for multi-dimensional piecewise constraints.""" + + def test_multi_dimensional(self) -> None: + """Test piecewise constraint with multiple loop dimensions.""" + m = Model() + generators = pd.Index(["gen1", "gen2"], name="generator") + timesteps = pd.Index([0, 1, 2], name="time") + x = m.add_variables(coords=[generators, timesteps], name="x") + + rng = np.random.default_rng(42) + breakpoints = xr.DataArray( + rng.random((2, 3, 4)) * 100, + dims=["generator", "time", "bp"], + coords={"generator": generators, "time": timesteps, "bp": [0, 1, 2, 3]}, + ) + + m.add_piecewise_constraints(x, breakpoints, dim="bp") + + # Lambda should have all dimensions + lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"] + assert "generator" in lambda_var.dims + assert "time" in lambda_var.dims + assert "bp" in lambda_var.dims + + +class TestValidationErrors: + """Tests for input validation.""" + + def test_invalid_vars_type(self) -> None: + """Test error when expr is not Variable, LinearExpression, or dict.""" + m = Model() + + breakpoints = xr.DataArray([0, 10, 50], dims=["bp"], coords={"bp": [0, 1, 2]}) + + with pytest.raises( + ValueError, match="must be a Variable, LinearExpression, or dict" + ): + m.add_piecewise_constraints("invalid", breakpoints, dim="bp") # type: ignore + + def test_missing_dim(self) -> None: + """Test error when breakpoints don't have the required dim.""" + m = Model() + x = m.add_variables(name="x") + + breakpoints = xr.DataArray([0, 10, 50], dims=["wrong"]) + + with pytest.raises(ValueError, match="must have dimension"): + m.add_piecewise_constraints(x, breakpoints, dim="bp") + + def test_non_numeric_dim(self) -> None: + """Test error when dim coordinates are not 
numeric.""" + m = Model() + x = m.add_variables(name="x") + + breakpoints = xr.DataArray( + [0, 10, 50], + dims=["bp"], + coords={"bp": ["a", "b", "c"]}, # Non-numeric + ) + + with pytest.raises(ValueError, match="numeric coordinates"): + m.add_piecewise_constraints(x, breakpoints, dim="bp") + + def test_expression_support(self) -> None: + """Test that LinearExpression is supported as input.""" + m = Model() + x = m.add_variables(name="x") + y = m.add_variables(name="y") + + breakpoints = xr.DataArray([0, 10, 50], dims=["bp"], coords={"bp": [0, 1, 2]}) + + # Should work with a LinearExpression + m.add_piecewise_constraints(x + y, breakpoints, dim="bp") + + # Check constraints were created + assert f"pwl0{PWL_LINK_SUFFIX}" in m.constraints + + def test_link_dim_not_in_breakpoints(self) -> None: + """Test error when link_dim is not in breakpoints.""" + m = Model() + power = m.add_variables(name="power") + efficiency = m.add_variables(name="efficiency") + + breakpoints = xr.DataArray([0, 50, 100], dims=["bp"], coords={"bp": [0, 1, 2]}) + + with pytest.raises(ValueError, match="not found in breakpoints dimensions"): + m.add_piecewise_constraints( + {"power": power, "efficiency": efficiency}, + breakpoints, + link_dim="var", + dim="bp", + ) + + def test_link_dim_coords_mismatch(self) -> None: + """Test error when link_dim coords don't match dict keys.""" + m = Model() + power = m.add_variables(name="power") + efficiency = m.add_variables(name="efficiency") + + breakpoints = xr.DataArray( + [[0, 50, 100], [0.8, 0.95, 0.9]], + dims=["var", "bp"], + coords={"var": ["wrong1", "wrong2"], "bp": [0, 1, 2]}, + ) + + with pytest.raises(ValueError, match="don't match expression keys"): + m.add_piecewise_constraints( + {"power": power, "efficiency": efficiency}, + breakpoints, + link_dim="var", + dim="bp", + ) + + +class TestNameGeneration: + """Tests for automatic name generation.""" + + def test_auto_name_generation(self) -> None: + """Test that names are auto-generated 
correctly.""" + m = Model() + x = m.add_variables(name="x") + y = m.add_variables(name="y") + + bp1 = xr.DataArray([0, 10, 50], dims=["bp"], coords={"bp": [0, 1, 2]}) + bp2 = xr.DataArray([0, 20, 80], dims=["bp"], coords={"bp": [0, 1, 2]}) + + m.add_piecewise_constraints(x, bp1, dim="bp") + m.add_piecewise_constraints(y, bp2, dim="bp") + + assert f"pwl0{PWL_LAMBDA_SUFFIX}" in m.variables + assert f"pwl1{PWL_LAMBDA_SUFFIX}" in m.variables + + def test_custom_name(self) -> None: + """Test using a custom name.""" + m = Model() + x = m.add_variables(name="x") + + breakpoints = xr.DataArray([0, 10, 50], dims=["bp"], coords={"bp": [0, 1, 2]}) + + m.add_piecewise_constraints(x, breakpoints, dim="bp", name="my_pwl") + + assert f"my_pwl{PWL_LAMBDA_SUFFIX}" in m.variables + assert f"my_pwl{PWL_CONVEX_SUFFIX}" in m.constraints + assert f"my_pwl{PWL_LINK_SUFFIX}" in m.constraints + + +class TestLPFileOutput: + """Tests for LP file output with piecewise constraints.""" + + def test_piecewise_written_to_lp(self, tmp_path: Path) -> None: + """Test that piecewise constraints are properly written to LP file.""" + m = Model() + x = m.add_variables(name="x") + + breakpoints = xr.DataArray( + [0.0, 10.0, 50.0], + dims=["bp"], + coords={"bp": [0, 1, 2]}, + ) + + m.add_piecewise_constraints(x, breakpoints, dim="bp") + + # Add a simple objective to make it a valid LP + m.add_objective(x) + + fn = tmp_path / "pwl.lp" + m.to_file(fn, io_api="lp") + content = fn.read_text() + + # Should contain SOS2 section + assert "\nsos\n" in content.lower() + assert "s2" in content.lower() + + +@pytest.mark.skipif("gurobi" not in available_solvers, reason="Gurobi not installed") +class TestSolverIntegration: + """Integration tests with Gurobi solver.""" + + def test_solve_single_variable(self) -> None: + """Test solving a model with piecewise constraint.""" + gurobipy = pytest.importorskip("gurobipy") + + m = Model() + # Variable that should be between 0 and 100 + x = m.add_variables(lower=0, upper=100, 
name="x") + + # Piecewise linear cost function: cost = f(x) + # f(0) = 0, f(50) = 10, f(100) = 50 + cost = m.add_variables(name="cost") + + breakpoints = xr.DataArray( + [[0, 50, 100], [0, 10, 50]], + dims=["var", "bp"], + coords={"var": ["x", "cost"], "bp": [0, 1, 2]}, + ) + + m.add_piecewise_constraints( + {"x": x, "cost": cost}, breakpoints, link_dim="var", dim="bp" + ) + + # Minimize cost, but need x >= 50 to make it interesting + m.add_constraints(x >= 50, name="x_min") + m.add_objective(cost) + + try: + status, cond = m.solve(solver_name="gurobi", io_api="direct") + except gurobipy.GurobiError as exc: + pytest.skip(f"Gurobi environment unavailable: {exc}") + + assert status == "ok" + # At x=50, cost should be 10 + assert np.isclose(x.solution.values, 50, atol=1e-5) + assert np.isclose(cost.solution.values, 10, atol=1e-5) + + def test_solve_efficiency_curve(self) -> None: + """Test solving with a realistic efficiency curve.""" + gurobipy = pytest.importorskip("gurobipy") + + m = Model() + power = m.add_variables(lower=0, upper=100, name="power") + efficiency = m.add_variables(name="efficiency") + + # Efficiency curve: starts low, peaks, then decreases + # power: 0 25 50 75 100 + # efficiency: 0.7 0.85 0.95 0.9 0.8 + breakpoints = xr.DataArray( + [[0, 25, 50, 75, 100], [0.7, 0.85, 0.95, 0.9, 0.8]], + dims=["var", "bp"], + coords={"var": ["power", "efficiency"], "bp": [0, 1, 2, 3, 4]}, + ) + + m.add_piecewise_constraints( + {"power": power, "efficiency": efficiency}, + breakpoints, + link_dim="var", + dim="bp", + ) + + # Maximize efficiency + m.add_objective(efficiency, sense="max") + + try: + status, cond = m.solve(solver_name="gurobi", io_api="direct") + except gurobipy.GurobiError as exc: + pytest.skip(f"Gurobi environment unavailable: {exc}") + + assert status == "ok" + # Maximum efficiency is at power=50 + assert np.isclose(power.solution.values, 50, atol=1e-5) + assert np.isclose(efficiency.solution.values, 0.95, atol=1e-5) + + def 
test_solve_multi_generator(self) -> None: + """Test with multiple generators each with different curves.""" + gurobipy = pytest.importorskip("gurobipy") + + m = Model() + generators = pd.Index(["gen1", "gen2"], name="generator") + power = m.add_variables(lower=0, upper=100, coords=[generators], name="power") + cost = m.add_variables(coords=[generators], name="cost") + + # Different cost curves for each generator + # gen1: cheaper at low power, expensive at high + # gen2: more expensive at low power, cheaper at high + breakpoints = xr.DataArray( + [ + [[0, 50, 100], [0, 5, 30]], # gen1: power, cost + [[0, 50, 100], [0, 15, 20]], # gen2: power, cost + ], + dims=["generator", "var", "bp"], + coords={ + "generator": generators, + "var": ["power", "cost"], + "bp": [0, 1, 2], + }, + ) + + m.add_piecewise_constraints( + {"power": power, "cost": cost}, breakpoints, link_dim="var", dim="bp" + ) + + # Need total power of 120 + m.add_constraints(power.sum() >= 120, name="demand") + + # Minimize total cost + m.add_objective(cost.sum()) + + try: + status, cond = m.solve(solver_name="gurobi", io_api="direct") + except gurobipy.GurobiError as exc: + pytest.skip(f"Gurobi environment unavailable: {exc}") + + assert status == "ok" + # gen1 should provide ~50 (cheap up to 50), gen2 provides rest + total_power = power.solution.sum().values + assert np.isclose(total_power, 120, atol=1e-5) + + +class TestIncrementalFormulation: + """Tests for the incremental (delta) piecewise formulation.""" + + def test_single_variable_incremental(self) -> None: + """Test incremental formulation with a single variable.""" + m = Model() + x = m.add_variables(name="x") + + breakpoints = xr.DataArray( + [0, 10, 50, 100], dims=["bp"], coords={"bp": [0, 1, 2, 3]} + ) + + m.add_piecewise_constraints(x, breakpoints, dim="bp", method="incremental") + + # Check delta variables created + assert f"pwl0{PWL_DELTA_SUFFIX}" in m.variables + # 3 segments → 3 delta vars + delta_var = 
m.variables[f"pwl0{PWL_DELTA_SUFFIX}"] + assert "bp_seg" in delta_var.dims + assert len(delta_var.coords["bp_seg"]) == 3 + + # Check filling-order constraint (single vectorized constraint) + assert f"pwl0{PWL_FILL_SUFFIX}" in m.constraints + + # Check link constraint + assert f"pwl0{PWL_LINK_SUFFIX}" in m.constraints + + # No SOS2 or lambda variables + assert f"pwl0{PWL_LAMBDA_SUFFIX}" not in m.variables + + def test_two_breakpoints_incremental(self) -> None: + """Test incremental with only 2 breakpoints (1 segment, no fill constraints).""" + m = Model() + x = m.add_variables(name="x") + + breakpoints = xr.DataArray([0, 100], dims=["bp"], coords={"bp": [0, 1]}) + + m.add_piecewise_constraints(x, breakpoints, dim="bp", method="incremental") + + # 1 segment → 1 delta var, no filling constraints + delta_var = m.variables[f"pwl0{PWL_DELTA_SUFFIX}"] + assert len(delta_var.coords["bp_seg"]) == 1 + + # Link constraint should exist + assert f"pwl0{PWL_LINK_SUFFIX}" in m.constraints + + def test_dict_incremental(self) -> None: + """Test incremental formulation with dict of variables.""" + m = Model() + power = m.add_variables(name="power") + cost = m.add_variables(name="cost") + + # Both power and cost breakpoints are strictly increasing + breakpoints = xr.DataArray( + [[0, 50, 100], [0, 10, 50]], + dims=["var", "bp"], + coords={"var": ["power", "cost"], "bp": [0, 1, 2]}, + ) + + m.add_piecewise_constraints( + {"power": power, "cost": cost}, + breakpoints, + link_dim="var", + dim="bp", + method="incremental", + ) + + assert f"pwl0{PWL_DELTA_SUFFIX}" in m.variables + assert f"pwl0{PWL_LINK_SUFFIX}" in m.constraints + + def test_non_monotonic_raises(self) -> None: + """Test that non-monotonic breakpoints raise ValueError for incremental.""" + m = Model() + x = m.add_variables(name="x") + + # Not monotonic: 0, 50, 30 + breakpoints = xr.DataArray([0, 50, 30], dims=["bp"], coords={"bp": [0, 1, 2]}) + + with pytest.raises(ValueError, match="strictly monotonic"): + 
m.add_piecewise_constraints(x, breakpoints, dim="bp", method="incremental") + + def test_decreasing_monotonic_works(self) -> None: + """Test that strictly decreasing breakpoints work for incremental.""" + m = Model() + x = m.add_variables(name="x") + + breakpoints = xr.DataArray( + [100, 50, 10, 0], dims=["bp"], coords={"bp": [0, 1, 2, 3]} + ) + + m.add_piecewise_constraints(x, breakpoints, dim="bp", method="incremental") + assert f"pwl0{PWL_DELTA_SUFFIX}" in m.variables + + def test_opposite_directions_in_dict(self) -> None: + """Test that dict with opposite monotonic directions works.""" + m = Model() + power = m.add_variables(name="power") + eff = m.add_variables(name="eff") + + # power increasing, efficiency decreasing + breakpoints = xr.DataArray( + [[0, 50, 100], [0.95, 0.9, 0.8]], + dims=["var", "bp"], + coords={"var": ["power", "eff"], "bp": [0, 1, 2]}, + ) + + m.add_piecewise_constraints( + {"power": power, "eff": eff}, + breakpoints, + link_dim="var", + dim="bp", + method="incremental", + ) + + assert f"pwl0{PWL_DELTA_SUFFIX}" in m.variables + assert f"pwl0{PWL_LINK_SUFFIX}" in m.constraints + + def test_nan_breakpoints_monotonic(self) -> None: + """Test that NaN breakpoints don't break monotonicity check.""" + m = Model() + x = m.add_variables(name="x") + + breakpoints = xr.DataArray( + [0, 10, np.nan, 100], dims=["bp"], coords={"bp": [0, 1, 2, 3]} + ) + + # Should detect as monotonic despite NaN + m.add_piecewise_constraints(x, breakpoints, dim="bp", method="auto") + assert f"pwl0{PWL_DELTA_SUFFIX}" in m.variables + + def test_auto_selects_incremental(self) -> None: + """Test method='auto' selects incremental for monotonic breakpoints.""" + m = Model() + x = m.add_variables(name="x") + + breakpoints = xr.DataArray( + [0, 10, 50, 100], dims=["bp"], coords={"bp": [0, 1, 2, 3]} + ) + + m.add_piecewise_constraints(x, breakpoints, dim="bp", method="auto") + + # Should use incremental (delta vars, no lambda) + assert f"pwl0{PWL_DELTA_SUFFIX}" in m.variables + 
assert f"pwl0{PWL_LAMBDA_SUFFIX}" not in m.variables + + def test_auto_selects_sos2(self) -> None: + """Test method='auto' falls back to sos2 for non-monotonic breakpoints.""" + m = Model() + x = m.add_variables(name="x") + + # Non-monotonic across the full array (dict case would have link_dim) + # For single expr, breakpoints along dim are [0, 50, 30] + breakpoints = xr.DataArray([0, 50, 30], dims=["bp"], coords={"bp": [0, 1, 2]}) + + m.add_piecewise_constraints(x, breakpoints, dim="bp", method="auto") + + # Should use sos2 (lambda vars, no delta) + assert f"pwl0{PWL_LAMBDA_SUFFIX}" in m.variables + assert f"pwl0{PWL_DELTA_SUFFIX}" not in m.variables + + def test_invalid_method_raises(self) -> None: + """Test that an invalid method raises ValueError.""" + m = Model() + x = m.add_variables(name="x") + + breakpoints = xr.DataArray([0, 10, 50], dims=["bp"], coords={"bp": [0, 1, 2]}) + + with pytest.raises(ValueError, match="method must be"): + m.add_piecewise_constraints(x, breakpoints, dim="bp", method="invalid") # type: ignore[arg-type] + + def test_incremental_with_coords(self) -> None: + """Test incremental formulation with extra coordinates.""" + m = Model() + generators = pd.Index(["gen1", "gen2"], name="generator") + x = m.add_variables(coords=[generators], name="x") + + breakpoints = xr.DataArray( + [[0, 50, 100], [0, 30, 80]], + dims=["generator", "bp"], + coords={"generator": generators, "bp": [0, 1, 2]}, + ) + + m.add_piecewise_constraints(x, breakpoints, dim="bp", method="incremental") + + delta_var = m.variables[f"pwl0{PWL_DELTA_SUFFIX}"] + assert "generator" in delta_var.dims + assert "bp_seg" in delta_var.dims + + +@pytest.mark.skipif("gurobi" not in available_solvers, reason="Gurobi not installed") +class TestIncrementalSolverIntegration: + """Integration tests for incremental formulation with Gurobi.""" + + def test_solve_incremental_single(self) -> None: + """Test solving with incremental formulation.""" + gurobipy = pytest.importorskip("gurobipy") 
# ===== Disjunctive Piecewise Linear Constraint Tests =====


class TestDisjunctiveBasicSingleVariable:
    """Tests for single variable disjunctive piecewise constraints."""

    def test_two_equal_segments(self) -> None:
        """Two equal-length segments create all expected model objects."""
        m = Model()
        x = m.add_variables(name="x")

        breakpoints = xr.DataArray(
            [[0, 10], [50, 100]],
            dims=["segment", "breakpoint"],
            coords={"segment": [0, 1], "breakpoint": [0, 1]},
        )

        m.add_disjunctive_piecewise_constraints(x, breakpoints)

        # Binary variables created
        assert f"pwl0{PWL_BINARY_SUFFIX}" in m.variables
        # Selection constraint
        assert f"pwl0{PWL_SELECT_SUFFIX}" in m.constraints
        # Lambda variables
        assert f"pwl0{PWL_LAMBDA_SUFFIX}" in m.variables
        # Convexity constraint
        assert f"pwl0{PWL_CONVEX_SUFFIX}" in m.constraints
        # Link constraint
        assert f"pwl0{PWL_LINK_SUFFIX}" in m.constraints
        # SOS2 declared on the lambda variables along the breakpoint dim
        lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"]
        assert lambda_var.attrs.get("sos_type") == 2
        assert lambda_var.attrs.get("sos_dim") == "breakpoint"

    def test_uneven_segments_with_nan(self) -> None:
        """Segments of different lengths with NaN padding.

        Defect fixed in review: the docstring claimed the NaN breakpoint's
        lambda is masked, but the test only checked dimension names. Now also
        asserts the masked label is -1, consistent with
        TestDisjunctiveMasking.test_nan_masking_labels.
        """
        m = Model()
        x = m.add_variables(name="x")

        breakpoints = xr.DataArray(
            [[0, 5, 10], [50, 100, np.nan]],
            dims=["segment", "breakpoint"],
            coords={"segment": [0, 1], "breakpoint": [0, 1, 2]},
        )

        m.add_disjunctive_piecewise_constraints(x, breakpoints)

        lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"]
        assert "segment" in lambda_var.dims
        assert "breakpoint" in lambda_var.dims
        # Lambda for the NaN breakpoint must actually be masked (label == -1).
        assert int(lambda_var.labels.sel(segment=1, breakpoint=2)) == -1

    def test_single_breakpoint_segment(self) -> None:
        """A segment with only one valid breakpoint (point segment) is accepted."""
        m = Model()
        x = m.add_variables(name="x")

        breakpoints = xr.DataArray(
            [[0, 10], [42, np.nan]],
            dims=["segment", "breakpoint"],
            coords={"segment": [0, 1], "breakpoint": [0, 1]},
        )

        m.add_disjunctive_piecewise_constraints(x, breakpoints)
        assert f"pwl0{PWL_BINARY_SUFFIX}" in m.variables

    def test_single_variable_with_coords(self) -> None:
        """Coordinates are preserved on binary and lambda variables."""
        m = Model()
        generators = pd.Index(["gen1", "gen2"], name="generator")
        x = m.add_variables(coords=[generators], name="x")

        breakpoints = xr.DataArray(
            [
                [[0, 10], [50, 100]],
                [[0, 20], [60, 90]],
            ],
            dims=["generator", "segment", "breakpoint"],
            coords={
                "generator": generators,
                "segment": [0, 1],
                "breakpoint": [0, 1],
            },
        )

        m.add_disjunctive_piecewise_constraints(x, breakpoints)

        binary_var = m.variables[f"pwl0{PWL_BINARY_SUFFIX}"]
        lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"]

        # Both should carry the generator coordinates through.
        assert list(binary_var.coords["generator"].values) == ["gen1", "gen2"]
        assert list(lambda_var.coords["generator"].values) == ["gen1", "gen2"]

        # Binary: (generator, segment); lambda: (generator, segment, breakpoint).
        assert set(binary_var.dims) == {"generator", "segment"}
        assert set(lambda_var.dims) == {"generator", "segment", "breakpoint"}

    def test_return_value_is_selection_constraint(self) -> None:
        """The method returns the selection constraint."""
        m = Model()
        x = m.add_variables(name="x")

        breakpoints = xr.DataArray(
            [[0, 10], [50, 100]],
            dims=["segment", "breakpoint"],
            coords={"segment": [0, 1], "breakpoint": [0, 1]},
        )

        result = m.add_disjunctive_piecewise_constraints(x, breakpoints)

        # Return value should be the selection constraint
        assert result is not None
        select_name = f"pwl0{PWL_SELECT_SUFFIX}"
        assert select_name in m.constraints


class TestDisjunctiveDictOfVariables:
    """Tests for dict of variables with disjunctive constraints."""

    def test_dict_with_two_segments(self) -> None:
        """Dict of variables with two operating regions."""
        m = Model()
        power = m.add_variables(name="power")
        cost = m.add_variables(name="cost")

        breakpoints = xr.DataArray(
            [[[0, 50], [0, 10]], [[80, 100], [20, 50]]],
            dims=["segment", "var", "breakpoint"],
            coords={
                "segment": [0, 1],
                "var": ["power", "cost"],
                "breakpoint": [0, 1],
            },
        )

        m.add_disjunctive_piecewise_constraints(
            {"power": power, "cost": cost},
            breakpoints,
            link_dim="var",
        )

        assert f"pwl0{PWL_BINARY_SUFFIX}" in m.variables
        assert f"pwl0{PWL_LINK_SUFFIX}" in m.constraints

    def test_auto_detect_link_dim_with_segment_dim(self) -> None:
        """link_dim is auto-detected even when segment_dim is also present."""
        m = Model()
        power = m.add_variables(name="power")
        cost = m.add_variables(name="cost")

        breakpoints = xr.DataArray(
            [[[0, 50], [0, 10]], [[80, 100], [20, 50]]],
            dims=["segment", "var", "breakpoint"],
            coords={
                "segment": [0, 1],
                "var": ["power", "cost"],
                "breakpoint": [0, 1],
            },
        )

        # Should auto-detect link_dim="var" (not segment).
        m.add_disjunctive_piecewise_constraints(
            {"power": power, "cost": cost},
            breakpoints,
        )

        assert f"pwl0{PWL_LINK_SUFFIX}" in m.constraints


class TestDisjunctiveExtraDimensions:
    """Tests for extra dimensions on disjunctive constraints."""
test_extra_generator_dimension(self) -> None: + """Test with an extra generator dimension.""" + m = Model() + generators = pd.Index(["gen1", "gen2"], name="generator") + x = m.add_variables(coords=[generators], name="x") + + breakpoints = xr.DataArray( + [ + [[0, 10], [50, 100]], + [[0, 20], [60, 90]], + ], + dims=["generator", "segment", "breakpoint"], + coords={ + "generator": generators, + "segment": [0, 1], + "breakpoint": [0, 1], + }, + ) + + m.add_disjunctive_piecewise_constraints(x, breakpoints) + + # Binary and lambda should have generator dimension + binary_var = m.variables[f"pwl0{PWL_BINARY_SUFFIX}"] + lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"] + assert "generator" in binary_var.dims + assert "generator" in lambda_var.dims + assert "segment" in binary_var.dims + assert "segment" in lambda_var.dims + + def test_multi_dimensional_generator_time(self) -> None: + """Test variable with generator + time coords, verify all dims present.""" + m = Model() + generators = pd.Index(["gen1", "gen2"], name="generator") + timesteps = pd.Index([0, 1, 2], name="time") + x = m.add_variables(coords=[generators, timesteps], name="x") + + rng = np.random.default_rng(42) + bp_data = rng.random((2, 3, 2, 2)) * 100 + # Sort breakpoints within each segment + bp_data = np.sort(bp_data, axis=-1) + + breakpoints = xr.DataArray( + bp_data, + dims=["generator", "time", "segment", "breakpoint"], + coords={ + "generator": generators, + "time": timesteps, + "segment": [0, 1], + "breakpoint": [0, 1], + }, + ) + + m.add_disjunctive_piecewise_constraints(x, breakpoints) + + binary_var = m.variables[f"pwl0{PWL_BINARY_SUFFIX}"] + lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"] + + # All extra dims should be present + for dim_name in ["generator", "time", "segment"]: + assert dim_name in binary_var.dims + for dim_name in ["generator", "time", "segment", "breakpoint"]: + assert dim_name in lambda_var.dims + + def test_dict_with_additional_coords(self) -> None: + """Test dict of 
variables with extra generator dim, binary/lambda exclude link_dim.""" + m = Model() + generators = pd.Index(["gen1", "gen2"], name="generator") + power = m.add_variables(coords=[generators], name="power") + cost = m.add_variables(coords=[generators], name="cost") + + breakpoints = xr.DataArray( + [ + [[[0, 50], [0, 10]], [[80, 100], [20, 30]]], + [[[0, 40], [0, 8]], [[70, 90], [15, 25]]], + ], + dims=["generator", "segment", "var", "breakpoint"], + coords={ + "generator": generators, + "segment": [0, 1], + "var": ["power", "cost"], + "breakpoint": [0, 1], + }, + ) + + m.add_disjunctive_piecewise_constraints( + {"power": power, "cost": cost}, + breakpoints, + link_dim="var", + ) + + binary_var = m.variables[f"pwl0{PWL_BINARY_SUFFIX}"] + lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"] + + # link_dim (var) should NOT be in binary or lambda dims + assert "var" not in binary_var.dims + assert "var" not in lambda_var.dims + + # generator should be present + assert "generator" in binary_var.dims + assert "generator" in lambda_var.dims + + +class TestDisjunctiveMasking: + """Tests for masking functionality in disjunctive constraints.""" + + def test_nan_masking_labels(self) -> None: + """Test NaN breakpoints mask lambda labels to -1.""" + m = Model() + x = m.add_variables(name="x") + + breakpoints = xr.DataArray( + [[0, 5, 10], [50, 100, np.nan]], + dims=["segment", "breakpoint"], + coords={"segment": [0, 1], "breakpoint": [0, 1, 2]}, + ) + + m.add_disjunctive_piecewise_constraints(x, breakpoints) + + lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"] + # Segment 0: all 3 breakpoints valid (labels != -1) + seg0_labels = lambda_var.labels.sel(segment=0) + assert (seg0_labels != -1).all() + # Segment 1: breakpoint 2 is NaN → masked (label == -1) + seg1_bp2_label = lambda_var.labels.sel(segment=1, breakpoint=2) + assert int(seg1_bp2_label) == -1 + + # Binary: both segments have at least one valid breakpoint + binary_var = m.variables[f"pwl0{PWL_BINARY_SUFFIX}"] + 
assert (binary_var.labels != -1).all() + + def test_nan_masking_partial_segment(self) -> None: + """Test partial NaN — lambda masked but segment binary still valid.""" + m = Model() + x = m.add_variables(name="x") + + # Segment 0 has 3 valid breakpoints, segment 1 has 2 valid + 1 NaN + breakpoints = xr.DataArray( + [[0, 5, 10], [50, 100, np.nan]], + dims=["segment", "breakpoint"], + coords={"segment": [0, 1], "breakpoint": [0, 1, 2]}, + ) + + m.add_disjunctive_piecewise_constraints(x, breakpoints) + + lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"] + binary_var = m.variables[f"pwl0{PWL_BINARY_SUFFIX}"] + + # Segment 1 binary is still valid (has 2 valid breakpoints) + assert int(binary_var.labels.sel(segment=1)) != -1 + + # Segment 1 valid lambdas (breakpoint 0, 1) should be valid + assert int(lambda_var.labels.sel(segment=1, breakpoint=0)) != -1 + assert int(lambda_var.labels.sel(segment=1, breakpoint=1)) != -1 + + def test_explicit_mask(self) -> None: + """Test user-provided mask disables specific entries.""" + m = Model() + x = m.add_variables(name="x") + + breakpoints = xr.DataArray( + [[0, 10], [50, 100]], + dims=["segment", "breakpoint"], + coords={"segment": [0, 1], "breakpoint": [0, 1]}, + ) + + # Mask out entire segment 1 + mask = xr.DataArray( + [[True, True], [False, False]], + dims=["segment", "breakpoint"], + coords={"segment": [0, 1], "breakpoint": [0, 1]}, + ) + + m.add_disjunctive_piecewise_constraints(x, breakpoints, mask=mask) + + lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"] + binary_var = m.variables[f"pwl0{PWL_BINARY_SUFFIX}"] + + # Segment 0 lambdas should be valid + assert (lambda_var.labels.sel(segment=0) != -1).all() + # Segment 1 lambdas should be masked + assert (lambda_var.labels.sel(segment=1) == -1).all() + # Segment 1 binary should be masked (no valid breakpoints) + assert int(binary_var.labels.sel(segment=1)) == -1 + + def test_skip_nan_check(self) -> None: + """Test skip_nan_check=True treats all breakpoints as valid.""" 
class TestDisjunctiveValidationErrors:
    """Tests for validation errors in disjunctive constraints."""

    def test_missing_dim(self) -> None:
        """Breakpoints lacking the requested dim raise ValueError."""
        m = Model()
        x = m.add_variables(name="x")

        breakpoints = xr.DataArray(
            [[0, 10], [50, 100]],
            dims=["segment", "wrong"],
            coords={"segment": [0, 1], "wrong": [0, 1]},
        )

        with pytest.raises(ValueError, match="must have dimension"):
            m.add_disjunctive_piecewise_constraints(x, breakpoints, dim="breakpoint")

    def test_missing_segment_dim(self) -> None:
        """Breakpoints lacking the segment_dim raise ValueError."""
        m = Model()
        x = m.add_variables(name="x")

        breakpoints = xr.DataArray(
            [0, 10, 50],
            dims=["breakpoint"],
            coords={"breakpoint": [0, 1, 2]},
        )

        with pytest.raises(ValueError, match="must have dimension"):
            m.add_disjunctive_piecewise_constraints(x, breakpoints)

    def test_same_dim_segment_dim(self) -> None:
        """dim and segment_dim must be distinct."""
        m = Model()
        x = m.add_variables(name="x")

        breakpoints = xr.DataArray(
            [[0, 10], [50, 100]],
            dims=["segment", "breakpoint"],
            coords={"segment": [0, 1], "breakpoint": [0, 1]},
        )

        with pytest.raises(ValueError, match="must be different"):
            m.add_disjunctive_piecewise_constraints(
                x, breakpoints, dim="segment", segment_dim="segment"
            )

    def test_non_numeric_coords(self) -> None:
        """Non-numeric coordinates on dim raise ValueError."""
        m = Model()
        x = m.add_variables(name="x")

        breakpoints = xr.DataArray(
            [[0, 10], [50, 100]],
            dims=["segment", "breakpoint"],
            coords={"segment": [0, 1], "breakpoint": ["a", "b"]},
        )

        with pytest.raises(ValueError, match="numeric coordinates"):
            m.add_disjunctive_piecewise_constraints(x, breakpoints)

    def test_invalid_expr(self) -> None:
        """An unsupported expr type raises ValueError."""
        m = Model()

        breakpoints = xr.DataArray(
            [[0, 10], [50, 100]],
            dims=["segment", "breakpoint"],
            coords={"segment": [0, 1], "breakpoint": [0, 1]},
        )

        with pytest.raises(
            ValueError, match="must be a Variable, LinearExpression, or dict"
        ):
            m.add_disjunctive_piecewise_constraints("invalid", breakpoints)  # type: ignore

    def test_expression_support(self) -> None:
        """A LinearExpression such as x + y is accepted as input."""
        m = Model()
        x = m.add_variables(name="x")
        y = m.add_variables(name="y")

        breakpoints = xr.DataArray(
            [[0, 10], [50, 100]],
            dims=["segment", "breakpoint"],
            coords={"segment": [0, 1], "breakpoint": [0, 1]},
        )

        m.add_disjunctive_piecewise_constraints(x + y, breakpoints)

        assert f"pwl0{PWL_BINARY_SUFFIX}" in m.variables
        assert f"pwl0{PWL_LAMBDA_SUFFIX}" in m.variables
        assert f"pwl0{PWL_LINK_SUFFIX}" in m.constraints

    def test_link_dim_not_in_breakpoints(self) -> None:
        """An explicit link_dim absent from the breakpoints raises."""
        m = Model()
        power = m.add_variables(name="power")
        cost = m.add_variables(name="cost")

        breakpoints = xr.DataArray(
            [[0, 50], [80, 100]],
            dims=["segment", "breakpoint"],
            coords={"segment": [0, 1], "breakpoint": [0, 1]},
        )

        with pytest.raises(ValueError, match="not found in breakpoints dimensions"):
            m.add_disjunctive_piecewise_constraints(
                {"power": power, "cost": cost},
                breakpoints,
                link_dim="var",
            )

    def test_link_dim_coords_mismatch(self) -> None:
        """link_dim coordinates that differ from the dict keys raise."""
        m = Model()
        power = m.add_variables(name="power")
        cost = m.add_variables(name="cost")

        breakpoints = xr.DataArray(
            [[[0, 50], [0, 10]], [[80, 100], [20, 30]]],
            dims=["segment", "var", "breakpoint"],
            coords={
                "segment": [0, 1],
                "var": ["wrong1", "wrong2"],
                "breakpoint": [0, 1],
            },
        )

        with pytest.raises(ValueError, match="don't match expression keys"):
            m.add_disjunctive_piecewise_constraints(
                {"power": power, "cost": cost},
                breakpoints,
                link_dim="var",
            )


class TestDisjunctiveNameGeneration:
    """Tests for name generation in disjunctive constraints."""

    def test_shared_counter_with_continuous(self) -> None:
        """Disjunctive and continuous PWL share the same name counter."""
        m = Model()
        x = m.add_variables(name="x")
        y = m.add_variables(name="y")

        bp_continuous = xr.DataArray([0, 10, 50], dims=["bp"], coords={"bp": [0, 1, 2]})
        m.add_piecewise_constraints(x, bp_continuous, dim="bp")

        bp_disjunctive = xr.DataArray(
            [[0, 10], [50, 100]],
            dims=["segment", "breakpoint"],
            coords={"segment": [0, 1], "breakpoint": [0, 1]},
        )
        m.add_disjunctive_piecewise_constraints(y, bp_disjunctive)

        # First call gets pwl0, second gets pwl1.
        assert f"pwl0{PWL_LAMBDA_SUFFIX}" in m.variables
        assert f"pwl1{PWL_BINARY_SUFFIX}" in m.variables

    def test_custom_name(self) -> None:
        """A custom base name prefixes every generated object."""
        m = Model()
        x = m.add_variables(name="x")

        breakpoints = xr.DataArray(
            [[0, 10], [50, 100]],
            dims=["segment", "breakpoint"],
            coords={"segment": [0, 1], "breakpoint": [0, 1]},
        )

        m.add_disjunctive_piecewise_constraints(x, breakpoints, name="my_dpwl")

        assert f"my_dpwl{PWL_BINARY_SUFFIX}" in m.variables
        assert f"my_dpwl{PWL_SELECT_SUFFIX}" in m.constraints
        assert f"my_dpwl{PWL_LAMBDA_SUFFIX}" in m.variables
        assert f"my_dpwl{PWL_CONVEX_SUFFIX}" in m.constraints
        assert f"my_dpwl{PWL_LINK_SUFFIX}" in m.constraints


class TestDisjunctiveLPFileOutput:
    """Tests for LP file output with disjunctive piecewise constraints."""

    def test_lp_contains_sos2_and_binary(self, tmp_path: Path) -> None:
        """The LP file contains an SOS2 section and binary variables."""
        m = Model()
        x = m.add_variables(name="x")

        breakpoints = xr.DataArray(
            [[0.0, 10.0], [50.0, 100.0]],
            dims=["segment", "breakpoint"],
            coords={"segment": [0, 1], "breakpoint": [0, 1]},
        )

        m.add_disjunctive_piecewise_constraints(x, breakpoints)
        m.add_objective(x)

        fn = tmp_path / "dpwl.lp"
        m.to_file(fn, io_api="lp")
        content = fn.read_text()

        # SOS section with an S2 entry.
        assert "\nsos\n" in content.lower()
        assert "s2" in content.lower()

        # Binary section (spelling varies between writers).
        assert "binary" in content.lower() or "binaries" in content.lower()
class TestDisjunctiveMultiBreakpointSegments:
    """Tests for segments with multiple breakpoints (unique to disjunctive formulation)."""

    def test_three_breakpoints_per_segment(self) -> None:
        """Segments with 3 breakpoints each yield the expected lambda shape."""
        m = Model()
        x = m.add_variables(name="x")

        # 2 segments, each with 3 breakpoints.
        breakpoints = xr.DataArray(
            [[0, 5, 10], [50, 75, 100]],
            dims=["segment", "breakpoint"],
            coords={"segment": [0, 1], "breakpoint": [0, 1, 2]},
        )

        m.add_disjunctive_piecewise_constraints(x, breakpoints)

        lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"]
        # Shape is (2 segments, 3 breakpoints) with no masked labels.
        assert lambda_var.labels.sizes["segment"] == 2
        assert lambda_var.labels.sizes["breakpoint"] == 3
        assert (lambda_var.labels != -1).all()

    def test_mixed_segment_lengths_nan_padding(self) -> None:
        """One segment with 4 breakpoints, another with 2 (NaN-padded)."""
        m = Model()
        x = m.add_variables(name="x")

        # Segment 0: 4 valid breakpoints; segment 1: 2 valid + 2 NaN.
        breakpoints = xr.DataArray(
            [[0, 5, 10, 15], [50, 100, np.nan, np.nan]],
            dims=["segment", "breakpoint"],
            coords={"segment": [0, 1], "breakpoint": [0, 1, 2, 3]},
        )

        m.add_disjunctive_piecewise_constraints(x, breakpoints)

        lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"]
        binary_var = m.variables[f"pwl0{PWL_BINARY_SUFFIX}"]

        # Lambda shape: (2 segments, 4 breakpoints).
        assert lambda_var.labels.sizes["segment"] == 2
        assert lambda_var.labels.sizes["breakpoint"] == 4

        # Segment 0: all four lambdas valid.
        assert (lambda_var.labels.sel(segment=0) != -1).all()

        # Segment 1: first two valid, last two masked.
        assert (lambda_var.labels.sel(segment=1, breakpoint=0) != -1).item()
        assert (lambda_var.labels.sel(segment=1, breakpoint=1) != -1).item()
        assert (lambda_var.labels.sel(segment=1, breakpoint=2) == -1).item()
        assert (lambda_var.labels.sel(segment=1, breakpoint=3) == -1).item()

        # Both segment binaries stay valid (each has a usable breakpoint).
        assert (binary_var.labels != -1).all()


_disjunctive_solvers = [s for s in ["gurobi", "highs"] if s in available_solvers]


@pytest.mark.skipif(
    len(_disjunctive_solvers) == 0,
    reason="No supported solver (gurobi/highs) installed",
)
class TestDisjunctiveSolverIntegration:
    """Integration tests for disjunctive piecewise constraints."""

    @pytest.fixture(params=_disjunctive_solvers)
    def solver_name(self, request: pytest.FixtureRequest) -> str:
        return request.param

    def _solve(self, m: Model, solver_name: str) -> tuple:
        """Solve model, skipping if solver environment is unavailable."""
        try:
            if solver_name == "gurobi":
                gurobipy = pytest.importorskip("gurobipy")
                try:
                    return m.solve(solver_name="gurobi", io_api="direct")
                except gurobipy.GurobiError as exc:
                    pytest.skip(f"Gurobi environment unavailable: {exc}")
            else:
                return m.solve(solver_name=solver_name)
        except Exception as exc:
            pytest.skip(f"Solver {solver_name} unavailable: {exc}")

    def test_minimize_picks_low_segment(self, solver_name: str) -> None:
        """Minimizing x selects the lower segment."""
        m = Model()
        x = m.add_variables(name="x")

        # Two segments: [0, 10] and [50, 100].
        breakpoints = xr.DataArray(
            [[0.0, 10.0], [50.0, 100.0]],
            dims=["segment", "breakpoint"],
            coords={"segment": [0, 1], "breakpoint": [0, 1]},
        )

        m.add_disjunctive_piecewise_constraints(x, breakpoints)
        m.add_objective(x)

        status, cond = self._solve(m, solver_name)

        assert status == "ok"
        # Optimum is x=0, the minimum of the low segment.
        assert np.isclose(x.solution.values, 0.0, atol=1e-5)
100.0]], + dims=["segment", "breakpoint"], + coords={"segment": [0, 1], "breakpoint": [0, 1]}, + ) + + m.add_disjunctive_piecewise_constraints(x, breakpoints) + m.add_objective(x) + + status, cond = self._solve(m, solver_name) + + assert status == "ok" + # Should pick x=0 (minimum of low segment) + assert np.isclose(x.solution.values, 0.0, atol=1e-5) + + def test_maximize_picks_high_segment(self, solver_name: str) -> None: + """Test maximizing x picks the upper segment.""" + m = Model() + x = m.add_variables(name="x") + + # Two segments: [0, 10] and [50, 100] + breakpoints = xr.DataArray( + [[0.0, 10.0], [50.0, 100.0]], + dims=["segment", "breakpoint"], + coords={"segment": [0, 1], "breakpoint": [0, 1]}, + ) + + m.add_disjunctive_piecewise_constraints(x, breakpoints) + m.add_objective(x, sense="max") + + status, cond = self._solve(m, solver_name) + + assert status == "ok" + # Should pick x=100 (maximum of high segment) + assert np.isclose(x.solution.values, 100.0, atol=1e-5) + + def test_dict_case_solver(self, solver_name: str) -> None: + """Test disjunctive with dict of variables and solver.""" + m = Model() + power = m.add_variables(name="power") + cost = m.add_variables(name="cost") + + # Two operating regions: + # Region 0: power [0,50], cost [0,10] + # Region 1: power [80,100], cost [20,30] + breakpoints = xr.DataArray( + [[[0.0, 50.0], [0.0, 10.0]], [[80.0, 100.0], [20.0, 30.0]]], + dims=["segment", "var", "breakpoint"], + coords={ + "segment": [0, 1], + "var": ["power", "cost"], + "breakpoint": [0, 1], + }, + ) + + m.add_disjunctive_piecewise_constraints( + {"power": power, "cost": cost}, + breakpoints, + link_dim="var", + ) + + # Minimize cost + m.add_objective(cost) + + status, cond = self._solve(m, solver_name) + + assert status == "ok" + # Should pick region 0, minimum cost = 0 + assert np.isclose(cost.solution.values, 0.0, atol=1e-5) + assert np.isclose(power.solution.values, 0.0, atol=1e-5) + + def test_three_segments_min(self, solver_name: str) -> 
None: + """Test 3 segments, minimize picks lowest.""" + m = Model() + x = m.add_variables(name="x") + + # Three segments: [0, 10], [30, 50], [80, 100] + breakpoints = xr.DataArray( + [[0.0, 10.0], [30.0, 50.0], [80.0, 100.0]], + dims=["segment", "breakpoint"], + coords={"segment": [0, 1, 2], "breakpoint": [0, 1]}, + ) + + m.add_disjunctive_piecewise_constraints(x, breakpoints) + m.add_objective(x) + + status, cond = self._solve(m, solver_name) + + assert status == "ok" + assert np.isclose(x.solution.values, 0.0, atol=1e-5) + + def test_constrained_mid_segment(self, solver_name: str) -> None: + """Test constraint forcing x into middle of a segment, verify interpolation.""" + m = Model() + x = m.add_variables(name="x") + + # Two segments: [0, 10] and [50, 100] + breakpoints = xr.DataArray( + [[0.0, 10.0], [50.0, 100.0]], + dims=["segment", "breakpoint"], + coords={"segment": [0, 1], "breakpoint": [0, 1]}, + ) + + m.add_disjunctive_piecewise_constraints(x, breakpoints) + + # Force x >= 60, so must be in segment 1 + m.add_constraints(x >= 60, name="x_lower") + m.add_objective(x) + + status, cond = self._solve(m, solver_name) + + assert status == "ok" + # Minimum in segment 1 with x >= 60 → x = 60 + assert np.isclose(x.solution.values, 60.0, atol=1e-5) + + def test_multi_breakpoint_segment_solver(self, solver_name: str) -> None: + """Test segment with 3 breakpoints, verify correct interpolated value.""" + m = Model() + power = m.add_variables(name="power") + cost = m.add_variables(name="cost") + + # Both segments have 3 breakpoints (no NaN padding needed) + # Segment 0: 3-breakpoint curve (power [0,50,100], cost [0,10,50]) + # Segment 1: 3-breakpoint curve (power [200,250,300], cost [80,90,100]) + breakpoints = xr.DataArray( + [ + [[0.0, 50.0, 100.0], [0.0, 10.0, 50.0]], + [[200.0, 250.0, 300.0], [80.0, 90.0, 100.0]], + ], + dims=["segment", "var", "breakpoint"], + coords={ + "segment": [0, 1], + "var": ["power", "cost"], + "breakpoint": [0, 1, 2], + }, + ) + + 
m.add_disjunctive_piecewise_constraints( + {"power": power, "cost": cost}, + breakpoints, + link_dim="var", + ) + + # Constraint: power >= 50, minimize cost → picks segment 0, power=50, cost=10 + m.add_constraints(power >= 50, name="power_min") + m.add_constraints(power <= 150, name="power_max") + m.add_objective(cost) + + status, cond = self._solve(m, solver_name) + + assert status == "ok" + assert np.isclose(power.solution.values, 50.0, atol=1e-5) + assert np.isclose(cost.solution.values, 10.0, atol=1e-5) + + def test_multi_generator_solver(self, solver_name: str) -> None: + """Test multiple generators with different disjunctive segments.""" + m = Model() + generators = pd.Index(["gen1", "gen2"], name="generator") + power = m.add_variables(lower=0, coords=[generators], name="power") + cost = m.add_variables(coords=[generators], name="cost") + + # gen1: two operating regions + # Region 0: power [0,50], cost [0,15] + # Region 1: power [80,100], cost [30,50] + # gen2: two operating regions + # Region 0: power [0,60], cost [0,10] + # Region 1: power [70,100], cost [12,40] + breakpoints = xr.DataArray( + [ + [[[0.0, 50.0], [0.0, 15.0]], [[80.0, 100.0], [30.0, 50.0]]], + [[[0.0, 60.0], [0.0, 10.0]], [[70.0, 100.0], [12.0, 40.0]]], + ], + dims=["generator", "segment", "var", "breakpoint"], + coords={ + "generator": generators, + "segment": [0, 1], + "var": ["power", "cost"], + "breakpoint": [0, 1], + }, + ) + + m.add_disjunctive_piecewise_constraints( + {"power": power, "cost": cost}, + breakpoints, + link_dim="var", + ) + + # Total power demand >= 100 + m.add_constraints(power.sum() >= 100, name="demand") + m.add_objective(cost.sum()) + + status, cond = self._solve(m, solver_name) + + assert status == "ok" + total_power = power.solution.sum().values + assert total_power >= 100 - 1e-5