diff --git a/.coverage b/.coverage
index 44c7c72..79b58f0 100644
Binary files a/.coverage and b/.coverage differ
diff --git a/README.md b/README.md
index 184e0cd..a0552b1 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,5 @@
-
+
[](https://github.com/automl/promptolution/actions/workflows/ci.yml)
[](https://github.com/automl/promptolution/actions/workflows/docs.yml)

@@ -92,7 +92,7 @@ We encourage every contributor to also write tests, that automatically check if
```
poetry run python -m coverage run -m pytest
-poetry run python -m coverage report
+poetry run python -m coverage report -i
```
Developed by **Timo HeiΓ**, **Moritz Schlager**, and **Tom Zehle** (LMU Munich, MCML, ELLIS, TUM, Uni Freiburg).
diff --git a/docs/examples/getting_started.md b/docs/examples/getting_started.md
index 81e1f57..47f359d 100644
--- a/docs/examples/getting_started.md
+++ b/docs/examples/getting_started.md
@@ -114,7 +114,7 @@ With everything configured, you're ready to optimize your prompts! The `run_expe
prompts = run_experiment(df, config)
```
- π CAPO requires block evaluation strategy. Setting it to 'sequential_block'.
+ π CAPO-style optimizers require block evaluation strategy. Setting it to 'sequential_block'.
β οΈ The LLM does not have a tokenizer. Using simple token count.
π₯ Starting optimization...
π Starting evaluation...
diff --git a/promptolution/exemplar_selectors/random_search_selector.py b/promptolution/exemplar_selectors/random_search_selector.py
index b8cb6ee..1eef61b 100644
--- a/promptolution/exemplar_selectors/random_search_selector.py
+++ b/promptolution/exemplar_selectors/random_search_selector.py
@@ -28,12 +28,12 @@ def select_exemplars(self, prompt: Prompt, n_trials: int = 5) -> Prompt:
best_prompt = prompt
for _ in range(n_trials):
- _, seq = self.task.evaluate(
- prompt, self.predictor, eval_strategy="subsample", return_seq=True, return_agg_scores=False
- )
+ result = self.task.evaluate(prompt, self.predictor, eval_strategy="subsample")
+ seq = result.sequences
prompt_with_examples = Prompt(prompt.instruction, [seq[0][0]])
# evaluate prompts as few shot prompt
- score = self.task.evaluate(prompt_with_examples, self.predictor, eval_strategy="subsample")[0]
+ result = self.task.evaluate(prompt_with_examples, self.predictor, eval_strategy="subsample")
+ score = float(result.agg_scores[0])
if score > best_score:
best_score = score
best_prompt = prompt_with_examples
diff --git a/promptolution/exemplar_selectors/random_selector.py b/promptolution/exemplar_selectors/random_selector.py
index 7b0ae0f..63b0295 100644
--- a/promptolution/exemplar_selectors/random_selector.py
+++ b/promptolution/exemplar_selectors/random_selector.py
@@ -53,10 +53,10 @@ def select_exemplars(self, prompt: Prompt, n_examples: int = 5) -> Prompt:
"""
examples: List[str] = []
while len(examples) < n_examples:
- scores, seqs = self.task.evaluate(
- prompt, self.predictor, eval_strategy="subsample", return_seq=True, return_agg_scores=False
- )
- score = np.mean(scores)
+ result = self.task.evaluate(prompt, self.predictor, eval_strategy="subsample")
+ scores = result.scores
+ seqs = result.sequences
+ score = float(np.mean(scores))
seq = seqs[0][0]
if score == self.desired_score:
examples.append(seq)
diff --git a/promptolution/helpers.py b/promptolution/helpers.py
index a25c008..9cf13d6 100644
--- a/promptolution/helpers.py
+++ b/promptolution/helpers.py
@@ -13,10 +13,11 @@
from promptolution.optimizers.base_optimizer import BaseOptimizer
from promptolution.predictors.base_predictor import BasePredictor
from promptolution.tasks.base_task import BaseTask
- from promptolution.utils.config import ExperimentConfig
from promptolution.tasks.base_task import TaskType
from promptolution.optimizers.base_optimizer import OptimizerType
from promptolution.predictors.base_predictor import PredictorType
+ from promptolution.utils import ExperimentConfig
+
import pandas as pd
@@ -79,10 +80,6 @@ def run_optimization(df: pd.DataFrame, config: "ExperimentConfig") -> List[Promp
)
config.prompts = [Prompt(p) for p in initial_prompts]
- if config.optimizer == "capo" and (config.eval_strategy is None or "block" not in config.eval_strategy):
- logger.warning("π CAPO requires block evaluation strategy. Setting it to 'sequential_block'.")
- config.eval_strategy = "sequential_block"
-
task = get_task(df, config, judge_llm=llm)
optimizer = get_optimizer(
predictor=predictor,
@@ -121,10 +118,12 @@ def run_evaluation(
logger.warning("π Starting evaluation...")
if isinstance(prompts[0], str):
str_prompts = cast(List[str], prompts)
- prompts = [Prompt(p) for p in str_prompts]
+ prompt_objs = [Prompt(p) for p in str_prompts]
else:
str_prompts = [p.construct_prompt() for p in cast(List[Prompt], prompts)]
- scores = task.evaluate(prompts, predictor, eval_strategy="full")
+ prompt_objs = cast(List[Prompt], prompts)
+ results = task.evaluate(prompt_objs, predictor, eval_strategy="full")
+ scores = results.agg_scores.tolist()
df = pd.DataFrame(dict(prompt=str_prompts, score=scores))
df = df.sort_values("score", ascending=False, ignore_index=True)
@@ -132,7 +131,7 @@ def run_evaluation(
def get_llm(model_id: Optional[str] = None, config: Optional["ExperimentConfig"] = None) -> "BaseLLM":
- """Factory function to create and return a language model instance based on the provided model_id.
+ """Create and return a language model instance based on the provided model_id.
This function supports three types of language models:
1. LocalLLM: For running models locally.
@@ -204,10 +203,9 @@ def get_optimizer(
meta_llm: "BaseLLM",
task: "BaseTask",
optimizer: Optional["OptimizerType"] = None,
- task_description: Optional[str] = None,
config: Optional["ExperimentConfig"] = None,
) -> "BaseOptimizer":
- """Creates and returns an optimizer instance based on provided parameters.
+ """Create and return an optimizer instance based on provided parameters.
Args:
predictor: The predictor used for prompt evaluation
@@ -215,7 +213,6 @@ def get_optimizer(
task: The task object used for evaluating prompts
optimizer: String identifying which optimizer to use
meta_prompt: Meta prompt text for the optimizer
- task_description: Description of the task for the optimizer
config: Configuration object with default parameters
Returns:
@@ -225,10 +222,6 @@ def get_optimizer(
ValueError: If an unknown optimizer type is specified
"""
final_optimizer = optimizer or (config.optimizer if config else None)
- if config is None:
- config = ExperimentConfig()
- if task_description is not None:
- config.task_description = task_description
if final_optimizer == "capo":
return CAPO(
@@ -253,7 +246,7 @@ def get_optimizer(
def get_exemplar_selector(
name: Literal["random", "random_search"], task: "BaseTask", predictor: "BasePredictor"
) -> "BaseExemplarSelector":
- """Factory function to get an exemplar selector based on the given name.
+ """Get an exemplar selector based on the given name.
Args:
name (str): The name of the exemplar selector to instantiate.
@@ -274,8 +267,10 @@ def get_exemplar_selector(
raise ValueError(f"Unknown exemplar selector: {name}")
-def get_predictor(downstream_llm=None, type: "PredictorType" = "marker", *args, **kwargs) -> "BasePredictor":
- """Factory function to create and return a predictor instance.
+def get_predictor(
+ downstream_llm: Optional["BaseLLM"] = None, type: "PredictorType" = "marker", *args, **kwargs
+) -> "BasePredictor":
+ """Create and return a predictor instance.
This function supports three types of predictors:
1. FirstOccurrencePredictor: A predictor that classifies based on first occurrence of the label.
@@ -292,6 +287,7 @@ def get_predictor(downstream_llm=None, type: "PredictorType" = "marker", *args,
Returns:
An instance of FirstOccurrencePredictor or MarkerBasedPredictor.
"""
+ assert downstream_llm is not None, "downstream_llm must be provided to create a predictor."
if type == "first_occurrence":
return FirstOccurrencePredictor(downstream_llm, *args, **kwargs)
elif type == "marker":
diff --git a/promptolution/llms/api_llm.py b/promptolution/llms/api_llm.py
index c6971a6..1e34e53 100644
--- a/promptolution/llms/api_llm.py
+++ b/promptolution/llms/api_llm.py
@@ -210,7 +210,7 @@ def _submit(self, coro):
return asyncio.run_coroutine_threadsafe(coro, self._loop)
def _get_response(self, prompts: List[str], system_prompts: List[str]) -> List[str]:
- """Synchronously obtain responses for a batch of prompts.
+ """Obtain responses synchronously for a batch of prompts.
This is the main entrypoint used by external callers. It handles system
prompt broadcasting and delegates the actual work to the async batch
diff --git a/promptolution/llms/vllm.py b/promptolution/llms/vllm.py
index f22ff52..cd91a25 100644
--- a/promptolution/llms/vllm.py
+++ b/promptolution/llms/vllm.py
@@ -13,8 +13,9 @@
logger = get_logger(__name__)
try:
- from transformers import AutoTokenizer # type: ignore
- from vllm import LLM, SamplingParams
+ from transformers import AutoTokenizer # noqa: F401 (import required for testing)
+ from vllm import LLM
+ from vllm.sampling_params import SamplingParams
imports_successful = True
except ImportError:
@@ -113,23 +114,23 @@ def __init__(
self.llm = LLM(**llm_params)
- # Initialize tokenizer separately for potential pre-processing
- self.tokenizer = AutoTokenizer.from_pretrained(model_id)
+ self.tokenizer = self.llm.get_tokenizer()
if batch_size is None:
- cache_config = self.llm.llm_engine.model_executor.cache_config
- if (
- cache_config.num_gpu_blocks is not None
- and cache_config.block_size is not None
- and self.max_model_len is not None
- ):
- self.batch_size = int(
- (cache_config.num_gpu_blocks * cache_config.block_size / self.max_model_len) * 0.95
- )
- logger.info(f"π Batch size set to {self.batch_size} based on GPU memory.")
+ max_num_seqs = int(llm_kwargs.get("max_num_seqs", 1))
+ max_num_batched_tokens = llm_kwargs.get("max_num_batched_tokens", None)
+
+ # Heuristic: if vLLM is capped by batched tokens, don't feed more seqs than fit.
+ if max_num_batched_tokens is not None and self.max_model_len is not None:
+ token_limited = max(1, int(max_num_batched_tokens) // int(self.max_model_len))
+ self.batch_size = max(1, min(max_num_seqs, token_limited))
else:
- self.batch_size = 1
- logger.warning("β οΈ Could not determine batch size from GPU memory. Using batch size of 1.")
+ self.batch_size = max(1, max_num_seqs)
+
+ logger.info(
+ f"π Batch size set to {self.batch_size} (max_num_seqs={max_num_seqs}, "
+ f"max_num_batched_tokens={max_num_batched_tokens}, max_model_len={self.max_model_len})."
+ )
else:
self.batch_size = batch_size
diff --git a/promptolution/optimizers/base_optimizer.py b/promptolution/optimizers/base_optimizer.py
index 7264f6f..69d163f 100644
--- a/promptolution/optimizers/base_optimizer.py
+++ b/promptolution/optimizers/base_optimizer.py
@@ -31,6 +31,8 @@ class BaseOptimizer(ABC):
predictor: The predictor used for prompt evaluation (if applicable).
"""
+ supports_multi_objective: bool = False
+
def __init__(
self,
predictor: "BasePredictor",
@@ -50,6 +52,12 @@ def __init__(
"""
# Set up optimizer state
self.prompts: List[Prompt] = [Prompt(p) for p in initial_prompts] if initial_prompts else []
+ if task.task_type == "multi" and not self.supports_multi_objective:
+ logger.warning(
+ f"{self.__class__.__name__} does not support multi-objective tasks, objectives will be averaged equally.",
+ )
+ task.activate_scalarized_objective()
+
self.task = task
self.callbacks: List["BaseCallback"] = callbacks or []
self.predictor = predictor
diff --git a/promptolution/optimizers/capo.py b/promptolution/optimizers/capo.py
index 3c5955a..43ff563 100644
--- a/promptolution/optimizers/capo.py
+++ b/promptolution/optimizers/capo.py
@@ -16,10 +16,10 @@
from promptolution.utils.test_statistics import TestStatistics
from promptolution.optimizers.base_optimizer import BaseOptimizer
-from promptolution.utils.formatting import extract_from_tag
+from promptolution.utils.capo_utils import build_few_shot_examples, perform_crossover, perform_mutation
from promptolution.utils.logging import get_logger
from promptolution.utils.prompt import Prompt, sort_prompts_by_scores
-from promptolution.utils.templates import CAPO_CROSSOVER_TEMPLATE, CAPO_FEWSHOT_TEMPLATE, CAPO_MUTATION_TEMPLATE
+from promptolution.utils.templates import CAPO_CROSSOVER_TEMPLATE, CAPO_MUTATION_TEMPLATE
from promptolution.utils.test_statistics import get_test_statistic_func
from promptolution.utils.token_counter import get_token_counter
@@ -29,7 +29,7 @@
class CAPO(BaseOptimizer):
"""CAPO: Cost-Aware Prompt Optimization.
- This class implements an evolutionary algorithm for optimizing prompts in large language models
+ This class implements an evolutionary algorithm for optimizing prompts in LLMs
by incorporating racing techniques and multi-objective optimization. It uses crossover, mutation,
and racing based on evaluation scores and statistical tests to improve efficiency while balancing
performance with prompt length. It is adapted from the paper "CAPO: Cost-Aware Prompt Optimization" by Zehle et al., 2025.
@@ -55,7 +55,7 @@ def __init__(
callbacks: Optional[List["BaseCallback"]] = None,
config: Optional["ExperimentConfig"] = None,
) -> None:
- """Initializes the CAPOptimizer with various parameters for prompt evolution.
+ """Initialize the CAPOptimizer with various parameters for prompt evolution.
Args:
predictor (BasePredictor): The predictor for evaluating prompt performance.
@@ -114,135 +114,26 @@ def __init__(
self.population_size = len(self.prompts)
if hasattr(self.predictor, "begin_marker") and hasattr(self.predictor, "end_marker"):
- self.target_begin_marker = self.predictor.begin_marker
- self.target_end_marker = self.predictor.end_marker
+ self.target_begin_marker = self.predictor.begin_marker # type: ignore
+ self.target_end_marker = self.predictor.end_marker # type: ignore
else:
self.target_begin_marker = ""
self.target_end_marker = ""
def _initialize_population(self, initial_prompts: List[Prompt]) -> List[Prompt]:
- """Initializes the population of Prompt objects from initial instructions.
-
- Args:
- initial_prompts (List[str]): List of initial prompt instructions.
-
- Returns:
- List[Prompt]: Initialized population of prompts with few-shot examples.
- """
+ """Initialize the population of Prompt objects from initial instructions."""
population = []
for prompt in initial_prompts:
num_examples = random.randint(0, self.upper_shots)
- few_shots = self._create_few_shot_examples(prompt.instruction, num_examples)
+ few_shots = build_few_shot_examples(
+ instruction=prompt.instruction,
+ num_examples=num_examples,
+ optimizer=self,
+ )
population.append(Prompt(prompt.instruction, few_shots))
return population
- def _create_few_shot_examples(self, instruction: str, num_examples: int) -> List[str]:
- if num_examples == 0:
- return []
-
- few_shot_samples = self.df_few_shots.sample(num_examples, replace=False)
- sample_inputs = few_shot_samples[self.task.x_column].values.astype(str)
- sample_targets = few_shot_samples[self.task.y_column].values
- few_shots = [
- CAPO_FEWSHOT_TEMPLATE.replace(" ", i).replace(
- "", f"{self.target_begin_marker}{t}{self.target_end_marker}"
- )
- for i, t in zip(sample_inputs, sample_targets)
- ]
-
- if not self.create_fs_reasoning:
- # If we do not create reasoning, return the few-shot examples directly
- return few_shots
-
- preds, seqs = self.predictor.predict(
- [instruction] * num_examples,
- list(sample_inputs),
- return_seq=True,
- )
- if isinstance(seqs, str):
- seqs = [seqs]
- if isinstance(preds, str):
- preds = [preds]
-
- # Check which predictions are correct and get a single one per example
- for j in range(num_examples):
- # Process and clean up the generated sequences
- seqs[j] = seqs[j].replace(sample_inputs[j], "", 1).strip()
- # Check if the prediction is correct and add reasoning if so
- if preds[j] == sample_targets[j] or not self.check_fs_accuracy:
- few_shots[j] = CAPO_FEWSHOT_TEMPLATE.replace(" ", sample_inputs[j]).replace("", seqs[j])
-
- return few_shots
-
- def _crossover(self, parents: List[Prompt]) -> List[Prompt]:
- """Performs crossover among parent prompts to generate offsprings.
-
- Args:
- parents (List[Prompt]): List of parent prompts.
-
- Returns:
- List[Prompt]: List of new offsprings after crossover.
- """
- crossover_prompts = []
- offspring_few_shots = []
- for _ in range(self.crossovers_per_iter):
- mother, father = random.sample(parents, 2)
- crossover_prompt = (
- self.crossover_template.replace("", mother.instruction)
- .replace("", father.instruction)
- .strip()
- )
- # collect all crossover prompts then pass them bundled to the meta llm (speedup)
- crossover_prompts.append(crossover_prompt)
- combined_few_shots = mother.few_shots + father.few_shots
- num_few_shots = (len(mother.few_shots) + len(father.few_shots)) // 2
- offspring_few_shot = random.sample(combined_few_shots, num_few_shots) if combined_few_shots else []
- offspring_few_shots.append(offspring_few_shot)
-
- child_instructions = self.meta_llm.get_response(crossover_prompts)
-
- offsprings = []
- for instruction, examples in zip(child_instructions, offspring_few_shots):
- instruction = extract_from_tag(instruction, "", " ")
- offsprings.append(Prompt(instruction, examples))
-
- return offsprings
-
- def _mutate(self, offsprings: List[Prompt]) -> List[Prompt]:
- """Apply mutation to offsprings to generate new candidate prompts.
-
- Args:
- offsprings (List[Prompt]): List of offsprings to mutate.
-
- Returns:
- List[Prompt]: List of mutated prompts.
- """
- # collect all mutation prompts then pass them bundled to the meta llm (speedup)
- mutation_prompts = [
- self.mutation_template.replace("", prompt.instruction) for prompt in offsprings
- ]
- new_instructions = self.meta_llm.get_response(mutation_prompts)
-
- mutated = []
- for new_instruction, prompt in zip(new_instructions, offsprings):
- new_instruction = extract_from_tag(new_instruction, "", " ")
- p = random.random()
-
- new_few_shots: List[str]
- if p < 1 / 3 and len(prompt.few_shots) < self.upper_shots: # add a random few shot
- new_few_shot = self._create_few_shot_examples(new_instruction, 1)
- new_few_shots = prompt.few_shots + new_few_shot
- elif 1 / 3 <= p < 2 / 3 and len(prompt.few_shots) > 0: # remove a random few shot
- new_few_shots = random.sample(prompt.few_shots, len(prompt.few_shots) - 1)
- else: # do not change few shots, but shuffle
- new_few_shots = prompt.few_shots
-
- random.shuffle(new_few_shots)
- mutated.append(Prompt(new_instruction, new_few_shots))
-
- return mutated
-
def _do_racing(self, candidates: List[Prompt], k: int) -> Tuple[List[Prompt], List[float]]:
"""Perform the racing (selection) phase by comparing candidates based on their evaluation scores using the provided test statistic.
@@ -254,21 +145,20 @@ def _do_racing(self, candidates: List[Prompt], k: int) -> Tuple[List[Prompt], Li
List[Prompt]: List of surviving prompts after racing.
"""
self.task.reset_block_idx()
- block_scores: List[List[float]] = []
+ block_scores: List[np.ndarray] = []
i = 0
while len(candidates) > k and i < self.max_n_blocks_eval:
# new_scores shape: (n_candidates, n_samples)
- new_scores: List[float] = self.task.evaluate(candidates, self.predictor, return_agg_scores=False)
+ results = self.task.evaluate(candidates, self.predictor)
+ new_scores = results.scores
# subtract length penalty
prompt_lengths = np.array([self.token_counter(c.construct_prompt()) for c in candidates])
rel_prompt_lengths = prompt_lengths / self.max_prompt_length
- penalized_new_scores = np.array(new_scores) - self.length_penalty * rel_prompt_lengths[:, None]
+ penalized_new_scores = new_scores - self.length_penalty * rel_prompt_lengths[:, None]
- new_scores = penalized_new_scores.tolist()
-
- block_scores.append(new_scores)
+ block_scores.append(penalized_new_scores)
scores = np.concatenate(block_scores, axis=1)
# boolean matrix C_ij indicating if candidate j is better than candidate i
@@ -279,12 +169,13 @@ def _do_racing(self, candidates: List[Prompt], k: int) -> Tuple[List[Prompt], Li
# Sum along rows to get number of better scores for each candidate
n_better = np.sum(comparison_matrix, axis=1)
- candidates, block_scores = filter_survivors(candidates, block_scores, mask=n_better < k)
+ candidates, block_scores = self.filter_survivors(candidates, block_scores, mask=n_better < k)
i += 1
self.task.increment_block_idx()
- avg_scores = self.task.evaluate(candidates, self.predictor, eval_strategy="evaluated")
+ final_result = self.task.evaluate(candidates, self.predictor, eval_strategy="evaluated")
+ avg_scores = final_result.agg_scores.tolist()
prompts, avg_scores = sort_prompts_by_scores(candidates, avg_scores, top_k=k)
return prompts, avg_scores
@@ -297,39 +188,35 @@ def _pre_optimization_loop(self) -> None:
self.task.reset_block_idx()
def _step(self) -> List[Prompt]:
- """Perform a single optimization step.
-
- Returns:
- List[Prompt]: The optimized list of prompts after the step.
- """
- offsprings = self._crossover(self.prompts)
- mutated = self._mutate(offsprings)
+ """Perform a single optimization step."""
+ offsprings = perform_crossover(self.prompts, optimizer=self)
+ mutated = perform_mutation(offsprings=offsprings, optimizer=self)
combined = self.prompts + mutated
self.prompts, self.scores = self._do_racing(combined, self.population_size)
return self.prompts
+ @staticmethod
+ def filter_survivors(
+ candidates: List[Prompt], scores: List[np.ndarray], mask: Any
+ ) -> Tuple[List[Prompt], List[np.ndarray]]:
+ """Filter candidates and scores based on a boolean mask.
-def filter_survivors(
- candidates: List[Prompt], scores: List[List[float]], mask: Any
-) -> Tuple[List[Prompt], List[List[float]]]:
- """Filter candidates and scores based on a boolean mask.
-
- Args:
- candidates (List[Prompt]): List of candidate prompts.
- scores (List[List[float]]): Corresponding scores for the candidates.
- mask (Any): Boolean mask indicating which candidates to keep.
+ Args:
+ candidates (List[Prompt]): List of candidate prompts.
+ scores (List[List[float]]): Corresponding scores for the candidates.
+ mask (Any): Boolean mask indicating which candidates to keep.
- Returns:
- Tuple[List[Prompt], List[List[float]]]: Filtered candidates and their scores.
- """
- assert len(candidates) == len(mask), "Length of candidates, and mask must be the same."
- assert all(
- len(candidates) == len(score) for score in scores
- ), "Each score list must have the same length as candidates."
+ Returns:
+ Tuple[List[Prompt], List[List[float]]]: Filtered candidates and their scores.
+ """
+ assert len(candidates) == len(mask), "Length of candidates, and mask must be the same."
+ assert all(
+ len(candidates) == len(score) for score in scores
+ ), "Each score list must have the same length as candidates."
- filtered_candidates = [c for c, m in zip(candidates, mask) if m]
- filtered_scores = [[s for s, m in zip(score, mask) if m] for score in scores]
+ filtered_candidates = [c for c, m in zip(candidates, mask) if m]
+ filtered_scores = [np.asarray(score)[mask] for score in scores]
- return filtered_candidates, filtered_scores
+ return filtered_candidates, filtered_scores
diff --git a/promptolution/optimizers/evoprompt_de.py b/promptolution/optimizers/evoprompt_de.py
index f6e701a..0ae339d 100644
--- a/promptolution/optimizers/evoprompt_de.py
+++ b/promptolution/optimizers/evoprompt_de.py
@@ -60,7 +60,8 @@ def __init__(
self.prompt_template = self._initialize_meta_template(prompt_template or EVOPROMPT_DE_TEMPLATE_TD)
def _pre_optimization_loop(self) -> None:
- self.scores = self.task.evaluate(self.prompts, self.predictor, return_agg_scores=True)
+ result = self.task.evaluate(self.prompts, self.predictor)
+ self.scores = result.agg_scores.tolist()
self.prompts, self.scores = sort_prompts_by_scores(self.prompts, self.scores)
def _step(self) -> List[Prompt]:
@@ -99,7 +100,8 @@ def _step(self) -> List[Prompt]:
child_instructions = extract_from_tag(child_instructions, "", " ")
child_prompts = [Prompt(p) for p in child_instructions]
- child_scores = self.task.evaluate(child_prompts, self.predictor, return_agg_scores=True)
+ child_result = self.task.evaluate(child_prompts, self.predictor)
+ child_scores = child_result.agg_scores.tolist()
for i in range(len(self.prompts)):
if child_scores[i] > self.scores[i]:
diff --git a/promptolution/optimizers/evoprompt_ga.py b/promptolution/optimizers/evoprompt_ga.py
index 9a0b4e3..ae8dfb4 100644
--- a/promptolution/optimizers/evoprompt_ga.py
+++ b/promptolution/optimizers/evoprompt_ga.py
@@ -68,17 +68,19 @@ def __init__(
assert self.selection_mode in ["random", "wheel", "tour"], "Invalid selection mode."
def _pre_optimization_loop(self) -> None:
- self.scores = self.task.evaluate(self.prompts, self.predictor, return_agg_scores=True)
+ result = self.task.evaluate(self.prompts, self.predictor)
+ self.scores = result.agg_scores
self.prompts, self.scores = sort_prompts_by_scores(self.prompts, self.scores)
def _step(self) -> List[Prompt]:
new_prompts = self._crossover(self.prompts, self.scores)
- new_scores = self.task.evaluate(new_prompts, self.predictor, return_agg_scores=True)
+ new_result = self.task.evaluate(new_prompts, self.predictor)
+ new_scores = new_result.agg_scores
prompts = self.prompts + new_prompts
- scores = self.scores + new_scores
+ combined_scores = np.concatenate([np.asarray(self.scores), np.asarray(new_scores)], axis=0)
- self.prompts, self.scores = sort_prompts_by_scores(prompts, scores, top_k=len(self.prompts))
+ self.prompts, self.scores = sort_prompts_by_scores(prompts, combined_scores, top_k=len(self.prompts))
return self.prompts
diff --git a/promptolution/optimizers/opro.py b/promptolution/optimizers/opro.py
index e7b9048..2e613ac 100644
--- a/promptolution/optimizers/opro.py
+++ b/promptolution/optimizers/opro.py
@@ -105,7 +105,8 @@ def _add_prompt_and_score(self, prompt: Prompt, score: float) -> None:
self.prompts, self.scores = sort_prompts_by_scores(self.prompts, self.scores, top_k=self.max_num_instructions)
def _pre_optimization_loop(self):
- self.scores = self.task.evaluate(self.prompts, self.predictor)
+ result = self.task.evaluate(self.prompts, self.predictor)
+ self.scores = result.agg_scores.tolist()
self.meta_prompt = self.meta_prompt_template.replace("", self._format_instructions()).replace(
"", self._sample_examples()
)
@@ -125,7 +126,8 @@ def _step(self) -> List[Prompt]:
duplicate_prompts += 1
continue
- score = self.task.evaluate(prompt, self.predictor)[0]
+ prompt_result = self.task.evaluate([prompt], self.predictor)
+ score = prompt_result.agg_scores.tolist()[0]
self._add_prompt_and_score(prompt, score)
diff --git a/promptolution/predictors/base_predictor.py b/promptolution/predictors/base_predictor.py
index 292d56d..f0060a1 100644
--- a/promptolution/predictors/base_predictor.py
+++ b/promptolution/predictors/base_predictor.py
@@ -37,8 +37,7 @@ def predict(
prompts: Union[str, List[str]],
xs: List[str],
system_prompts: Optional[Union[str, List[str]]] = None,
- return_seq: bool = False,
- ) -> Union[List[str], Tuple[List[str], List[str]]]:
+ ) -> Tuple[List[str], List[str]]:
"""Abstract method to make predictions based on prompts and input data.
Args:
@@ -57,11 +56,8 @@ def predict(
outputs = self.llm.get_response(inputs, system_prompts=system_prompts)
preds = self._extract_preds(outputs)
- if return_seq:
- seqs = [f"{x}\n{out}" for x, out in zip(xs, outputs)]
- return preds, seqs
-
- return preds
+ seqs = [f"{x}\n{out}" for x, out in zip(xs, outputs)]
+ return preds, seqs
@abstractmethod
def _extract_preds(self, preds: List[str]) -> List[str]:
diff --git a/promptolution/tasks/__init__.py b/promptolution/tasks/__init__.py
index 825dbad..7dadf4f 100644
--- a/promptolution/tasks/__init__.py
+++ b/promptolution/tasks/__init__.py
@@ -2,10 +2,12 @@
from promptolution.tasks.classification_tasks import ClassificationTask
from promptolution.tasks.judge_tasks import JudgeTask
+from promptolution.tasks.multi_objective_task import MultiObjectiveTask
from promptolution.tasks.reward_tasks import RewardTask
__all__ = [
"ClassificationTask",
"JudgeTask",
"RewardTask",
+ "MultiObjectiveTask",
]
diff --git a/promptolution/tasks/base_task.py b/promptolution/tasks/base_task.py
index 2f1c164..a733be4 100644
--- a/promptolution/tasks/base_task.py
+++ b/promptolution/tasks/base_task.py
@@ -2,21 +2,39 @@
from abc import ABC, abstractmethod
+from dataclasses import dataclass
import numpy as np
import pandas as pd
-from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Tuple, Union, overload
+from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Tuple, Union
+from promptolution.utils.logging import get_logger
from promptolution.utils.prompt import Prompt
+from promptolution.utils.token_counter import get_token_counter
if TYPE_CHECKING: # pragma: no cover
from promptolution.predictors.base_predictor import BasePredictor
from promptolution.utils.config import ExperimentConfig
-TaskType = Literal["classification", "reward", "judge"]
-EvalStrategy = Literal["full", "subsample", "sequential_block", "random_block"]
+TaskType = Literal["classification", "reward", "judge", "multi"]
+EvalStrategy = Literal["full", "subsample", "sequential_block", "random_block", "evaluated"]
+
+logger = get_logger(__name__)
+
+
+@dataclass
+class EvalResult:
+ """Evaluation outputs including scores, sequences, and costs."""
+
+ scores: np.ndarray # shape: (n_prompts, n_datapoints)
+ agg_scores: np.ndarray # shape: (n_prompts,) - mean over datapoints
+ sequences: np.ndarray # shape: (n_prompts, n_datapoints)
+ input_tokens: np.ndarray # shape: (n_prompts, n_datapoints)
+ output_tokens: np.ndarray # shape: (n_prompts, n_datapoints)
+ agg_input_tokens: np.ndarray # shape: (n_prompts,) - mean over datapoints
+ agg_output_tokens: np.ndarray # shape: (n_prompts,) - mean over datapoints
class BaseTask(ABC):
@@ -45,42 +63,61 @@ def __init__(
seed (int): Random seed for reproducibility.
config (ExperimentConfig, optional): Configuration for the task, overriding defaults.
"""
- self.df = df
- self.x_column = x_column
- self.y_column = y_column
- self.task_description = task_description
- self.n_subsamples = n_subsamples
- self.eval_strategy = eval_strategy
- self.seed = seed
+ self.df = df.drop_duplicates(subset=[x_column])
+ if len(self.df) != len(df):
+ logger.warning(
+ f"Duplicate entries detected for x_column '{x_column}' - dropped {len(df) - len(self.df)} rows to enforce uniqueness."
+ )
+ self.x_column: str = x_column
+ self.y_column: Optional[str] = y_column
+ self.task_type: TaskType | None = None
+ self.task_description: Optional[str] = task_description
+ self.n_subsamples: int = n_subsamples
+ self.eval_strategy: EvalStrategy = eval_strategy
+ self.seed: int = seed
super().__init__()
if config is not None:
config.apply_to(self)
- self.xs: List[str] = df[self.x_column].values.astype(str).tolist()
- self.has_y = y_column is not None
+ self.xs: List[str] = self.df[self.x_column].values.astype(str).tolist()
+ self.has_y: bool = y_column is not None
if self.has_y and y_column is not None:
- self.ys: List[str] = df[y_column].values.astype(str).tolist()
+ self.ys: List[str] = self.df[y_column].values.astype(str).tolist()
else:
# If no y_column is provided, create a dummy y array
self.ys = [""] * len(self.xs)
- self.block_idx = 0
- self.n_blocks = len(self.xs) // self.n_subsamples if self.n_subsamples > 0 else 1
+ self.block_idx: int = 0
+ self.n_blocks: int = len(self.xs) // self.n_subsamples if self.n_subsamples > 0 else 1
self.rng = np.random.default_rng(seed)
self.eval_cache: Dict[Tuple[str, str, str], float] = {} # (prompt, x, y): scores per datapoint
- self.seq_cache: Dict[Tuple[str, str, str], str] = {} # (prompt, x, y): generating sequence per datapoint
+ self.seq_cache: Dict[Tuple[str, str, str], str] = {} # (prompt, x, y): raw model output per datapoint
+
+ self.prompt_evaluated_blocks: Dict[Prompt, List[int]] = {} # prompt_str: set of evaluated block indices
- def subsample(self, eval_strategy: "EvalStrategy" = None) -> Tuple[List[str], List[str]]:
+ def subsample(
+ self, eval_strategy: Optional["EvalStrategy"] = None, block_idx: List[int] | None = None
+ ) -> Tuple[List[str], List[str]]:
"""Subsample the dataset based on the specified parameters.
Args:
eval_strategy (EvalStrategy, optional): Subsampling strategy to use instead of self.eval_strategy. Defaults to None.
+ block_idx (List[int] | None, optional): Specific block index or indices to evaluate, overriding eval_strategy. Defaults to None.
Returns:
Tuple[List[str], List[str]]: Subsampled input data and labels.
"""
+ if block_idx is not None:
+ indices: List[int] = []
+ for idx in block_idx:
+ start_idx = idx * self.n_subsamples
+ end_idx = min((idx + 1) * self.n_subsamples, len(self.xs))
+ indices.extend(range(start_idx, end_idx))
+
+ return [self.xs[i] for i in indices], [self.ys[i] for i in indices]
+
if eval_strategy is None:
eval_strategy = self.eval_strategy
@@ -96,10 +133,19 @@ def subsample(self, eval_strategy: "EvalStrategy" = None) -> Tuple[List[str], Li
indices = np.arange(start_idx, end_idx)
return [self.xs[i] for i in indices], [self.ys[i] for i in indices]
elif eval_strategy == "sequential_block":
- start_idx = self.block_idx * self.n_subsamples
- end_idx = min((self.block_idx + 1) * self.n_subsamples, len(self.xs))
- indices = np.arange(start_idx, end_idx)
- return [self.xs[i] for i in indices], [self.ys[i] for i in indices]
+ # Handle case where self.block_idx is a list
+ if isinstance(self.block_idx, list):
+ indices_list: List[int] = []
+ for idx in self.block_idx:
+ start_idx = idx * self.n_subsamples
+ end_idx = min((idx + 1) * self.n_subsamples, len(self.xs))
+ indices_list.extend(range(start_idx, end_idx))
+ return [self.xs[i] for i in indices_list], [self.ys[i] for i in indices_list]
+ else:
+ start_idx = self.block_idx * self.n_subsamples
+ end_idx = min((self.block_idx + 1) * self.n_subsamples, len(self.xs))
+ indices = np.arange(start_idx, end_idx)
+ return [self.xs[i] for i in indices], [self.ys[i] for i in indices]
else:
raise ValueError(f"Unknown subsampling strategy: '{eval_strategy}'")
@@ -109,188 +155,188 @@ def _prepare_batch(
xs: List[str],
ys: List[str],
eval_strategy: Literal["full", "subsample", "sequential_block", "random_block", "evaluated"] = "full",
- ) -> List[Tuple[str, str, str]]:
- """Generates (prompt, x, y) keys that require prediction.
-
- Returns keys not found in eval_cache.
- """
+ ) -> Tuple[List[str], List[str], List[str], List[Tuple[str, str, str]]]:
+ """Return uncached prompt/x/y triples for prediction and their cache keys."""
if eval_strategy == "evaluated":
- return []
- keys_to_predict = []
+ return [], [], [], []
+
+ prompts_to_predict: List[str] = []
+ xs_to_predict: List[str] = []
+ ys_to_predict: List[str] = []
+ keys_to_predict: List[Tuple[str, str, str]] = []
+
for prompt in prompts:
for x, y in zip(xs, ys):
- cache_key = (prompt.construct_prompt(), x, str(y))
- if cache_key not in self.eval_cache:
- keys_to_predict.append(cache_key)
- return keys_to_predict
+ cache_key = (str(prompt), x, str(y))
+ if cache_key in self.eval_cache:
+ continue
+ prompts_to_predict.append(str(prompt))
+ xs_to_predict.append(x)
+ ys_to_predict.append(str(y))
+ keys_to_predict.append(cache_key)
+
+ return prompts_to_predict, xs_to_predict, ys_to_predict, keys_to_predict
+
+ @staticmethod
+ def _cache_key(prompt: Prompt, x: str, y: str) -> Tuple[str, str, str]:
+ return (prompt.construct_prompt(), x, y)
def _collect_results_from_cache(
+ self, prompts: List[Prompt], xs: List[str], ys: List[str]
+ ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
+ """Collect cached scores and sequences for provided prompts/xs/ys."""
+ score_rows: List[List[float]] = []
+ seq_rows: List[List[str]] = []
+
+ for prompt in prompts:
+ datapoint_scores: List[float] = []
+ datapoint_seqs: List[str] = []
+ for x, y in zip(xs, ys):
+ cache_key = self._cache_key(prompt, x, str(y))
+ if cache_key not in self.eval_cache:
+ datapoint_scores.append(np.nan) # Fill with NaN instead of skipping
+ datapoint_seqs.append("")
+ else:
+ datapoint_score = self.eval_cache[cache_key]
+ datapoint_scores.append(datapoint_score)
+ datapoint_seqs.append(self.seq_cache.get(cache_key, ""))
+ score_rows.append(datapoint_scores)
+ seq_rows.append(datapoint_seqs)
+
+ scores_array = np.array(score_rows, dtype=float)
+ agg_scores = np.nanmean(scores_array, axis=1) if scores_array.size else np.array([])
+ seqs_array = np.array(seq_rows, dtype=object)
+ return scores_array, agg_scores, seqs_array
+
+ def _compute_costs(
self,
prompts: List[Prompt],
xs: List[str],
ys: List[str],
- return_agg_scores: bool,
- return_seq: bool,
- ) -> Union[List[float], List[List[float]], Tuple[List[List[float]], List[List[str]]]]:
- """Collects all results for the current batch from the cache and formats them."""
- assert not (return_agg_scores and return_seq), "Cannot return both aggregated scores and sequences"
+ predictor: "BasePredictor",
+ ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
+ token_counter = get_token_counter(predictor.llm)
- scores = []
- seqs = []
+ per_prompt_inputs: List[np.ndarray] = []
+ per_prompt_outputs: List[np.ndarray] = []
for prompt in prompts:
- datapoint_scores = []
- datapoint_seqs = []
+ prompt_token_count = token_counter(prompt.construct_prompt())
+ seq_token_counts: List[float] = []
+ input_token_counts = []
for x, y in zip(xs, ys):
- cache_key = (prompt.construct_prompt(), x, y)
- datapoint_score = self.eval_cache.get(cache_key)
- if datapoint_score is None:
+ cache_key = self._cache_key(prompt, x, str(y))
+ if cache_key not in self.seq_cache:
+ # Use NaN for missing datapoints instead of skipping
+ seq_token_counts.append(np.nan)
+ input_token_counts.append(np.nan)
continue
- datapoint_scores.append(datapoint_score)
- if return_seq:
- datapoint_seqs.append(self.seq_cache.get(cache_key, ""))
- scores.append(datapoint_scores)
- if return_seq:
- seqs.append(datapoint_seqs)
+ seq_text = self.seq_cache[cache_key]
+ seq_token_counts.append(token_counter(seq_text))
+ input_token_counts.append(token_counter(x))
+
+ prompt_input_tokens = np.array(input_token_counts, dtype=float) + prompt_token_count
+ output_token_counts = np.array(seq_token_counts, dtype=float) - np.array(input_token_counts, dtype=float)
+
+ per_prompt_inputs.append(np.asarray(prompt_input_tokens, dtype=float))
+ per_prompt_outputs.append(output_token_counts)
+
+ inputs_array = np.vstack(per_prompt_inputs)
+ outputs_array = np.vstack(per_prompt_outputs)
- if return_agg_scores:
- agg_scores = [np.nanmean(s).item() for s in scores]
- return agg_scores
+ agg_input_tokens = np.nanmean(inputs_array, axis=1)
+ agg_output_tokens = np.nanmean(outputs_array, axis=1)
- return scores if not return_seq else (scores, seqs)
+ return inputs_array, outputs_array, agg_input_tokens, agg_output_tokens
@abstractmethod
- def _evaluate(self, xs: List[str], ys: List[str], preds: List[str]) -> List[float]:
+ def _evaluate(self, xs: List[str], ys: List[str], preds: List[str]) -> np.ndarray:
"""Abstract method to calculate the score for a predictions.
This method should be implemented by subclasses based on their specific evaluation logic.
"""
raise NotImplementedError
- @overload
- def evaluate(
- self,
- prompts: List[Prompt],
- predictor: "BasePredictor",
- system_prompts: Optional[Union[str, List[str]]] = None,
- return_agg_scores: Literal[True] = True,
- return_seq: Literal[False] = False,
- eval_strategy: Optional["EvalStrategy"] = None,
- ) -> List[float]:
- ...
-
- @overload
- def evaluate(
- self,
- prompts: List[Prompt],
- predictor: "BasePredictor",
- system_prompts: Optional[Union[str, List[str]]] = None,
- return_agg_scores: Literal[False] = False,
- return_seq: Literal[False] = False,
- eval_strategy: Optional["EvalStrategy"] = None,
- ) -> List[List[float]]:
- ...
-
- @overload
- def evaluate(
- self,
- prompts: List[Prompt],
- predictor: "BasePredictor",
- system_prompts: Optional[Union[str, List[str]]] = None,
- return_agg_scores: Literal[False] = False,
- return_seq: Literal[True] = True,
- eval_strategy: Optional["EvalStrategy"] = None,
- ) -> Tuple[List[List[float]], List[List[str]]]:
- ...
-
- @overload
- def evaluate(
- self,
- prompts: Prompt,
- predictor: "BasePredictor",
- system_prompts: Optional[Union[str, List[str]]] = None,
- return_agg_scores: Literal[True] = True,
- return_seq: Literal[False] = False,
- eval_strategy: Optional["EvalStrategy"] = None,
- ) -> List[float]:
- ...
-
- @overload
- def evaluate(
- self,
- prompts: Prompt,
- predictor: "BasePredictor",
- system_prompts: Optional[Union[str, List[str]]] = None,
- return_agg_scores: Literal[False] = False,
- return_seq: Literal[False] = False,
- eval_strategy: Optional["EvalStrategy"] = None,
- ) -> List[List[float]]:
- ...
-
- @overload
- def evaluate(
- self,
- prompts: Prompt,
- predictor: "BasePredictor",
- system_prompts: Optional[Union[str, List[str]]] = None,
- return_agg_scores: Literal[False] = False,
- return_seq: Literal[True] = True,
- eval_strategy: Optional["EvalStrategy"] = None,
- ) -> Tuple[List[List[float]], List[List[str]]]:
- ...
+ def activate_scalarized_objective(self) -> None:
+ """Activate scalarized objective for multi-objective tasks."""
+ raise NotImplementedError
def evaluate(
self,
prompts: Union[Prompt, List[Prompt]],
predictor: "BasePredictor",
system_prompts: Optional[Union[str, List[str]]] = None,
- return_agg_scores: bool = True,
- return_seq: bool = False,
eval_strategy: Optional["EvalStrategy"] = None,
- ) -> Union[List[float], List[List[float]], Tuple[List[List[float]], List[List[str]]]]:
+ block_idx: int | list[int] | None = None,
+ ) -> EvalResult:
"""Evaluate a set of prompts using a given predictor.
This method orchestrates subsampling, prediction, caching, and result collection.
+ Sequences, token costs, raw scores, and aggregated scores are always returned.
- Note: Cannot return both aggregated scores and sequences (assertion will fail).
+ Args:
+ prompts (Union[Prompt, List[Prompt]]): A single prompt or a list of prompts to evaluate. Results will be returned in the same order.
+ predictor (BasePredictor): The predictor to evaluate the prompts with.
+ system_prompts (Optional[Union[str, List[str]]], optional): Optional system prompts to parse to the predictor.
+ eval_strategy (Optional[EvalStrategy], optional): Subsampling strategy to use instead of self.eval_strategy. Defaults to None, which uses self.eval_strategy.
+ block_idx (Optional[int | list[int]], optional): Specific block index or indices to evaluate, overriding eval_strategy. Defaults to None.
"""
- assert not (return_agg_scores and return_seq), "Cannot return both aggregated scores and sequences"
-
- seqs: List[str] = []
-
- prompts = [prompts] if isinstance(prompts, Prompt) else prompts
+ prompts_list: List[Prompt] = [prompts] if isinstance(prompts, Prompt) else list(prompts)
eval_strategy = eval_strategy or self.eval_strategy
- xs, ys = self.subsample(eval_strategy=eval_strategy)
- batches = self._prepare_batch(prompts, xs, ys, eval_strategy=eval_strategy)
- (prompts_to_evaluate, xs_to_evaluate, ys_to_evaluate) = ([], [], []) if not batches else zip(*batches)
-
- if prompts_to_evaluate:
- preds_seqs = predictor.predict(
- prompts=list(prompts_to_evaluate),
- xs=list(xs_to_evaluate),
- system_prompts=system_prompts,
- return_seq=return_seq,
- )
- else:
- preds_seqs = ([], []) if return_seq else []
- if return_seq:
- preds, seqs = preds_seqs if isinstance(preds_seqs, tuple) else (preds_seqs, [])
- else:
- preds = preds_seqs
+ if block_idx is not None and isinstance(block_idx, int):
+ block_idx = [block_idx]
+
+ xs, ys = self.subsample(eval_strategy=eval_strategy, block_idx=block_idx)
+ (
+ prompts_to_evaluate,
+ xs_to_evaluate,
+ ys_to_evaluate,
+ cache_keys,
+ ) = self._prepare_batch(prompts_list, xs, ys, eval_strategy=eval_strategy)
+
+ preds, pred_seqs = predictor.predict(
+ prompts=prompts_to_evaluate,
+ xs=xs_to_evaluate,
+ system_prompts=system_prompts,
+ )
- scores: List[float] = self._evaluate(list(xs_to_evaluate), list(ys_to_evaluate), preds)
- for i, cache_key in enumerate(batches):
+ scores = self._evaluate(xs_to_evaluate, ys_to_evaluate, preds)
+ for i, cache_key in enumerate(cache_keys):
self.eval_cache[cache_key] = scores[i]
- if return_seq:
- self.seq_cache[cache_key] = seqs[i]
+ self.seq_cache[cache_key] = str(pred_seqs[i])
- return self._collect_results_from_cache(
- prompts,
+ scores, agg_scores, seqs = self._collect_results_from_cache(
+ prompts_list,
xs,
ys,
- return_agg_scores,
- return_seq,
+ )
+
+ # Record evaluated block for block strategies
+ for prompt in prompts_list:
+ if block_idx is not None:
+ self.prompt_evaluated_blocks.setdefault(prompt, []).extend(block_idx)
+ elif eval_strategy in ["sequential_block", "random_block"]:
+ # Handle case where self.block_idx is a list
+ if isinstance(self.block_idx, list):
+ self.prompt_evaluated_blocks.setdefault(prompt, []).extend(self.block_idx)
+ else:
+ self.prompt_evaluated_blocks.setdefault(prompt, []).append(self.block_idx)
+ elif eval_strategy == "full":
+ self.prompt_evaluated_blocks.setdefault(prompt, []).extend(list(range(self.n_blocks)))
+
+ input_tokens, output_tokens, agg_input_tokens, agg_output_tokens = self._compute_costs(
+ prompts_list, xs, ys, predictor
+ )
+
+ return EvalResult(
+ scores=scores,
+ agg_scores=agg_scores,
+ sequences=seqs,
+ input_tokens=input_tokens,
+ output_tokens=output_tokens,
+ agg_input_tokens=agg_input_tokens,
+ agg_output_tokens=agg_output_tokens,
)
def pop_datapoints(self, n: Optional[int] = None, frac: Optional[float] = None) -> pd.DataFrame:
@@ -341,6 +387,7 @@ def increment_block_idx(self) -> None:
"""
if "block" not in self.eval_strategy:
raise ValueError("Block increment is only valid for block subsampling.")
+ assert isinstance(self.block_idx, int), "Block index must be an integer to increment."
self.block_idx += 1
if self.n_blocks > 0: # Ensure n_blocks is not zero to avoid division by zero
self.block_idx %= self.n_blocks
@@ -356,3 +403,17 @@ def reset_block_idx(self) -> None:
if "block" not in self.eval_strategy:
raise ValueError("Block reset is only valid for block subsampling.")
self.block_idx = 0
+
+ def set_block_idx(self, idx: int) -> None:
+ """Set the block index (or indices) for block subsampling strategies."""
+ if "block" not in self.eval_strategy:
+ raise ValueError("Block assignment is only valid for block subsampling.")
+
+ assert isinstance(idx, int), "Block index must be an integer"
+
+ self.block_idx = idx
+
+ def get_evaluated_blocks(self, prompts: Union[Prompt, List[Prompt]]) -> Dict[Prompt, List[int]]:
+ """Return mapping of prompt string to evaluated block indices."""
+ prompts_list: List[Prompt] = [prompts] if isinstance(prompts, Prompt) else list(prompts)
+ return {p: list(self.prompt_evaluated_blocks.get(p, [])) for p in prompts_list}
diff --git a/promptolution/tasks/classification_tasks.py b/promptolution/tasks/classification_tasks.py
index e34c24f..80ebc9c 100644
--- a/promptolution/tasks/classification_tasks.py
+++ b/promptolution/tasks/classification_tasks.py
@@ -62,14 +62,13 @@ def __init__(
seed=seed,
config=config,
)
+ self.task_type = "classification"
self.ys: List[str] = (
df[self.y_column].str.lower().values.tolist()
) # Ensure y values are lowercase for consistent comparison
self.classes = np.unique(self.ys)
- def _evaluate(self, xs: List[str], ys: List[str], preds: List[str]) -> List[float]:
+ def _evaluate(self, xs: List[str], ys: List[str], preds: List[str]) -> np.ndarray:
"""Calculate the score for a single prediction."""
- scores = []
- for pred, y in zip(preds, ys):
- scores.append(self.metric([y], [pred]))
- return scores
+ scores = [self.metric([y], [pred]) for pred, y in zip(preds, ys)]
+ return np.asarray(scores, dtype=float)
diff --git a/promptolution/tasks/judge_tasks.py b/promptolution/tasks/judge_tasks.py
index 2570458..26801ee 100644
--- a/promptolution/tasks/judge_tasks.py
+++ b/promptolution/tasks/judge_tasks.py
@@ -1,5 +1,6 @@
"""Module for judge tasks."""
+import numpy as np
import pandas as pd
from typing import TYPE_CHECKING, List, Optional
@@ -110,9 +111,10 @@ def __init__(
config=config,
)
self.judge_llm = judge_llm
+ self.task_type = "judge"
def _construct_judge_prompt(self, x: str, pred: str, y: Optional[str] = None) -> str:
- """Constructs the judge prompt based on whether ground truth is available."""
+ """Construct the judge prompt based on whether ground truth is available."""
if y is not None:
prompt = self.judge_prompt.replace("{ground_truth}", str(y))
else:
@@ -122,7 +124,7 @@ def _construct_judge_prompt(self, x: str, pred: str, y: Optional[str] = None) ->
prompt = prompt.replace("{task}", task_description).replace("{input}", x).replace("{prediction}", pred)
return prompt
- def _evaluate(self, xs: List[str], ys: List[str], preds: List[str]) -> List[float]:
+ def _evaluate(self, xs: List[str], ys: List[str], preds: List[str]) -> np.ndarray:
"""Calculate the score for a single prediction using the LLM judge."""
prompts: List[str] = []
for x, y, pred in zip(xs, ys, preds):
@@ -145,4 +147,4 @@ def _evaluate(self, xs: List[str], ys: List[str], preds: List[str]) -> List[floa
scores.append(score)
- return scores
+ return np.asarray(scores, dtype=float)
diff --git a/promptolution/tasks/multi_objective_task.py b/promptolution/tasks/multi_objective_task.py
new file mode 100644
index 0000000..a4844cd
--- /dev/null
+++ b/promptolution/tasks/multi_objective_task.py
@@ -0,0 +1,184 @@
+"""Multi-objective task wrapper that evaluates prompts across multiple tasks."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+import numpy as np
+
+from typing import Dict, List, Optional, Tuple
+
+from promptolution.tasks.base_task import BaseTask, EvalResult, EvalStrategy
+from promptolution.utils.prompt import Prompt
+
+
+@dataclass
+class MultiObjectiveEvalResult:
+ """Container for per-task evaluation outputs in multi-objective runs."""
+
+ scores: List[np.ndarray]
+ agg_scores: List[np.ndarray]
+ sequences: np.ndarray
+ input_tokens: np.ndarray
+ output_tokens: np.ndarray
+ agg_input_tokens: np.ndarray
+ agg_output_tokens: np.ndarray
+
+
+class MultiObjectiveTask(BaseTask):
+ """A task that aggregates evaluations across multiple underlying tasks."""
+
+ def __init__(
+ self,
+ tasks: List[BaseTask],
+ eval_strategy: Optional[EvalStrategy] = None,
+ ) -> None:
+ """Initialize with a list of tasks sharing subsampling and seed settings."""
+ if not tasks:
+ raise ValueError("tasks must be a non-empty list")
+
+ primary = tasks[0]
+ for t in tasks[1:]:
+ assert t.n_subsamples == primary.n_subsamples, "All tasks must share n_subsamples"
+ assert t.seed == primary.seed, "All tasks must share seed"
+ assert t.eval_strategy == primary.eval_strategy, "All tasks must share eval_strategy"
+
+ combined_description = "This task is a combination of the following tasks:\n" + "\n".join(
+ [f"Task: {t.task_description}" for t in tasks if t.task_description]
+ )
+
+ super().__init__(
+ df=primary.df,
+ x_column=primary.x_column,
+ y_column=primary.y_column,
+ task_description=combined_description,
+ n_subsamples=primary.n_subsamples,
+ eval_strategy=eval_strategy or primary.eval_strategy,
+ seed=primary.seed,
+ config=None,
+ )
+ self.task_type = "multi"
+ self.tasks = tasks
+ self._scalarized_objective: bool = False
+
+ def activate_scalarized_objective(self) -> None:
+ """Force single-objective behavior by equally averaging task scores."""
+ self._scalarized_objective = True
+
+ def evaluate( # type: ignore
+ self,
+ prompts: Prompt | List[Prompt],
+ predictor,
+ system_prompts: Optional[str | List[str]] = None,
+ eval_strategy: Optional[EvalStrategy] = None,
+ ) -> MultiObjectiveEvalResult | EvalResult:
+ """Run prediction once, then score via each task's _evaluate."""
+ prompts_list: List[Prompt] = [prompts] if isinstance(prompts, Prompt) else list(prompts)
+ strategy = eval_strategy or self.eval_strategy
+
+ # Keep block alignment across tasks so block-based strategies stay in sync.
+ for task in self.tasks:
+ task.block_idx = self.block_idx
+
+ xs, ys = self.subsample(eval_strategy=strategy)
+
+ # Collect all uncached prompt/x/y triples across tasks to predict only once.
+ prompts_to_evaluate: List[str] = []
+ xs_to_evaluate: List[str] = []
+ ys_to_evaluate: List[str] = []
+ key_to_index: Dict[Tuple[str, str, str], int] = {}
+ cache_keys: List[Tuple[str, str, str]] = []
+
+ for task in self.tasks:
+ t_prompts, t_xs, t_ys, t_keys = task._prepare_batch(prompts_list, xs, ys, eval_strategy=strategy)
+ for prompt_str, x_val, y_val, key in zip(t_prompts, t_xs, t_ys, t_keys):
+ if key in key_to_index:
+ continue
+ key_to_index[key] = len(prompts_to_evaluate)
+ prompts_to_evaluate.append(prompt_str)
+ xs_to_evaluate.append(x_val)
+ ys_to_evaluate.append(y_val)
+ cache_keys.append(key)
+
+ preds: List[str] = []
+ pred_seqs: List[str] = []
+ if prompts_to_evaluate:
+ preds, pred_seqs = predictor.predict(
+ prompts=prompts_to_evaluate,
+ xs=xs_to_evaluate,
+ system_prompts=system_prompts,
+ )
+
+ # Map predictions back to each task and populate caches via _evaluate.
+ key_to_pred: Dict[Tuple[str, str, str], Tuple[str, str]] = {
+ key: (preds[idx], pred_seqs[idx]) for key, idx in key_to_index.items()
+ }
+
+ per_task_results: List[EvalResult] = []
+ for task in self.tasks:
+ if cache_keys:
+ xs_eval = [k[1] for k in cache_keys]
+ ys_eval = [k[2] for k in cache_keys]
+ preds_eval = [key_to_pred[k][0] for k in cache_keys]
+ scores = task._evaluate(xs_eval, ys_eval, preds_eval)
+ for score, cache_key in zip(scores, cache_keys):
+ task.eval_cache[cache_key] = score
+ task.seq_cache[cache_key] = key_to_pred[cache_key][1]
+
+ scores_array, agg_scores, seqs = task._collect_results_from_cache(prompts_list, xs, ys)
+ input_tokens, output_tokens, agg_input_tokens, agg_output_tokens = task._compute_costs(
+ prompts_list, xs, ys, predictor
+ )
+
+ per_task_results.append(
+ EvalResult(
+ scores=scores_array,
+ agg_scores=agg_scores,
+ sequences=seqs,
+ input_tokens=input_tokens,
+ output_tokens=output_tokens,
+ agg_input_tokens=agg_input_tokens,
+ agg_output_tokens=agg_output_tokens,
+ )
+ )
+
+ stacked_scores = [r.scores for r in per_task_results]
+ stacked_agg_scores = [r.agg_scores for r in per_task_results]
+
+ # Record evaluated blocks for this evaluation (mirroring BaseTask behavior)
+ for prompt in prompts_list:
+ # Use self.block_idx (the MultiObjectiveTask's block_idx) if in a block strategy
+ if strategy in ["sequential_block", "random_block"]:
+ if isinstance(self.block_idx, list):
+ self.prompt_evaluated_blocks.setdefault(prompt, []).extend(self.block_idx)
+ else:
+ self.prompt_evaluated_blocks.setdefault(prompt, []).append(self.block_idx)
+ elif strategy == "full":
+ self.prompt_evaluated_blocks.setdefault(prompt, []).extend(list(range(self.n_blocks)))
+
+ # Use first task's result for sequences and token counts (they're all the same across tasks)
+ first_result = per_task_results[0]
+
+ if self._scalarized_objective:
+ return EvalResult(
+ scores=np.mean(stacked_scores, axis=0),
+ agg_scores=np.mean(stacked_agg_scores, axis=0),
+ sequences=first_result.sequences,
+ input_tokens=first_result.input_tokens,
+ output_tokens=first_result.output_tokens,
+ agg_input_tokens=first_result.agg_input_tokens,
+ agg_output_tokens=first_result.agg_output_tokens,
+ )
+
+ return MultiObjectiveEvalResult(
+ scores=stacked_scores,
+ agg_scores=stacked_agg_scores,
+ sequences=first_result.sequences,
+ input_tokens=first_result.input_tokens,
+ output_tokens=first_result.output_tokens,
+ agg_input_tokens=first_result.agg_input_tokens,
+ agg_output_tokens=first_result.agg_output_tokens,
+ )
+
+ def _evaluate(self, xs, ys, preds): # pragma: no cover
+ raise NotImplementedError("MultiObjectiveTask overrides evaluate directly")
diff --git a/promptolution/tasks/reward_tasks.py b/promptolution/tasks/reward_tasks.py
index cf92ed0..cb4f922 100644
--- a/promptolution/tasks/reward_tasks.py
+++ b/promptolution/tasks/reward_tasks.py
@@ -1,6 +1,9 @@
"""Module for Reward tasks."""
+from collections import defaultdict
+
+import numpy as np
import pandas as pd
from typing import TYPE_CHECKING, Callable, List, Optional
@@ -24,6 +27,8 @@ def __init__(
df: pd.DataFrame,
reward_function: Callable[[str], float],
x_column: str = "x",
+ y_column: Optional[str] = None,
+ reward_columns: Optional[List[str]] = None,
task_description: Optional[str] = None,
n_subsamples: int = 30,
eval_strategy: "EvalStrategy" = "full",
@@ -34,8 +39,10 @@ def __init__(
Args:
df (pd.DataFrame): Input DataFrame containing the data.
- reward_function (Callable): Function that takes a prediction and returns a reward score. Note: The optimizers aim to maximize.
+ reward_function (Callable): Function that takes a prediction, potential keyword arguments from the dataframe, and returns a reward score. Note: The optimizers aim to maximize.
x_column (str, optional): Name of the column containing input texts. Defaults to "x".
+ y_column (str, optional): Name of the column containing target texts if available. Defaults to None.
+ reward_columns (List[str], optional): Additional dataframe columns to pass as keyword args to reward_function.
task_description (str, optional): Description of the task.
n_subsamples (int, optional): Number of subsamples to use. Defaults to 30.
eval_strategy (str, optional): Subsampling strategy to use. Defaults to "full".
@@ -43,17 +50,24 @@ def __init__(
config (ExperimentConfig, optional): Configuration for the task, overriding defaults.
"""
self.reward_function = reward_function
+ self.reward_columns = reward_columns or []
super().__init__(
df=df,
x_column=x_column,
+ y_column=y_column,
task_description=task_description,
n_subsamples=n_subsamples,
eval_strategy=eval_strategy,
seed=seed,
config=config,
)
+ self.task_type = "reward"
+ # x -> kwargs to reward function
+ km = self.df.set_index(x_column)[self.reward_columns].to_dict("index")
+ self.kwargs_map = defaultdict(dict, km)
- def _evaluate(self, xs: List[str], ys: List[str], preds: List[str]) -> List[float]:
- """Calculate the score for a single reward prediction using the reward function."""
- rewards = [self.reward_function(pred) for pred in preds]
- return rewards
+ def _evaluate(self, xs: List[str], ys: List[str], preds: List[str]) -> np.ndarray:
+ """Calculate reward for each prediction, passing configured columns as kwargs."""
+ kwargs_list = [self.kwargs_map[x] for x in xs]
+ rewards = [self.reward_function(pred, **kwargs) for pred, kwargs in zip(preds, kwargs_list)]
+ return np.asarray(rewards, dtype=float)
diff --git a/promptolution/utils/callbacks.py b/promptolution/utils/callbacks.py
index 98129e2..abad62f 100644
--- a/promptolution/utils/callbacks.py
+++ b/promptolution/utils/callbacks.py
@@ -34,7 +34,7 @@ def __init__(self, **kwargs: Any) -> None:
pass
def on_step_end(self, optimizer: "BaseOptimizer") -> bool:
- """Called at the end of each optimization step.
+ """Call at the end of each optimization step.
Args:
optimizer: The optimizer object that called the callback.
@@ -45,7 +45,7 @@ def on_step_end(self, optimizer: "BaseOptimizer") -> bool:
return True
def on_epoch_end(self, optimizer: "BaseOptimizer") -> bool:
- """Called at the end of each optimization epoch.
+ """Call at the end of each optimization epoch.
Args:
optimizer: The optimizer object that called the callback.
@@ -56,7 +56,7 @@ def on_epoch_end(self, optimizer: "BaseOptimizer") -> bool:
return True
def on_train_end(self, optimizer: "BaseOptimizer") -> bool:
- """Called at the end of the entire optimization process.
+ """Call at the end of the entire optimization process.
Args:
optimizer: The optimizer object that called the callback.
diff --git a/promptolution/utils/capo_utils.py b/promptolution/utils/capo_utils.py
new file mode 100644
index 0000000..40ee707
--- /dev/null
+++ b/promptolution/utils/capo_utils.py
@@ -0,0 +1,116 @@
+"""Shared utilities for CAPO-style optimizers."""
+
+from __future__ import annotations
+
+import random
+
+from typing import Callable, List, Optional
+
+from promptolution.utils.formatting import extract_from_tag
+from promptolution.utils.prompt import Prompt
+from promptolution.utils.templates import CAPO_FEWSHOT_TEMPLATE
+
+
+def build_few_shot_examples(
+ instruction: str,
+ num_examples: int,
+ optimizer,
+) -> List[str]:
+ """Create few-shot examples with optional reasoning replacement."""
+ if num_examples == 0:
+ return []
+
+ few_shot_samples = optimizer.df_few_shots.sample(num_examples, replace=False)
+ sample_inputs = few_shot_samples[optimizer.task.x_column].values.astype(str)
+ sample_targets = few_shot_samples[optimizer.task.y_column].values
+ few_shots = [
+ CAPO_FEWSHOT_TEMPLATE.replace(" ", i).replace(
+ "", f"{optimizer.target_begin_marker}{t}{optimizer.target_end_marker}"
+ )
+ for i, t in zip(sample_inputs, sample_targets)
+ ]
+
+ if not optimizer.create_fs_reasoning:
+ return few_shots
+
+ preds, seqs = optimizer.predictor.predict(
+ [instruction] * num_examples,
+ list(sample_inputs),
+ )
+ if isinstance(seqs, str):
+ seqs = [seqs]
+ if isinstance(preds, str):
+ preds = [preds]
+
+ for j in range(num_examples):
+ seqs[j] = seqs[j].replace(sample_inputs[j], "", 1).strip()
+ if preds[j] == sample_targets[j] or not optimizer.check_fs_accuracy:
+ few_shots[j] = CAPO_FEWSHOT_TEMPLATE.replace(" ", sample_inputs[j]).replace("", seqs[j])
+
+ return few_shots
+
+
+def perform_crossover(
+ parents: List[Prompt],
+ optimizer,
+ parent_select_func: Optional[Callable] = None,
+) -> List[Prompt]:
+ """Generate crossover offspring prompts."""
+ crossover_prompts: List[str] = []
+ offspring_few_shots: List[List[str]] = []
+ for _ in range(optimizer.crossovers_per_iter):
+ if parent_select_func:
+ mother, father = parent_select_func(parents)
+ else:
+ mother, father = random.sample(parents, 2)
+ crossover_prompt = (
+ optimizer.crossover_template.replace("", mother.instruction)
+ .replace("", father.instruction)
+ .strip()
+ )
+ crossover_prompts.append(crossover_prompt)
+ combined_few_shots = mother.few_shots + father.few_shots
+ num_few_shots = (len(mother.few_shots) + len(father.few_shots)) // 2
+ offspring_few_shot = random.sample(combined_few_shots, num_few_shots) if combined_few_shots else []
+ offspring_few_shots.append(offspring_few_shot)
+
+ child_instructions = optimizer.meta_llm.get_response(crossover_prompts)
+ return [
+ Prompt(extract_from_tag(instr, "", " "), examples)
+ for instr, examples in zip(child_instructions, offspring_few_shots)
+ ]
+
+
+def perform_mutation(
+ offsprings: List[Prompt],
+ optimizer,
+) -> List[Prompt]:
+ """Mutate offspring prompts."""
+ mutation_template = optimizer.mutation_template
+ meta_llm = optimizer.meta_llm
+ upper_shots = optimizer.upper_shots
+
+ mutation_prompts = [mutation_template.replace("", prompt.instruction) for prompt in offsprings]
+ new_instructions = meta_llm.get_response(mutation_prompts)
+
+ mutated: List[Prompt] = []
+ for new_instruction, prompt in zip(new_instructions, offsprings):
+ new_instruction = extract_from_tag(new_instruction, "", " ")
+ p = random.random()
+
+ if p < 1 / 3 and len(prompt.few_shots) < upper_shots:
+ new_few_shot = build_few_shot_examples(
+ instruction=new_instruction,
+ num_examples=1,
+ optimizer=optimizer,
+ )
+ new_few_shots = prompt.few_shots + new_few_shot
+ elif 1 / 3 <= p < 2 / 3 and len(prompt.few_shots) > 0:
+ new_few_shots = random.sample(prompt.few_shots, len(prompt.few_shots) - 1)
+ else:
+ new_few_shots = prompt.few_shots
+
+ random.shuffle(new_few_shots)
+ mutated.append(Prompt(new_instruction, new_few_shots))
+
+ return mutated
diff --git a/promptolution/utils/formatting.py b/promptolution/utils/formatting.py
index 3d2ad19..c342baf 100644
--- a/promptolution/utils/formatting.py
+++ b/promptolution/utils/formatting.py
@@ -13,7 +13,7 @@ def extract_from_tag(text: List[str], start_tag: str, end_tag: str) -> List[str]
def extract_from_tag(text: Union[str, List[str]], start_tag: str, end_tag: str) -> Union[List[str], str]:
- """Extracts content from a string between specified start and end tags.
+ """Extract content from a string between specified start and end tags.
Args:
text (str): The input text to extract from.
diff --git a/promptolution/utils/prompt.py b/promptolution/utils/prompt.py
index d660e49..641e0dc 100644
--- a/promptolution/utils/prompt.py
+++ b/promptolution/utils/prompt.py
@@ -1,17 +1,19 @@
"""Module defining the Prompt class and related utilities."""
-from typing import List, Optional, Tuple
+import numpy as np
+
+from typing import List, Optional, Sequence, Tuple, Union
from promptolution.utils.templates import DOWNSTREAM_TEMPLATE, DOWNSTREAM_TEMPLATE_W_FEWSHOTS
class Prompt:
- """Represents a prompt consisting of an instruction and few-shot examples."""
+ """Represent a prompt consisting of an instruction and few-shot examples."""
def __init__(
self, instruction: str, few_shots: Optional[List[str]] = None, downstream_template: Optional[str] = None
) -> None:
- """Initializes the Prompt with an instruction and associated examples.
+ """Initialize the Prompt with an instruction and associated examples.
Args:
instruction (str): The instruction or prompt text.
@@ -28,7 +30,7 @@ def __init__(
self.downstream_template = downstream_template
def construct_prompt(self) -> str:
- """Constructs the full prompt string by replacing placeholders in the template with the instruction and formatted examples.
+ """Construct the full prompt string by replacing placeholders in the template with the instruction and formatted examples.
Returns:
str: The constructed prompt string.
@@ -43,30 +45,59 @@ def construct_prompt(self) -> str:
return prompt
def __str__(self) -> str:
- """Returns the string representation of the prompt."""
+ """Return the string representation of the prompt."""
return self.construct_prompt()
+ def __eq__(self, other: object) -> bool:
+ """Structural equality for use in lists, sets, and dict keys."""
+ if not isinstance(other, Prompt):
+ return False
+ return (
+ self.instruction == other.instruction
+ and self.few_shots == other.few_shots
+ and self.downstream_template == other.downstream_template
+ )
+
+ def __hash__(self) -> int:
+ """Hash function for use in sets and dict keys."""
+ return hash((self.instruction, tuple(self.few_shots), self.downstream_template))
+
def sort_prompts_by_scores(
- prompts: List[Prompt], scores: List[float], top_k: Optional[int] = None
+ prompts: List[Prompt], scores: Union[Sequence[float], np.ndarray], top_k: Optional[int] = None
) -> Tuple[List[Prompt], List[float]]:
- """Sorts prompts based on their associated scores in descending order.
+ """Sort prompts by score, accepting scalar, 1D, or multi-dimensional scores.
+
+ Scores can be provided as Python lists or NumPy arrays. If scores are multi-
+ dimensional (e.g., per-subsample results), they are aggregated with a
+ ``nanmean`` across all non-leading axes before sorting.
Args:
- prompts (List[Prompt]): List of Prompt objects.
- scores (List[float]): Corresponding list of scores.
- top_k (Optional[int]): If provided, limits the result to the top_k prompts. Defaults to None (returns all).
+ prompts (List[Prompt]): Prompt objects to sort.
+ scores (Sequence[float] | np.ndarray): Corresponding scores; can be nested lists or arrays.
+ top_k (Optional[int]): Limit the result to the top_k prompts.
Returns:
- Tuple[List[Prompt], List[float]]: A tuple containing prompts sorted by scores in descending order and their corresponding sorted scores.
+ Tuple[List[Prompt], List[float]]: Prompts and their aggregated scores,
+ sorted in descending order.
"""
- assert len(prompts) == len(scores), "Prompts and scores must have the same length."
+ scores_arr = np.asarray(scores, dtype=float)
+ if scores_arr.ndim == 0:
+ scores_arr = scores_arr.reshape(1)
+
+ assert scores_arr.shape[0] == len(prompts), "Prompts and scores must have the same length."
- sorted_prompts = [prompt for score, prompt in sorted(zip(scores, prompts), reverse=True, key=lambda x: x[0])]
- sorted_scores = sorted(scores, reverse=True)
+ if scores_arr.ndim > 1:
+ axes_to_reduce = tuple(range(1, scores_arr.ndim))
+ scores_arr = np.nanmean(scores_arr, axis=axes_to_reduce)
+
+ prompt_score_pairs = list(zip(prompts, scores_arr.tolist()))
+ prompt_score_pairs.sort(key=lambda pair: pair[1], reverse=True)
if top_k is not None:
- sorted_prompts = sorted_prompts[:top_k]
- sorted_scores = sorted_scores[:top_k]
+ prompt_score_pairs = prompt_score_pairs[:top_k]
+
+ sorted_prompts = [p for p, _ in prompt_score_pairs]
+ sorted_scores = [s for _, s in prompt_score_pairs]
return sorted_prompts, sorted_scores
diff --git a/promptolution/utils/prompt_creation.py b/promptolution/utils/prompt_creation.py
index fd0087d..0f0cff3 100644
--- a/promptolution/utils/prompt_creation.py
+++ b/promptolution/utils/prompt_creation.py
@@ -13,7 +13,6 @@
from promptolution.llms.base_llm import BaseLLM
from promptolution.tasks.base_task import BaseTask
-from promptolution.tasks.classification_tasks import ClassificationTask
from promptolution.utils.templates import (
PROMPT_CREATION_TEMPLATE,
PROMPT_CREATION_TEMPLATE_FROM_TASK_DESCRIPTION,
@@ -95,7 +94,7 @@ def create_prompts_from_samples(
meta_prompts = []
for _ in range(n_prompts):
- if isinstance(task, ClassificationTask) and get_uniform_labels:
+ if task.task_type == "classification" and get_uniform_labels:
# if classification task sample such that all classes are represented
unique_labels, counts = np.unique(task.ys, return_counts=True)
proportions = counts / len(task.ys)
diff --git a/promptolution/utils/token_counter.py b/promptolution/utils/token_counter.py
index 422e277..75a6d40 100644
--- a/promptolution/utils/token_counter.py
+++ b/promptolution/utils/token_counter.py
@@ -13,7 +13,7 @@
logger = get_logger(__name__)
-def get_token_counter(llm: "BaseLLM") -> Callable[[str], int]:
+def get_token_counter(llm: "BaseLLM") -> Callable[[str], float]:
"""Get a token counter function for the given LLM.
This function returns a callable that counts tokens based on the LLM's tokenizer
@@ -28,7 +28,7 @@ def get_token_counter(llm: "BaseLLM") -> Callable[[str], int]:
"""
if llm.tokenizer is not None:
tokenizer: "PreTrainedTokenizer" = llm.tokenizer
- return lambda x: len(tokenizer.encode(x))
+ return lambda x: float(len(tokenizer.encode(x)))
else:
logger.warning("β οΈ The LLM does not have a tokenizer. Using simple token count.")
- return lambda x: len(x.split())
+ return lambda x: float(len(x.split()))
diff --git a/pyproject.toml b/pyproject.toml
index 487b398..5b45f40 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -14,7 +14,7 @@ scikit-learn = ">=1.5.2"
fastparquet = ">=2024.11.0"
openai = {version = ">=1.0.0", optional = true}
requests = {version = ">=2.31.0", optional = true}
-vllm = {version = ">=0.10.1.1", optional = true}
+vllm = {version = ">=0.13.0", optional = true}
transformers = {version = ">=4.48.0", optional = true}
scipy = ">=1.15"
@@ -32,7 +32,7 @@ requests = ">=2.31.0"
[tool.poetry.group.vllm]
optional = true
[tool.poetry.group.vllm.dependencies]
-vllm = ">=0.10.1.1"
+vllm = ">=0.13.0"
[tool.poetry.group.transformers]
optional = true
@@ -52,7 +52,7 @@ pytest = ">=8.3.5"
pytest-cov = ">=6.1.1"
openai = ">=1.0.0"
requests = ">=2.31.0"
-vllm = "==0.10.1.1"
+vllm = ">=0.13.0"
transformers = ">=4.48.0"
[tool.poetry.group.docs.dependencies]
diff --git a/tests/conftest.py b/tests/conftest.py
index 2ba60f8..d4499c5 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -104,7 +104,7 @@ def mock_classification_task_with_subsampling(mock_df):
@pytest.fixture
def simple_reward_function():
- """A simple reward function for testing RewardTask."""
+ """Define a simple reward function for testing RewardTask."""
def reward_func(prediction: str) -> float:
if "great" in prediction.lower() or "perfect" in prediction.lower():
diff --git a/tests/helpers/test_helpers.py b/tests/helpers/test_helpers.py
index d39ec38..d90e1ea 100644
--- a/tests/helpers/test_helpers.py
+++ b/tests/helpers/test_helpers.py
@@ -8,7 +8,28 @@
from tests.mocks.mock_predictor import MockPredictor
from tests.mocks.mock_task import MockTask
-from promptolution.helpers import run_evaluation, run_experiment, run_optimization
+from promptolution.exemplar_selectors.random_search_selector import RandomSearchSelector
+from promptolution.exemplar_selectors.random_selector import RandomSelector
+from promptolution.helpers import (
+ get_exemplar_selector,
+ get_llm,
+ get_optimizer,
+ get_predictor,
+ get_task,
+ run_evaluation,
+ run_experiment,
+ run_optimization,
+)
+from promptolution.optimizers.capo import CAPO
+from promptolution.optimizers.evoprompt_de import EvoPromptDE
+from promptolution.optimizers.evoprompt_ga import EvoPromptGA
+from promptolution.optimizers.opro import OPRO
+from promptolution.predictors.first_occurrence_predictor import FirstOccurrencePredictor
+from promptolution.predictors.maker_based_predictor import MarkerBasedPredictor
+from promptolution.tasks.base_task import EvalResult
+from promptolution.tasks.classification_tasks import ClassificationTask
+from promptolution.tasks.judge_tasks import JudgeTask
+from promptolution.tasks.reward_tasks import RewardTask
from promptolution.utils import ExperimentConfig
from promptolution.utils.prompt import Prompt
@@ -200,7 +221,15 @@ def test_run_evaluation(mock_get_task, mock_get_predictor, mock_get_llm, sample_
prompts = [Prompt(p) for p in prompts]
# Now this will work because mock_task is a MagicMock
- mock_task.evaluate.return_value = np.array([0.8, 0.7, 0.9])
+ mock_task.evaluate.return_value = EvalResult(
+ scores=np.array([[0.9], [0.8], [0.7]], dtype=float),
+ agg_scores=np.array([0.9, 0.8, 0.7], dtype=float),
+ sequences=np.array([["s1"], ["s2"], ["s3"]], dtype=object),
+ input_tokens=np.array([[10.0], [10.0], [10.0]], dtype=float),
+ output_tokens=np.array([[5.0], [5.0], [5.0]], dtype=float),
+ agg_input_tokens=np.array([10.0, 10.0, 10.0], dtype=float),
+ agg_output_tokens=np.array([5.0, 5.0, 5.0], dtype=float),
+ )
# Run the function
result = run_evaluation(sample_df, experiment_config, prompts)
@@ -279,7 +308,17 @@ def test_helpers_integration(sample_df, experiment_config):
# Use a MagicMock instead of MockTask
mock_task = MagicMock()
mock_task.classes = ["positive", "neutral", "negative"]
- mock_task.evaluate = MagicMock(return_value=np.array([0.85, 0.75]))
+ mock_task.evaluate = MagicMock(
+ return_value=EvalResult(
+ scores=np.array([[0.9], [0.8]], dtype=float),
+ agg_scores=np.array([0.9, 0.8], dtype=float),
+ sequences=np.array([["s1"], ["s2"]], dtype=object),
+ input_tokens=np.array([[10.0], [10.0]], dtype=float),
+ output_tokens=np.array([[5.0], [5.0]], dtype=float),
+ agg_input_tokens=np.array([10.0, 10.0], dtype=float),
+ agg_output_tokens=np.array([5.0, 5.0], dtype=float),
+ )
+ )
mock_optimizer = MagicMock()
@@ -308,3 +347,103 @@ def test_helpers_integration(sample_df, experiment_config):
# Verify evaluation was called
mock_task.evaluate.assert_called()
+
+
+def test_get_llm_variants(monkeypatch):
+ def factory(model_name=None, config=None, **kwargs):
+ created["name"] = model_name or kwargs.get("model_id")
+ created["config"] = config
+ return MockLLM()
+
+ created = {}
+
+ monkeypatch.setattr("promptolution.helpers.LocalLLM", factory)
+ monkeypatch.setattr("promptolution.helpers.VLLM", factory)
+ monkeypatch.setattr("promptolution.helpers.APILLM", factory)
+
+ cfg = ExperimentConfig()
+ cfg.model_id = "local-foo"
+ res = get_llm(config=cfg)
+ assert isinstance(res, MockLLM)
+ assert created["name"] == "foo"
+
+ cfg.model_id = "vllm-bar"
+ res = get_llm(config=cfg)
+ assert created["name"] == "bar"
+
+ cfg.model_id = "api-model"
+ res = get_llm(config=cfg)
+ assert created["name"] == "api-model"
+
+ with pytest.raises(ValueError):
+ get_llm()
+
+
+def test_get_task_variants(sample_df):
+ cfg = ExperimentConfig()
+ cfg.task_type = "reward"
+ task = get_task(sample_df, cfg, reward_function=lambda _: 1.0)
+
+ assert isinstance(task, RewardTask)
+
+ cfg.task_type = "judge"
+ judge_task = get_task(sample_df, cfg, judge_llm=MockLLM())
+
+ assert isinstance(judge_task, JudgeTask)
+
+ cfg.task_type = "classification"
+ cls_task = get_task(sample_df, cfg)
+
+ assert isinstance(cls_task, ClassificationTask)
+
+
+def test_get_optimizer_variants():
+ pred = MockPredictor(llm=MockLLM())
+ task = MockTask()
+ cfg = ExperimentConfig()
+
+ opt = get_optimizer(pred, MockLLM(), task, optimizer="capo", config=cfg)
+
+ assert isinstance(opt, CAPO)
+
+ opt3 = get_optimizer(pred, MockLLM(), task, optimizer="evopromptde", config=cfg)
+
+ assert isinstance(opt3, EvoPromptDE)
+
+ opt4 = get_optimizer(pred, MockLLM(), task, optimizer="evopromptga", config=cfg)
+
+ assert isinstance(opt4, EvoPromptGA)
+
+ opt5 = get_optimizer(pred, MockLLM(), task, optimizer="opro", config=cfg)
+
+ assert isinstance(opt5, OPRO)
+
+ with pytest.raises(ValueError):
+ get_optimizer(pred, MockLLM(), task, optimizer="unknown", config=cfg)
+
+
+def test_get_exemplar_selector_variants():
+ task = MockTask()
+ pred = MockPredictor()
+
+ sel = get_exemplar_selector("random", task, pred)
+ assert isinstance(sel, RandomSelector)
+
+ sel2 = get_exemplar_selector("random_search", task, pred)
+ assert isinstance(sel2, RandomSearchSelector)
+
+ with pytest.raises(ValueError):
+ get_exemplar_selector("nope", task, pred)
+
+
+def test_get_predictor_variants():
+ llm = MockLLM()
+
+ p1 = get_predictor(llm, type="first_occurrence", classes=["a", "b"])
+ assert isinstance(p1, FirstOccurrencePredictor)
+
+ p2 = get_predictor(llm, type="marker")
+ assert isinstance(p2, MarkerBasedPredictor)
+
+ with pytest.raises(ValueError):
+ get_predictor(llm, type="bad")
diff --git a/tests/llms/test_api_llm.py b/tests/llms/test_api_llm.py
index 2d1cb5a..cda8c74 100644
--- a/tests/llms/test_api_llm.py
+++ b/tests/llms/test_api_llm.py
@@ -1,8 +1,30 @@
-from unittest.mock import MagicMock, patch
+import asyncio
+from concurrent.futures import TimeoutError as FuturesTimeout
+from types import SimpleNamespace
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
from promptolution.llms import APILLM
+class _FakeSem:
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, exc_type, exc, tb):
+ return False
+
+
+def _make_api_stub(**attrs):
+ """Create an APILLM instance via __new__ with provided attributes."""
+ api = APILLM.__new__(APILLM)
+ api._call_kwargs = {}
+ for key, value in attrs.items():
+ setattr(api, key, value)
+ return api
+
+
def test_api_llm_initialization():
"""Test that APILLM initializes correctly."""
# Create patches for all dependencies
@@ -34,3 +56,119 @@ def test_api_llm_initialization():
assert api_llm.api_url == "https://api.example.com"
assert api_llm.model_id == "gpt-4"
assert api_llm.max_concurrent_calls == 10
+
+
+def test_ainvoke_once_uses_client_and_timeout(monkeypatch):
+ response = SimpleNamespace(choices=[SimpleNamespace(message=SimpleNamespace(content="ok"))])
+ create = AsyncMock(return_value=response)
+ client = SimpleNamespace(chat=SimpleNamespace(completions=SimpleNamespace(create=create)))
+
+ api = _make_api_stub(model_id="m", max_tokens=11, call_timeout_s=0.5, _sem=_FakeSem(), client=client)
+
+ out = asyncio.run(api._ainvoke_once("prompt", "system"))
+
+ assert out is response
+ assert create.await_count == 1
+ kwargs = create.await_args.kwargs
+ assert kwargs["model"] == "m"
+ assert kwargs["messages"][0]["role"] == "system"
+ assert kwargs["max_tokens"] == 11
+
+
+def test_ainvoke_with_retries_recovers(monkeypatch):
+ good = SimpleNamespace(choices=[SimpleNamespace(message=SimpleNamespace(content="done"))])
+ api = _make_api_stub(max_retries=2, retry_base_delay_s=0)
+ api._ainvoke_once = AsyncMock(side_effect=[Exception("fail"), good])
+
+ async def _sleep(_):
+ return None
+
+ monkeypatch.setattr("promptolution.llms.api_llm.asyncio.sleep", _sleep)
+
+ out = asyncio.run(api._ainvoke_with_retries("p", "s"))
+
+ assert out == "done"
+ assert api._ainvoke_once.await_count == 2
+
+
+def test_ainvoke_with_retries_exhausts(monkeypatch):
+ api = _make_api_stub(max_retries=1, retry_base_delay_s=0)
+ api._ainvoke_once = AsyncMock(side_effect=[Exception("boom"), Exception("boom2")])
+
+ async def _sleep(_):
+ return None
+
+ monkeypatch.setattr("promptolution.llms.api_llm.asyncio.sleep", _sleep)
+
+ with pytest.raises(Exception) as excinfo:
+ asyncio.run(api._ainvoke_with_retries("p", "s"))
+
+ assert "boom2" in str(excinfo.value)
+ assert api._ainvoke_once.await_count == 2
+
+
+def test_aget_batch_success(monkeypatch):
+ api = _make_api_stub(gather_timeout_s=1)
+ api._ainvoke_with_retries = AsyncMock(side_effect=["a", "b"])
+ monkeypatch.setattr("promptolution.llms.api_llm.asyncio.wait_for", asyncio.wait_for)
+
+ outs = asyncio.run(api._aget_batch(["p1", "p2"], ["s1", "s2"]))
+
+ assert outs == ["a", "b"]
+ assert api._ainvoke_with_retries.await_count == 2
+
+
+def test_aget_batch_raises_on_failure(monkeypatch):
+ api = _make_api_stub(gather_timeout_s=1)
+ api._ainvoke_with_retries = AsyncMock(side_effect=["ok", Exception("boom")])
+ monkeypatch.setattr("promptolution.llms.api_llm.asyncio.wait_for", asyncio.wait_for)
+
+ with pytest.raises(RuntimeError):
+ asyncio.run(api._aget_batch(["p1", "p2"], ["s1", "s2"]))
+
+
+def test_get_response_success(monkeypatch):
+ api = _make_api_stub(gather_timeout_s=1)
+ api._aget_batch = AsyncMock()
+
+ class _Future:
+ def __init__(self, value):
+ self.value = value
+ self.cancelled = False
+
+ def result(self, timeout=None):
+ return self.value
+
+ def cancel(self):
+ self.cancelled = True
+
+ fut = _Future(["r1", "r2"])
+ api._submit = MagicMock(return_value=fut)
+
+ out = api._get_response(["p1", "p2"], ["s1", "s2"])
+
+ assert out == ["r1", "r2"]
+ api._submit.assert_called_once()
+ assert fut.cancelled is False
+
+
+def test_get_response_times_out():
+ api = _make_api_stub(gather_timeout_s=1)
+
+ class _Future:
+ def __init__(self):
+ self.cancelled = False
+
+ def result(self, timeout=None):
+ raise FuturesTimeout()
+
+ def cancel(self):
+ self.cancelled = True
+
+ fut = _Future()
+ api._submit = MagicMock(return_value=fut)
+
+ with pytest.raises(TimeoutError):
+ api._get_response(["p"], ["s"])
+
+ assert fut.cancelled is True
diff --git a/tests/llms/test_base_llm.py b/tests/llms/test_base_llm.py
new file mode 100644
index 0000000..0a3d774
--- /dev/null
+++ b/tests/llms/test_base_llm.py
@@ -0,0 +1,35 @@
+from tests.mocks.dummy_config import DummyConfig
+from tests.mocks.mock_llm import MockLLM
+
+
+def test_base_llm_token_count_and_reset():
+ llm = MockLLM()
+ llm.update_token_count(["a b"], ["c d e"])
+ counts = llm.get_token_count()
+ assert counts["input_tokens"] == 2
+ assert counts["output_tokens"] == 3
+
+ llm.reset_token_count()
+ assert llm.get_token_count()["total_tokens"] == 0
+
+
+def test_base_llm_default_and_list_system_prompts():
+ llm = MockLLM()
+ res_single = llm.get_response("hello")
+ assert res_single == ["Mock response for: hello"]
+
+ res_multi = llm.get_response(["p1", "p2"], system_prompts=["s1", "s2"])
+ assert res_multi == ["Mock response for: p1", "Mock response for: p2"]
+
+
+def test_base_llm_config_applied():
+ cfg = DummyConfig()
+ llm = MockLLM(predetermined_responses=["r1"], add_prompt_tags=False, config=cfg)
+ assert cfg.applied is True
+ assert getattr(llm, "applied") is True
+
+
+def test_base_llm_set_generation_seed():
+ llm = MockLLM()
+ llm.set_generation_seed(123)
+ assert llm._generation_seed == 123
diff --git a/tests/llms/test_vllm.py b/tests/llms/test_vllm.py
index 6eef031..84ae44f 100644
--- a/tests/llms/test_vllm.py
+++ b/tests/llms/test_vllm.py
@@ -42,6 +42,9 @@ def mock_generate_side_effect(prompts_list, *args, **kwargs):
# This is the most critical change.
mock_from_pretrained.return_value = mock_tokenizer_instance
+ # 4. Make sure llm_instance.get_tokenizer() returns the mock tokenizer
+ mock_llm_instance.get_tokenizer.return_value = mock_tokenizer_instance
+
# --- Sampling Params Mock Setup ---
mock_sampling_params_instance = MagicMock()
mock_sampling_params.return_value = mock_sampling_params_instance
@@ -87,13 +90,22 @@ def test_vllm_with_auto_batch_size(mock_vllm_dependencies):
mock_vllm_dependencies["llm_instance"].llm_engine.model_executor.cache_config.block_size = 16
# Create VLLM instance with batch_size=None to trigger auto calculation
- vllm_instance = VLLM(model_id="mock-model", batch_size=None, max_model_len=2048)
+ # With max_num_batched_tokens=16384 and max_model_len=2048:
+ # token_limited = 16384 // 2048 = 8
+ # batch_size = min(max_num_seqs=10, token_limited=8) = 8
+ vllm_instance = VLLM(
+ model_id="mock-model",
+ batch_size=None,
+ max_model_len=2048,
+ llm_kwargs={"max_num_seqs": 10, "max_num_batched_tokens": 16384},
+ )
# Verify batch_size is greater than zero
assert vllm_instance.batch_size > 0, "Batch size should be greater than zero"
- # With num_gpu_blocks=1000, block_size=16, max_model_len=2048
- # batch_size = int((1000 * 16 / 2048) * 0.95) = int(7.8125 * 0.95) = int(7.42) = 7
- assert vllm_instance.batch_size == 7, f"Expected batch_size=7, got {vllm_instance.batch_size}"
+ # With max_num_batched_tokens=16384, max_model_len=2048, max_num_seqs=10
+ # token_limited = 16384 // 2048 = 8
+ # batch_size = min(10, 8) = 8
+ assert vllm_instance.batch_size == 8, f"Expected batch_size=8, got {vllm_instance.batch_size}"
# Test with a single prompt
prompts = ["Test prompt"]
diff --git a/tests/mocks/dummy_config.py b/tests/mocks/dummy_config.py
new file mode 100644
index 0000000..cf0ac9e
--- /dev/null
+++ b/tests/mocks/dummy_config.py
@@ -0,0 +1,21 @@
+"""Lightweight config stub used across tests."""
+
+
+class DummyConfig:
+ """Minimal config object that tracks apply/validate calls."""
+
+ def __init__(self, task_description=None):
+ """Initialize the dummy config with an optional task description."""
+ self.applied = False
+ self.validated = False
+ self.task_description = task_description
+
+ def apply_to(self, obj):
+ """Mark the target object as having config applied."""
+ self.applied = True
+ obj.config_applied = True
+ obj.applied = True
+
+ def validate(self):
+ """Record that validation was executed."""
+ self.validated = True
diff --git a/tests/mocks/mock_task.py b/tests/mocks/mock_task.py
index 9aeb46c..7566a76 100644
--- a/tests/mocks/mock_task.py
+++ b/tests/mocks/mock_task.py
@@ -1,5 +1,6 @@
"""Mock task for testing purposes."""
+import math
from unittest.mock import MagicMock
import pandas as pd
@@ -16,35 +17,72 @@ class MockTask(BaseTask):
actual data or model inference.
"""
- def __init__(self, predetermined_scores=None):
- """Initialize the MockTask with optional predetermined scores.
+ def __init__(
+ self,
+ predetermined_scores=None,
+ *,
+ df: pd.DataFrame | None = None,
+ n_subsamples: int = 1,
+ eval_strategy: str = "full",
+ n_blocks: int | None = None,
+ block_idx: int | list[int] = 0,
+ eval_blocks: dict[str, set[int]] | None = None,
+ task_description: str = "Mock classification task",
+ evaluate_fn=None,
+ config=None,
+ ):
+ """Initialize the MockTask with optional overrides for task settings.
Args:
- predetermined_scores: Dictionary mapping prompts to scores,
- or a list of scores to return in sequence, or a function
- that generates scores based on prompts.
+ predetermined_scores: Dict/list/callable for score generation used by _evaluate.
+ config: Optional ExperimentConfig applied to the base class.
+ df: Optional dataframe override to seed the task.
+ n_subsamples: Number of subsamples to expose through BaseTask.
+ eval_strategy: Eval strategy to expose (defaults to "full").
+ n_blocks: Number of blocks to report.
+ block_idx: Current block index (int or list).
+ eval_blocks: Mapping prompt->set of evaluated blocks for selection logic.
+ task_description: Description to attach to the task.
+ evaluate_fn: Optional callable to replace evaluate entirely for tests.
"""
- super().__init__(
- df=pd.DataFrame(
+ base_df = (
+ df
+ if df is not None
+ else pd.DataFrame(
{"x": ["Sample text 1", "Sample text 2", "Sample text 3"], "y": ["positive", "negative", "neutral"]}
- ),
+ )
+ )
+
+ super().__init__(
+ df=base_df,
x_column="x",
y_column="y",
+ eval_strategy=eval_strategy,
+ n_subsamples=n_subsamples,
+ config=config,
)
self.predetermined_scores = predetermined_scores or {}
self.call_history = []
self.score_index = 0
+ self.eval_blocks: dict[str, set[int]] = eval_blocks or {}
- self.x_column = "x"
- self.y_column = "y"
- # Default attributes similar to ClassificationTask
- self.task_description = "Mock classification task"
+ self.task_description = task_description
self.classes = ["positive", "neutral", "negative"]
self.initial_prompts = ["Classify:", "Determine:"]
- self.n_blocks = 10
- self.increment_block_idx = MagicMock()
- self.reset_block_idx = MagicMock()
+ # Allow tests to control block metadata
+ self.n_blocks = n_blocks if n_blocks is not None else max(1, math.ceil(len(self.xs) / self.n_subsamples))
+ self.block_idx = block_idx
+
+ # Track block operations for assertions while keeping original behavior
+ self._reset_block_idx_impl = super().reset_block_idx
+ self.reset_block_idx = MagicMock(side_effect=self._reset_block_idx_impl)
+ self._increment_block_idx_impl = super().increment_block_idx
+ self.increment_block_idx = MagicMock(side_effect=self._increment_block_idx_impl)
+
+ if evaluate_fn is not None:
+ # Replace evaluate for bespoke test logic
+ self.evaluate = evaluate_fn # type: ignore[assignment]
def _evaluate(self, xs: List[str], ys: List[str], preds: List[str], **kwargs) -> List[float]:
"""Calculate the score for a single prediction.
@@ -60,9 +98,20 @@ def _evaluate(self, xs: List[str], ys: List[str], preds: List[str], **kwargs) ->
if isinstance(self.predetermined_scores, dict):
return [self.predetermined_scores.get(pred, 0) for pred in preds]
elif isinstance(self.predetermined_scores, list):
- self.score_index += 1
- return self.predetermined_scores
+ if not self.predetermined_scores:
+ return [0 for _ in preds]
+
+ scores = [
+ self.predetermined_scores[(self.score_index + i) % len(self.predetermined_scores)]
+ for i in range(len(preds))
+ ]
+ self.score_index += len(preds)
+ return scores
elif callable(self.predetermined_scores):
return self.predetermined_scores(xs)
else:
return [len(pred) for pred in preds]
+
+ def get_evaluated_blocks(self, prompts):
+ """Return per-prompt evaluated block sets for testing selection logic."""
+ return {str(p): set(self.prompt_evaluated_blocks.get(str(p), set())) for p in prompts}
diff --git a/tests/optimizers/test_base_optimizer.py b/tests/optimizers/test_base_optimizer.py
new file mode 100644
index 0000000..1b0c2f7
--- /dev/null
+++ b/tests/optimizers/test_base_optimizer.py
@@ -0,0 +1,86 @@
+import pytest
+
+from tests.mocks.dummy_config import DummyConfig
+from tests.mocks.mock_predictor import MockPredictor
+from tests.mocks.mock_task import MockTask
+
+from promptolution.optimizers.base_optimizer import BaseOptimizer
+from promptolution.utils.callbacks import BaseCallback
+
+
+class SimpleOptimizer(BaseOptimizer):
+ def __init__(self, predictor, task, **kwargs):
+ super().__init__(predictor=predictor, task=task, initial_prompts=["p1", "p2"], **kwargs)
+ self.prepared = False
+ self.steps = 0
+
+ def _pre_optimization_loop(self):
+ self.prepared = True
+
+ def _step(self):
+ self.steps += 1
+ return self.prompts
+
+
+class FailingOptimizer(SimpleOptimizer):
+ def _step(self):
+ raise RuntimeError("boom")
+
+ def _on_train_end(self):
+ self.cleaned = True
+ return None
+
+
+class Stopper(BaseCallback):
+ def on_step_end(self, optimizer):
+ # stop after first step to exercise callback stop path
+ return False
+
+ def on_train_end(self, optimizer):
+ optimizer.stopped = True
+ return True
+
+
+@pytest.fixture
+def predictor():
+ return MockPredictor()
+
+
+@pytest.fixture
+def task():
+ return MockTask()
+
+
+def test_base_optimizer_runs_and_calls_callbacks(predictor: MockPredictor, task: MockTask):
+ opt = SimpleOptimizer(predictor=predictor, task=task)
+ opt.callbacks = [Stopper()]
+ opt.optimize(3)
+
+ assert opt.prepared is True
+ assert opt.steps == 1
+ assert getattr(opt, "stopped", False) is True
+
+
+def test_base_optimizer_stops_on_exception(predictor: MockPredictor, task: MockTask):
+ opt = FailingOptimizer(predictor=predictor, task=task)
+ opt.optimize(2)
+
+ assert opt.prepared is True
+ assert getattr(opt, "cleaned", False) is True
+
+
+def test_base_optimizer_no_callbacks_continues(predictor: MockPredictor, task: MockTask):
+ opt = SimpleOptimizer(predictor=predictor, task=task)
+ opt.optimize(2)
+ assert opt.steps == 2
+
+
+def test_base_optimizer_config_validate_and_template(predictor: MockPredictor, task: MockTask):
+ cfg = DummyConfig(task_description="override desc")
+ opt = SimpleOptimizer(predictor=predictor, task=task, config=cfg)
+ opt.optimize(1)
+ assert cfg.validated is True
+
+ templ = opt._initialize_meta_template("Hi ")
+ assert "override desc" in templ
+ assert getattr(opt, "config_applied", True)
diff --git a/tests/optimizers/test_capo.py b/tests/optimizers/test_capo.py
index 305f290..63ce88c 100644
--- a/tests/optimizers/test_capo.py
+++ b/tests/optimizers/test_capo.py
@@ -2,9 +2,8 @@
import pandas as pd
-from tests.mocks.mock_task import MockTask
-
from promptolution.optimizers.capo import CAPO
+from promptolution.utils.capo_utils import build_few_shot_examples, perform_crossover, perform_mutation
from promptolution.utils.prompt import Prompt
from promptolution.utils.templates import CAPO_CROSSOVER_TEMPLATE, CAPO_MUTATION_TEMPLATE
@@ -28,7 +27,7 @@ def test_capo_initialization(mock_meta_llm, mock_predictor, initial_prompts, moc
def test_capo_initialize_population(mock_meta_llm, mock_predictor, initial_prompts, mock_task, mock_df):
- """Test the _initialize_population method."""
+ """Test initializing the population using pre-optimization loop."""
optimizer = CAPO(
predictor=mock_predictor,
task=mock_task,
@@ -37,15 +36,9 @@ def test_capo_initialize_population(mock_meta_llm, mock_predictor, initial_promp
df_few_shots=mock_df,
)
- # Mock the _create_few_shot_examples method to simplify
- def mock_create_few_shot_examples(instruction, num_examples):
- return [f"Example {i}" for i in range(num_examples)]
-
- optimizer._create_few_shot_examples = mock_create_few_shot_examples
-
- # Control randomness
with patch("random.randint", return_value=2):
- population = optimizer._initialize_population([Prompt(p) for p in initial_prompts])
+ optimizer._pre_optimization_loop()
+ population = optimizer.prompts
# Verify population was created
assert len(population) == len(initial_prompts)
@@ -69,17 +62,16 @@ def test_capo_step(mock_meta_llm, mock_predictor, initial_prompts, mock_task, mo
# Mock the internal methods to avoid complexity
mock_offspring = [Prompt("Offspring", ["Example"])]
- optimizer._crossover = lambda x: mock_offspring
-
mock_mutated = [Prompt("Mutated", ["Example"])]
- optimizer._mutate = lambda x: mock_mutated
-
- mock_survivors = [Prompt("Survivor 1", ["Example"]), Prompt("Survivor 2", ["Example"])]
- mock_scores = [0.9, 0.8]
- optimizer._do_racing = lambda x, k: (mock_survivors, mock_scores)
+ with patch("promptolution.optimizers.capo.perform_crossover", return_value=mock_offspring), patch(
+ "promptolution.optimizers.capo.perform_mutation", return_value=mock_mutated
+ ):
+ mock_survivors = [Prompt("Survivor 1", ["Example"]), Prompt("Survivor 2", ["Example"])]
+ mock_scores = [0.9, 0.8]
+ optimizer._do_racing = lambda x, k: (mock_survivors, mock_scores)
- # Call _step
- result = optimizer._step()
+ # Call _step
+ result = optimizer._step()
# Verify results
assert len(result) == 2 # Should match population_size
@@ -117,7 +109,7 @@ def mock_step():
def test_create_few_shots(mock_meta_llm, mock_predictor, initial_prompts, mock_task, mock_df):
- """Test the _create_few_shot_examples method."""
+ """Test the few-shot example builder."""
optimizer = CAPO(
predictor=mock_predictor,
task=mock_task,
@@ -126,14 +118,21 @@ def test_create_few_shots(mock_meta_llm, mock_predictor, initial_prompts, mock_t
df_few_shots=mock_df,
)
- # Call the method
- few_shot_examples = optimizer._create_few_shot_examples("Classify the sentiment of the text.", 2)
+ few_shot_examples = build_few_shot_examples(
+ instruction="Classify the sentiment of the text.",
+ num_examples=2,
+ optimizer=optimizer,
+ )
# Verify results
assert len(few_shot_examples) == 2
assert all(isinstance(example, str) for example in few_shot_examples)
- few_shot_examples = optimizer._create_few_shot_examples("Classify the sentiment of the text.", 0)
+ few_shot_examples = build_few_shot_examples(
+ instruction="Classify the sentiment of the text.",
+ num_examples=0,
+ optimizer=optimizer,
+ )
assert len(few_shot_examples) == 0
@@ -148,7 +147,10 @@ def test_crossover(mock_meta_llm, mock_predictor, initial_prompts, mock_task, mo
crossovers_per_iter=5,
)
- offsprings = optimizer._crossover([Prompt("Instruction 1", ["Example 1"]), Prompt("Instruction 2", ["Example 2"])])
+ offsprings = perform_crossover(
+ [Prompt("Instruction 1", ["Example 1"]), Prompt("Instruction 2", ["Example 2"])],
+ optimizer=optimizer,
+ )
assert len(offsprings) == 5
@@ -161,30 +163,11 @@ def test_mutate(mock_meta_llm, mock_predictor, initial_prompts, mock_task, mock_
df_few_shots=mock_df,
)
- mutated = optimizer._mutate([Prompt("Instruction 1", ["Example 1"]), Prompt("Instruction 2", ["Example 2"])])
- assert len(mutated) == 2
-
-
-def test_do_racing(mock_meta_llm, mock_predictor, initial_prompts, mock_df):
- mock_task = MockTask(predetermined_scores=[0.89, 0.9] * 3)
- optimizer = CAPO(
- predictor=mock_predictor,
- task=mock_task,
- meta_llm=mock_meta_llm,
- initial_prompts=initial_prompts,
- df_few_shots=pd.concat([mock_df] * 5, ignore_index=True),
- )
- optimizer._pre_optimization_loop()
- survivors, scores = optimizer._do_racing(
- [Prompt("good instruction", ["Example 1"]), Prompt("better instruction", ["Example 2"])], 1
+ mutated = perform_mutation(
+ offsprings=[Prompt("Instruction 1", ["Example 1"]), Prompt("Instruction 2", ["Example 2"])],
+ optimizer=optimizer,
)
- assert len(survivors) == 1
- assert len(scores) == 1
-
- assert "better instruction" in survivors[0].instruction
-
- assert mock_task.reset_block_idx.call_count == 2
- assert mock_task.increment_block_idx.call_count == 3
+ assert len(mutated) == 2
def test_capo_crossover_prompt(mock_meta_llm, mock_predictor, initial_prompts, mock_task, mock_df):
@@ -199,7 +182,7 @@ def test_capo_crossover_prompt(mock_meta_llm, mock_predictor, initial_prompts, m
mother = Prompt("Classify the sentiment of the text.", ["Input: I love this! Output: Positive"])
father = Prompt("Determine if the review is positive or negative.", ["Input: This is terrible. Output: Negative"])
- optimizer._crossover([mother, father])
+ perform_crossover([mother, father], optimizer=optimizer)
full_task_desc = mock_task.task_description + "\n" + optimizer.predictor.extraction_description
@@ -208,8 +191,13 @@ def test_capo_crossover_prompt(mock_meta_llm, mock_predictor, initial_prompts, m
.replace("", father.instruction)
.replace("", full_task_desc)
)
+ alt_meta_prompt = (
+ CAPO_CROSSOVER_TEMPLATE.replace("", father.instruction)
+ .replace("", mother.instruction)
+ .replace("", full_task_desc)
+ )
- assert str(mock_meta_llm.call_history[0]["prompts"][0]) == expected_meta_prompt
+ assert str(mock_meta_llm.call_history[0]["prompts"][0]) in {expected_meta_prompt, alt_meta_prompt}
def test_capo_mutate_prompt(mock_meta_llm, mock_predictor, initial_prompts, mock_task, mock_df):
@@ -224,7 +212,10 @@ def test_capo_mutate_prompt(mock_meta_llm, mock_predictor, initial_prompts, mock
full_task_desc = mock_task.task_description + "\n" + optimizer.predictor.extraction_description
parent = Prompt("Classify the sentiment of the text.", ["Input: I love this! Output: Positive"])
- optimizer._mutate([parent])
+ perform_mutation(
+ offsprings=[parent],
+ optimizer=optimizer,
+ )
expected_meta_prompt = CAPO_MUTATION_TEMPLATE.replace("", parent.instruction).replace(
"", full_task_desc
diff --git a/tests/predictors/test_base_predictor.py b/tests/predictors/test_base_predictor.py
index 4bfeacd..1d65718 100644
--- a/tests/predictors/test_base_predictor.py
+++ b/tests/predictors/test_base_predictor.py
@@ -1,5 +1,9 @@
import numpy as np
+from tests.mocks.dummy_config import DummyConfig
+from tests.mocks.mock_llm import MockLLM
+from tests.mocks.mock_predictor import MockPredictor
+
def test_predictor_predict_flow(mock_predictor):
"""Test the basic prediction flow from prompt to final prediction."""
@@ -8,7 +12,7 @@ def test_predictor_predict_flow(mock_predictor):
prompts = ["Classify this text:"]
# Call predict
- predictions = mock_predictor.predict(prompts, xs)
+ predictions, _ = mock_predictor.predict(prompts, xs)
# Verify shape and content of predictions
assert predictions.shape == (1,)
assert predictions[0] == "neutral"
@@ -27,7 +31,7 @@ def test_predictor_with_return_seq(mock_predictor):
xs = np.array(["This product is okay."])
# Call predict with return_seq=True
- predictions, sequences = mock_predictor.predict(prompts, xs, return_seq=True)
+ predictions, sequences = mock_predictor.predict(prompts, xs)
# Verify predictions
assert predictions.shape == (1,)
@@ -37,3 +41,23 @@ def test_predictor_with_return_seq(mock_predictor):
assert len(sequences) == 1
assert isinstance(sequences, list)
assert "This product is okay." in sequences[0]
+
+
+def test_predictor_accepts_string_prompt(mock_predictor):
+ preds, seqs = mock_predictor.predict("solo", ["input"], system_prompts="sys")
+ assert preds.shape[0] == 1
+ assert seqs[0].startswith("input\n")
+
+
+def test_predictor_system_prompt_string_converted(mock_predictor):
+ preds, seqs = mock_predictor.predict(["p1", "p2"], ["x1", "x2"], system_prompts="sys")
+ assert len(preds) == 2
+ # call_history should show system_prompts broadcasted
+ assert mock_predictor.llm.call_history[-1]["system_prompts"] == ["sys", "sys"]
+
+
+def test_predictor_applies_config():
+ cfg = DummyConfig()
+ predictor = MockPredictor(llm=MockLLM(), config=cfg)
+ assert cfg.applied is True
+ assert getattr(predictor, "config_applied") is True
diff --git a/tests/predictors/test_predictors.py b/tests/predictors/test_predictors.py
index 2f7e11f..9fa5658 100644
--- a/tests/predictors/test_predictors.py
+++ b/tests/predictors/test_predictors.py
@@ -14,7 +14,7 @@ def test_first_occurrence_classifier(mock_downstream_llm, mock_df):
prompts = ["Classify:"] * len(xs)
# Make predictions
- predictions = classifier.predict(prompts, xs)
+ predictions, _ = classifier.predict(prompts, xs)
# Verify shape and content
assert len(predictions) == 4
@@ -39,7 +39,7 @@ def test_marker_based_classifier(mock_downstream_llm, mock_df):
prompts = ["Classify:"] * len(xs)
# Make predictions
- predictions = classifier.predict(prompts, xs)
+ predictions, _ = classifier.predict(prompts, xs)
# Verify shape and content
assert len(predictions) == 3
@@ -49,7 +49,7 @@ def test_marker_based_classifier(mock_downstream_llm, mock_df):
# Test with invalid class label
invalid_input = np.array(["Broken item"] * len(prompts))
- invalid_predictions = classifier.predict(prompts, invalid_input)
+ invalid_predictions, _ = classifier.predict(prompts, invalid_input)
# Should default to first class if invalid
assert invalid_predictions[0] == "positive"
@@ -70,7 +70,7 @@ def test_marker_based_without_classes(mock_downstream_llm):
prompts = ["Classify:"] * len(xs)
# Make predictions
- predictions = predictor.predict(prompts, xs)
+ predictions, _ = predictor.predict(prompts, xs)
# Verify shape and content - should accept any value between markers
assert len(predictions) == 4
@@ -90,7 +90,7 @@ def test_multiple_prompts_with_classifiers(mock_downstream_llm, mock_df):
xs = np.array(["I love this product!", "I hate this product!"] * 2)
# Make predictions
- predictions = classifier.predict(prompts, xs)
+ predictions, _ = classifier.predict(prompts, xs)
# Verify shape and content
assert len(predictions) == 4
@@ -110,7 +110,7 @@ def test_sequence_return_with_classifiers(mock_downstream_llm, mock_df):
xs = np.array(["I love this product!"])
# Make predictions with sequences
- predictions, sequences = classifier.predict(prompts, xs, return_seq=True)
+ predictions, sequences = classifier.predict(prompts, xs)
# Verify predictions
assert len(predictions) == 1
@@ -140,7 +140,7 @@ def test_marker_based_missing_markers(mock_downstream_llm):
# When markers are missing, it should default to first class
prompts = ["Classify:"]
- xs = np.array(["Missing markers"])
- predictions = classifier.predict(prompts, xs)
+ xs = ["Missing markers"]
+ preds, seqs = classifier.predict(prompts, xs)
- assert predictions[0] == "will" # Should default to first class
+ assert preds[0] == "will" # Should default to first class
diff --git a/tests/tasks/test_base_task.py b/tests/tasks/test_base_task.py
new file mode 100644
index 0000000..4ed7529
--- /dev/null
+++ b/tests/tasks/test_base_task.py
@@ -0,0 +1,142 @@
+import pandas as pd
+import pytest
+
+from tests.mocks.dummy_config import DummyConfig
+from tests.mocks.mock_llm import MockLLM
+from tests.mocks.mock_predictor import MockPredictor
+from tests.mocks.mock_task import MockTask
+
+from promptolution.utils.prompt import Prompt
+
+
+@pytest.fixture
+def predictor():
+ return MockPredictor(llm=MockLLM())
+
+
+@pytest.fixture
+def small_task():
+ df = pd.DataFrame({"x": ["a", "b", "c"], "y": ["1", "0", "1"]})
+ return MockTask(df=df, eval_strategy="sequential_block", n_subsamples=1)
+
+
+@pytest.fixture
+def cost_task():
+ df = pd.DataFrame({"x": ["m", "n", "o"], "y": ["1", "0", "1"]})
+ return MockTask(df=df, eval_strategy="full", n_subsamples=3)
+
+
+def test_subsample_and_block_controls(small_task):
+ task = small_task
+
+ xs, ys = task.subsample()
+ assert len(xs) == 1
+
+ task.increment_block_idx()
+ assert task.block_idx == 1 % task.n_blocks if task.n_blocks else 0
+
+ xs2, _ = task.subsample(block_idx=[0, 1, 2])
+ assert set(xs2) == set(task.xs)
+
+ popped = task.pop_datapoints(n=1)
+ assert len(popped) == 1
+ assert len(task.xs) == 2
+
+ task.reset_block_idx()
+ assert task.block_idx == 0
+
+ task.eval_strategy = "full"
+ with pytest.raises(ValueError):
+ task.increment_block_idx()
+ with pytest.raises(ValueError):
+ task.reset_block_idx()
+
+
+def test_prepare_batch_and_evaluated_strategy(small_task):
+ task = small_task
+ prompts = [Prompt("p1"), Prompt("p2")]
+ xs, ys = task.subsample()
+
+ to_eval = task._prepare_batch(prompts, xs, ys, eval_strategy="evaluated")
+ assert to_eval == ([], [], [], [])
+
+ normal = task._prepare_batch(prompts, xs, ys)
+ assert len(normal[0]) == len(prompts) * len(xs)
+
+
+def test_pop_datapoints_clears_cache_and_frac(small_task):
+ task = small_task
+ p = Prompt("p")
+ key = (str(p), task.xs[0], task.ys[0])
+ task.eval_cache[key] = 0.5
+ task.seq_cache[key] = "seq"
+
+ popped = task.pop_datapoints(frac=0.5)
+ assert len(popped) > 0
+ assert not task.eval_cache
+ assert not task.seq_cache
+
+
+def test_unknown_strategy_raises(small_task):
+ task = small_task
+ task.eval_strategy = "unknown"
+ with pytest.raises(ValueError):
+ task.subsample()
+
+
+def test_set_block_idx_validation(small_task):
+ task = small_task
+ with pytest.raises(AssertionError):
+ task.set_block_idx("bad") # type: ignore
+
+
+def test_pop_datapoints_requires_arg(small_task):
+ task = small_task
+ with pytest.raises(AssertionError):
+ task.pop_datapoints(n=1, frac=0.1)
+
+
+def test_get_evaluated_blocks_mapping(small_task):
+ task = small_task
+ prompt = Prompt("p")
+ task.prompt_evaluated_blocks[str(prompt)] = {0, 1}
+ mapping = task.get_evaluated_blocks([prompt])
+ assert mapping[str(prompt)] == {0, 1}
+
+
+def test_compute_costs_shapes(predictor, cost_task):
+ task = cost_task
+ prompts = [Prompt("inst"), Prompt("inst2")]
+ result = task.evaluate(prompts, predictor)
+
+ assert result.input_tokens.shape[0] == len(prompts)
+ assert result.output_tokens.shape[0] == len(prompts)
+
+
+def test_evaluate_with_block_list_updates_blocks(predictor, small_task):
+ task = small_task
+ task.block_idx = [0, 1]
+ prompts = [Prompt("p1"), Prompt("p2")]
+ task.evaluate(prompts, predictor)
+ for p in prompts:
+ assert task.prompt_evaluated_blocks[p] == [0, 1]
+
+
+def test_task_config_applied():
+ cfg = DummyConfig()
+ df = pd.DataFrame({"x": ["a", "b", "c"], "y": ["1", "0", "1"]})
+ task = MockTask(df=df, eval_strategy="sequential_block", n_subsamples=1, config=cfg)
+ assert cfg.applied is True
+ assert hasattr(task, "config_applied")
+
+
+def test_block_wraparound_and_get_cache_keys():
+ df = pd.DataFrame({"x": ["a", "b"], "y": ["1", "0"]})
+ task = MockTask(df=df, eval_strategy="sequential_block", n_subsamples=1)
+ task.block_idx = task.n_blocks - 1
+ task.increment_block_idx()
+ assert task.block_idx == 0
+
+ prompt = Prompt("hi")
+ key = task._cache_key(prompt, "x", "y")
+ assert key[0].startswith(prompt.instruction)
diff --git a/tests/tasks/test_classifications_tasks.py b/tests/tasks/test_classifications_tasks.py
index 256a63d..642d7c9 100644
--- a/tests/tasks/test_classifications_tasks.py
+++ b/tests/tasks/test_classifications_tasks.py
@@ -21,44 +21,48 @@ def test_classification_task_initialization(mock_df):
def test_task_evaluate(mock_classification_task_with_subsampling, mock_predictor):
"""Test the evaluate method of ClassificationTask."""
prompts = [Prompt("Classify sentiment:")]
- scores = mock_classification_task_with_subsampling.evaluate(prompts, mock_predictor)
+ result = mock_classification_task_with_subsampling.evaluate(prompts, mock_predictor)
+ scores = result.agg_scores
- assert isinstance(scores, list)
- assert len(scores) == 1
+ assert scores.shape == (1,)
assert 0 <= scores[0] <= 1
prompts = ["Classify sentiment:", "Rate the text:"]
prompts = [Prompt(p) for p in prompts]
- scores = mock_classification_task_with_subsampling.evaluate(prompts, mock_predictor)
+ result = mock_classification_task_with_subsampling.evaluate(prompts, mock_predictor)
+ scores = result.agg_scores
- assert len(scores) == 2
- assert all(0 <= score <= 1 for score in scores)
+ assert scores.shape == (2,)
+ assert np.all((scores >= 0) & (scores <= 1))
def test_task_evaluate_with_subsampling(mock_classification_task_with_subsampling, mock_predictor):
"""Test the evaluate method with subsampling."""
prompts = [Prompt("Classify sentiment:")]
- scores = mock_classification_task_with_subsampling.evaluate(
+ scores_result = mock_classification_task_with_subsampling.evaluate(
prompts,
mock_predictor,
)
+ scores = scores_result.agg_scores
- assert len(scores) == 1
+ assert scores.shape == (1,)
with pytest.raises(AssertionError, match=r".*Arrays are not equal.*"):
np.random.seed(42)
- scores1 = mock_classification_task_with_subsampling.evaluate(
+ scores1_result = mock_classification_task_with_subsampling.evaluate(
prompts,
mock_predictor,
)
+ scores1 = scores1_result.scores
np.random.seed(43)
- scores2 = mock_classification_task_with_subsampling.evaluate(
+ scores2_result = mock_classification_task_with_subsampling.evaluate(
prompts,
mock_predictor,
)
+ scores2 = scores2_result.scores
np.testing.assert_array_equal(scores1, scores2)
@@ -67,14 +71,13 @@ def test_task_evaluate_with_return_seq(mock_classification_task_with_subsampling
"""Test the evaluate method with return_seq=True."""
prompts = [Prompt("Classify sentiment:")]
- scores, seqs = mock_classification_task_with_subsampling.evaluate(
- prompts, mock_predictor, return_seq=True, return_agg_scores=False
- )
+ seq_result = mock_classification_task_with_subsampling.evaluate(prompts, mock_predictor)
- assert len(scores) == 1
- assert len(scores[0]) == mock_classification_task_with_subsampling.n_subsamples
- assert len(seqs) == 1
- assert len(seqs[0]) == mock_classification_task_with_subsampling.n_subsamples
+ assert seq_result.scores.shape == (1, mock_classification_task_with_subsampling.n_subsamples)
+ assert seq_result.sequences is not None
+ assert len(seq_result.sequences) == 1
+ assert len(seq_result.sequences[0]) == mock_classification_task_with_subsampling.n_subsamples
+ assert seq_result.agg_input_tokens is not None
def test_task_evaluate_with_system_prompts(
@@ -85,11 +88,9 @@ def test_task_evaluate_with_system_prompts(
prompts = [Prompt("Classify sentiment:")]
system_prompts = ["Be concise"]
- scores = mock_classification_task_with_subsampling.evaluate(
- prompts, mock_predictor, system_prompts=system_prompts, return_agg_scores=True
- )
+ result = mock_classification_task_with_subsampling.evaluate(prompts, mock_predictor, system_prompts=system_prompts)
- assert len(scores) == 1
+ assert result.agg_scores.shape == (1,)
assert any(call["system_prompts"] == system_prompts for call in mock_downstream_llm.call_history)
@@ -97,7 +98,7 @@ def test_pop_datapoints(mock_df):
task = ClassificationTask(
df=mock_df,
task_description="Sentiment classification task",
- eval_strategy="sequential_blocks",
+ eval_strategy="sequential_block",
)
df = task.pop_datapoints(n=1)
@@ -108,7 +109,7 @@ def test_pop_datapoints(mock_df):
def test_blocks(mock_df):
task = ClassificationTask(
- df=mock_df, task_description="Sentiment classification task", eval_strategy="sequential_blocks", n_subsamples=1
+ df=mock_df, task_description="Sentiment classification task", eval_strategy="sequential_block", n_subsamples=1
)
task.increment_block_idx()
diff --git a/tests/tasks/test_judge_task.py b/tests/tasks/test_judge_task.py
index 3cf0066..15a6032 100644
--- a/tests/tasks/test_judge_task.py
+++ b/tests/tasks/test_judge_task.py
@@ -57,17 +57,18 @@ def test_judge_task_evaluate_with_ground_truth(mock_judge_task_with_y, mock_pred
mock_predictor.call_history = []
mock_judge_llm.call_history = []
- scores_per_datapoint = mock_judge_task_with_y.evaluate(prompts, mock_predictor, return_agg_scores=False)
+ result = mock_judge_task_with_y.evaluate(prompts, mock_predictor)
+ scores_per_datapoint = result.scores
- assert len(scores_per_datapoint) == len(prompts)
+ assert scores_per_datapoint.shape[0] == len(prompts)
expected_scores = [1.0, 0, 0.5]
np.testing.assert_allclose(scores_per_datapoint[0], expected_scores)
mock_predictor.call_history = []
mock_judge_llm.call_history = []
- aggregated_scores = mock_judge_task_with_y.evaluate(prompts, mock_predictor, return_agg_scores=True)
- assert len(aggregated_scores) == len(prompts)
+ aggregated_scores = result.agg_scores
+ assert aggregated_scores.shape[0] == len(prompts)
expected_scores = [0.5, 0.4333333, 0.0]
np.testing.assert_allclose(aggregated_scores, expected_scores)
@@ -80,9 +81,10 @@ def test_judge_task_evaluate_no_ground_truth(mock_judge_task_no_y, mock_predicto
mock_predictor.call_history = []
mock_judge_llm.call_history = []
- aggregated_scores = mock_judge_task_no_y.evaluate(prompts, mock_predictor, return_agg_scores=True)
+ aggregated_result = mock_judge_task_no_y.evaluate(prompts, mock_predictor)
+ aggregated_scores = aggregated_result.agg_scores
- assert len(aggregated_scores) == len(prompts)
+ assert aggregated_scores.shape[0] == len(prompts)
expected_scores = [0.5, 0.55, 0.35]
np.testing.assert_allclose(aggregated_scores, expected_scores)
@@ -92,9 +94,9 @@ def test_judge_task_evaluate_with_return_seq(mock_judge_task_with_y, mock_predic
prompts = ["Evaluate this text:", "What is the sentiment?", "How would you classify this?"]
prompts = [Prompt(p) for p in prompts]
- scores, seqs = mock_judge_task_with_y.evaluate(prompts, mock_predictor, return_seq=True, return_agg_scores=False)
+ seq_result = mock_judge_task_with_y.evaluate(prompts, mock_predictor)
- assert len(scores) == 3
- assert len(scores[0]) == len(mock_judge_task_with_y.xs)
- assert len(seqs) == 3
- assert len(seqs[0]) == len(mock_judge_task_with_y.xs)
+ assert seq_result.scores.shape == (3, len(mock_judge_task_with_y.xs))
+ assert seq_result.sequences is not None
+ assert seq_result.sequences.shape == (3, len(mock_judge_task_with_y.xs))
+ assert seq_result.agg_input_tokens is not None
diff --git a/tests/tasks/test_multi_objective_task.py b/tests/tasks/test_multi_objective_task.py
new file mode 100644
index 0000000..cfb4caa
--- /dev/null
+++ b/tests/tasks/test_multi_objective_task.py
@@ -0,0 +1,127 @@
+import numpy as np
+import pandas as pd
+import pytest
+
+from tests.mocks.mock_llm import MockLLM
+from tests.mocks.mock_predictor import MockPredictor
+from tests.mocks.mock_task import MockTask
+
+from promptolution.optimizers.base_optimizer import BaseOptimizer
+from promptolution.tasks.base_task import BaseTask, EvalResult
+from promptolution.tasks.multi_objective_task import MultiObjectiveTask
+from promptolution.utils.prompt import Prompt
+
+
+def test_multi_objective_single_prediction_flow():
+ task1 = MockTask()
+ task2 = MockTask()
+ predictor = MockPredictor(llm=MockLLM())
+
+ prompt = Prompt("classify")
+ result = MultiObjectiveTask([task1, task2]).evaluate([prompt], predictor=predictor)
+
+ assert len(result.agg_scores) == 2
+ assert result.agg_scores[0].shape == (1,)
+ assert result.sequences.shape[0] == 1
+ assert MultiObjectiveTask([task1, task2]).tasks[0].n_subsamples == task1.n_subsamples
+
+
+def test_multi_objective_shares_block_and_caches():
+ df = pd.DataFrame({"x": ["u", "v"], "y": ["1", "0"]})
+ t1 = MockTask(df=df, eval_strategy="sequential_block", n_subsamples=1, n_blocks=len(df), block_idx=0)
+ t2 = MockTask(df=df, eval_strategy="sequential_block", n_subsamples=1, n_blocks=len(df), block_idx=0)
+
+ predictor = MockPredictor(llm=MockLLM())
+ prompt = Prompt("judge")
+
+ multi = MultiObjectiveTask([t1, t2])
+ multi.block_idx = 1
+ res = multi.evaluate(prompt, predictor=predictor)
+
+ assert len(t1.eval_cache) == len(t2.eval_cache)
+ assert res.input_tokens.shape[0] == 1
+ assert multi.prompt_evaluated_blocks[prompt] == [1]
+
+
+def test_multi_objective_requires_tasks():
+ with pytest.raises(ValueError):
+ MultiObjectiveTask([])
+
+
+def test_multi_objective_matches_individual_results():
+ df = pd.DataFrame({"x": ["u", "v"], "y": ["1", "0"]})
+
+ def make_task():
+ return MockTask(df=df, eval_strategy="sequential_block", n_subsamples=1, n_blocks=len(df), block_idx=0)
+
+ t1 = make_task()
+ t2 = make_task()
+ predictor = MockPredictor(llm=MockLLM())
+
+ prompt = Prompt("judge")
+ multi = MultiObjectiveTask([t1, t2])
+ multi.block_idx = 1
+ multi_res = multi.evaluate([prompt], predictor=predictor)
+
+ # Fresh tasks/predictor to mirror a single-task call
+ s1 = make_task()
+ s2 = make_task()
+ single_pred = MockPredictor(llm=MockLLM())
+ res1 = s1.evaluate([prompt], predictor=single_pred)
+ res2 = s2.evaluate([prompt], predictor=single_pred)
+
+ assert np.allclose(multi_res.agg_scores[0], res1.agg_scores)
+ assert np.allclose(multi_res.agg_scores[1], res2.agg_scores)
+ assert multi_res.sequences.shape == res1.sequences.shape
+ assert multi.prompt_evaluated_blocks[prompt] == [1]
+
+
+class ConstantTask(BaseTask):
+ """Simple task that returns a constant score for all predictions."""
+
+ def __init__(self, df: pd.DataFrame, value: float) -> None:
+ self._value = value
+ super().__init__(
+ df=df,
+ x_column="x",
+ y_column=None,
+ n_subsamples=len(df),
+ eval_strategy="full",
+ seed=0,
+ task_description="constant",
+ config=None,
+ )
+
+ def _evaluate(self, xs, ys, preds):
+ return np.full(len(preds), self._value, dtype=float)
+
+
+class DummyOptimizer(BaseOptimizer):
+ """Non-multi-objective optimizer used to trigger fallback logic."""
+
+ def _pre_optimization_loop(self) -> None:
+ pass
+
+ def _step(self):
+ return self.prompts
+
+
+def test_multi_objective_fallback_warns_and_averages(caplog):
+ df = pd.DataFrame({"x": ["a", "b"]})
+ t1 = ConstantTask(df.copy(), value=1.0)
+ t2 = ConstantTask(df.copy(), value=3.0)
+ mo_task = MultiObjectiveTask([t1, t2])
+
+ predictor = MockPredictor(llm=MockLLM(predetermined_responses=["p1", "p2"]))
+
+ with caplog.at_level("WARNING"):
+ DummyOptimizer(predictor=predictor, task=mo_task)
+
+ assert mo_task._scalarized_objective is True
+ assert any("averaged equally" in message for message in caplog.messages)
+
+ result = mo_task.evaluate(prompts=[Prompt("p")], predictor=predictor)
+
+ assert isinstance(result, EvalResult)
+ assert np.allclose(result.scores, np.array([[2.0, 2.0]]))
+ assert np.allclose(result.agg_scores, np.array([2.0]))
diff --git a/tests/tasks/test_reward_tasks.py b/tests/tasks/test_reward_tasks.py
index 76e3545..eb37ab7 100644
--- a/tests/tasks/test_reward_tasks.py
+++ b/tests/tasks/test_reward_tasks.py
@@ -1,3 +1,6 @@
+import pandas as pd
+
+from promptolution.tasks.reward_tasks import RewardTask
from promptolution.utils.prompt import Prompt
@@ -24,7 +27,29 @@ def test_reward_task_evaluate_with_return_seq(mock_reward_task, mock_predictor):
"""Test the evaluate method with return_seq=True for RewardTask."""
prompts = [Prompt("Generate a short text:")]
- scores, seqs = mock_reward_task.evaluate(prompts, mock_predictor, return_seq=True, return_agg_scores=False)
+ result = mock_reward_task.evaluate(prompts, mock_predictor)
+
+ assert result.scores.shape[0] == 1
+ assert result.sequences is not None
+ assert result.sequences.shape[0] == 1
+ assert result.agg_input_tokens is not None
+
+
+def test_reward_task_passes_reward_columns():
+ """Ensure reward kwargs come from dataframe columns."""
+ df = pd.DataFrame({"x": ["a", "b", "c"], "reward": [0.1, 0.2, 0.3]})
+
+ seen_rewards: list[float] = []
+
+ def reward_fn(prediction: str, reward: float) -> float:
+ seen_rewards.append(reward)
+ return reward if prediction == "keep" else -1.0
+
+ task = RewardTask(df=df, reward_function=reward_fn, x_column="x", reward_columns=["reward"])
+
+ xs = ["a", "b", "c"]
+ preds = ["keep", "keep", "nope"]
+ scores = task._evaluate(xs, [""] * len(xs), preds)
- assert len(scores) == 1
- assert len(seqs) == 1
+ assert scores.tolist() == [0.1, 0.2, -1.0]
+ assert seen_rewards == [0.1, 0.2, 0.3]
diff --git a/tests/utils/test_prompt.py b/tests/utils/test_prompt.py
index 3dc90bb..9ee3c24 100644
--- a/tests/utils/test_prompt.py
+++ b/tests/utils/test_prompt.py
@@ -1,3 +1,5 @@
+import numpy as np
+
from promptolution.utils.prompt import Prompt, sort_prompts_by_scores
@@ -39,3 +41,14 @@ def test_sort_prompts_by_scores():
# Verify sorting
assert sorted_prompts == [prompt2, prompt1, prompt3]
assert sorted_scores == [0.90, 0.75, 0.60]
+
+
+def test_sort_prompts_by_scores_with_array():
+ """Ensure sorting works when scores are numpy arrays (aggregated via mean)."""
+ prompts = [Prompt("p1"), Prompt("p2"), Prompt("p3")]
+ scores = np.array([[0.5, 0.7], [0.8, 0.9], [0.4, 0.6]])
+
+ sorted_prompts, sorted_scores = sort_prompts_by_scores(prompts, scores)
+
+ assert sorted_prompts == [prompts[1], prompts[0], prompts[2]]
+ np.testing.assert_allclose(sorted_scores, [0.85, 0.6, 0.5])
diff --git a/tests/utils/test_prompt_creation.py b/tests/utils/test_prompt_creation.py
index 1c8c950..faefd4a 100644
--- a/tests/utils/test_prompt_creation.py
+++ b/tests/utils/test_prompt_creation.py
@@ -1,3 +1,5 @@
+import numpy as np
+
from promptolution.tasks.base_task import BaseTask
from promptolution.tasks.classification_tasks import ClassificationTask
from promptolution.utils.prompt_creation import create_prompt_variation, create_prompts_from_samples
@@ -143,3 +145,20 @@ def test_create_prompts_from_samples_multiple_prompts(mock_df, mock_meta_llm):
assert len(generated_prompts) == n_prompts
assert len(mock_meta_llm.call_history) == 1
+
+
+def test_create_prompts_from_samples_uniform_labels(mock_df, mock_meta_llm):
+ """Ensure uniform-label sampling includes every class in the meta-prompt examples."""
+ task = ClassificationTask(df=mock_df, x_column="x", y_column="y")
+ task.xs = np.asarray(task.xs)
+ task.ys = np.asarray(task.ys)
+
+ mock_meta_llm.reset()
+
+ prompts = create_prompts_from_samples(task, mock_meta_llm, n_samples=2, n_prompts=1, get_uniform_labels=True)
+
+ assert len(prompts) == 1
+ # The constructed meta-prompt should include at least one example per label
+ sent_prompt = mock_meta_llm.call_history[0]["prompts"][0]
+ for label in ["positive", "negative", "neutral"]:
+ assert f"Output: {label}" in sent_prompt