Skip to content
184 changes: 101 additions & 83 deletions src/models/deprecation.py
Original file line number Diff line number Diff line change
@@ -1,135 +1,153 @@
"""Data models for deprecation entries."""
"""Deprecation model for AI model deprecation tracking."""

import hashlib
from datetime import UTC, datetime
from typing import Any

from pydantic import BaseModel, Field, HttpUrl, field_validator, model_validator
from pydantic import BaseModel, Field, field_validator, model_validator


class Deprecation(BaseModel):
"""Model for AI model deprecation information."""
class DeprecationEntry(BaseModel):
"""Model representing an AI model deprecation entry."""

provider: str = Field(description="Provider name (e.g., 'OpenAI', 'Anthropic')")
model: str = Field(description="Affected model name")
deprecation_date: datetime = Field(description="When the deprecation was announced")
retirement_date: datetime = Field(description="When the model stops working")
replacement: str | None = Field(default=None, description="Suggested alternative model")
notes: str | None = Field(default=None, description="Additional context")
source_url: HttpUrl = Field(description="URL where the deprecation info came from")
provider: str = Field(..., description="Provider name (e.g., OpenAI, Anthropic)")
model: str = Field(..., description="Model name or identifier")
deprecation_date: datetime = Field(..., description="Date when deprecation was announced")
retirement_date: datetime = Field(..., description="Date when model stops working")
replacement: str | None = Field(None, description="Suggested alternative model")
notes: str | None = Field(None, description="Additional context or information")
source_url: str = Field(..., description="Link to official announcement")
last_updated: datetime = Field(
default_factory=lambda: datetime.now(UTC),
description="When we last checked this information",
)
# Alias for compatibility with main branch
created_at: datetime = Field(
default_factory=lambda: datetime.now(UTC),
description="When entry was created (alias for last_updated)",
description="When this entry was last updated",
)

@field_validator("deprecation_date", "retirement_date", "last_updated")
@field_validator("provider", "model")
@classmethod
def validate_non_empty_string(cls, v: str) -> str:
"""Ensure provider and model are non-empty strings."""
if not v or not v.strip():
raise ValueError("Field must be a non-empty string")
return v.strip()

@field_validator("source_url")
@classmethod
def ensure_utc_timezone(cls, v: datetime) -> datetime:
"""Ensure datetime fields have UTC timezone."""
if v.tzinfo is None:
return v.replace(tzinfo=UTC)
return v.astimezone(UTC)
def validate_url(cls, v: str) -> str:
"""Basic URL validation."""
if not v.startswith(("http://", "https://")):
raise ValueError("URL must start with http:// or https://")
return v

@model_validator(mode="after")
def validate_dates(self) -> "Deprecation":
"""Validate that retirement_date is after deprecation_date."""
def validate_dates(self) -> "DeprecationEntry":
"""Ensure retirement date is after deprecation date."""
if self.retirement_date <= self.deprecation_date:
raise ValueError("retirement_date must be after deprecation_date")
raise ValueError("Retirement date must be after deprecation date")
return self

def to_rss_item(self) -> dict[str, str | datetime]:
"""Convert to RSS item dictionary."""
description_parts = [
f"Provider: {self.provider}",
f"Model: {self.model}",
f"Deprecation Date: {self.deprecation_date.isoformat()}",
f"Retirement Date: {self.retirement_date.isoformat()}",
]

if self.replacement:
description_parts.append(f"Replacement: {self.replacement}")

if self.notes:
description_parts.append(f"Notes: {self.notes}")

description = "\n".join(description_parts)

title = f"{self.provider} - {self.model} Deprecation"

return {
"title": title,
"description": description,
"link": self.source_url,
"guid": f"{self.provider}-{self.model}-{self.deprecation_date.isoformat()}",
"pubDate": self.deprecation_date,
}

def to_json_dict(self) -> dict[str, str | None]:
"""Convert to JSON-serializable dictionary."""
return {
"provider": self.provider,
"model": self.model,
"deprecation_date": self.deprecation_date.isoformat(),
"retirement_date": self.retirement_date.isoformat(),
"replacement": self.replacement,
"notes": self.notes,
"source_url": self.source_url,
"last_updated": self.last_updated.isoformat(),
}

model_config = {
"json_encoders": {
datetime: lambda v: v.isoformat(),
}
}

# Compatibility methods for main branch
def is_active(self) -> bool:
"""Check if deprecation is still active (not yet retired)."""
now = datetime.now(UTC)
return self.retirement_date > now

def get_hash(self) -> str:
"""Generate hash of core deprecation data (excluding last_updated)."""
# Include all fields that identify the unique deprecation, excluding last_updated
"""Generate hash for core deprecation data."""
import hashlib

core_data = {
"provider": self.provider,
"model": self.model,
"deprecation_date": self.deprecation_date.isoformat(),
"retirement_date": self.retirement_date.isoformat(),
"replacement": self.replacement,
"notes": self.notes,
"source_url": str(self.source_url),
"source_url": self.source_url,
}

# Create deterministic string representation
data_str = str(sorted(core_data.items()))
return hashlib.sha256(data_str.encode()).hexdigest()

def get_identity_hash(self) -> str:
"""Generate hash for identifying same deprecation (for updates)."""
# Only include immutable fields that identify the unique deprecation
"""Generate hash for identifying same deprecation."""
import hashlib

identity_data = {
"provider": self.provider,
"model": self.model,
"deprecation_date": self.deprecation_date.isoformat(),
"retirement_date": self.retirement_date.isoformat(),
"source_url": str(self.source_url),
"source_url": self.source_url,
}

# Create deterministic string representation
data_str = str(sorted(identity_data.items()))
return hashlib.sha256(data_str.encode()).hexdigest()

def same_deprecation(self, other: "Deprecation") -> bool:
"""Check if this represents the same deprecation (for updates)."""
def same_deprecation(self, other: "DeprecationEntry") -> bool:
"""Check if this represents the same deprecation."""
return self.get_identity_hash() == other.get_identity_hash()

def __eq__(self, other: object) -> bool:
"""Compare deprecations based on core data (excluding last_updated)."""
if not isinstance(other, Deprecation):
if not isinstance(other, DeprecationEntry):
return False
return self.get_hash() == other.get_hash()

def __hash__(self) -> int:
"""Hash based on core data for use in sets/dicts."""
return hash(self.get_hash())

def __str__(self) -> str:
"""String representation of deprecation."""
return (
f"Deprecation({self.provider} {self.model}: "
f"{self.deprecation_date.date()} -> {self.retirement_date.date()})"
f"DeprecationEntry(provider='{self.provider}', model='{self.model}', "
f"deprecation_date='{self.deprecation_date.date()}', "
f"retirement_date='{self.retirement_date.date()}')"
)

def is_active(self) -> bool:
"""Check if the deprecation is still active (not yet retired)."""
now = datetime.now(UTC)
return self.retirement_date > now

def to_rss_item(self) -> dict[str, Any]:
"""Convert deprecation to RSS item format (compatibility with main)."""
title = f"{self.provider}: {self.model} Deprecation"
description_parts = [
f"Model: {self.model}",
f"Provider: {self.provider}",
f"Deprecation Date: {self.deprecation_date.strftime('%Y-%m-%d')}",
f"Retirement Date: {self.retirement_date.strftime('%Y-%m-%d')}",
]
if self.replacement:
description_parts.append(f"Replacement: {self.replacement}")
if self.notes:
description_parts.append(f"Notes: {self.notes}")

return {
"title": title,
"description": " | ".join(description_parts),
"guid": str(self.source_url),
"pubDate": self.created_at,
"link": str(self.source_url),
}

def __repr__(self) -> str:
"""Detailed string representation."""
return (
f"Deprecation(provider='{self.provider}', model='{self.model}', "
f"deprecation_date={self.deprecation_date.isoformat()}, "
f"retirement_date={self.retirement_date.isoformat()})"
)
@property
def created_at(self) -> datetime:
"""Alias for compatibility with main branch."""
return self.deprecation_date


# Main has DeprecationEntry, we use Deprecation, create alias for compatibility
DeprecationEntry = Deprecation
# Alias for compatibility with other branches that may use Deprecation
Deprecation = DeprecationEntry
134 changes: 134 additions & 0 deletions src/rss/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
"""RSS feed configuration."""

from pathlib import Path
from typing import Any

from pydantic import BaseModel, Field


class FeedConfig(BaseModel):
"""Configuration for RSS feed generation."""

title: str = Field(
default="AI Model Deprecations",
description="Feed title",
)
description: str = Field(
default="Daily-updated RSS feed tracking AI model deprecations across providers",
description="Feed description",
)
link: str = Field(
default="https://deprecations.example.com",
description="Feed website link",
)
language: str = Field(
default="en",
description="Feed language code",
)
copyright: str | None = Field(
default=None,
description="Copyright information",
)
managing_editor: str | None = Field(
default=None,
description="Managing editor email",
)
webmaster: str | None = Field(
default=None,
description="Webmaster email",
)
ttl: int = Field(
default=1440,
description="Time to live in minutes (default 24 hours)",
gt=0,
)

model_config = {"validate_assignment": True}


class VersionConfig(BaseModel):
"""Configuration for RSS feed versioning."""

version: str = Field(
default="v1",
description="Feed version identifier",
pattern=r"^v\d+$",
)
supported_versions: list[str] = Field(
default_factory=lambda: ["v1"],
description="List of supported versions",
)

def is_version_supported(self, version: str) -> bool:
"""Check if a version is supported."""
return version in self.supported_versions

model_config = {"validate_assignment": True}


class OutputConfig(BaseModel):
"""Configuration for RSS feed output paths."""

base_path: Path = Field(
default=Path("output/rss"),
description="Base output directory for RSS feeds",
)
filename: str = Field(
default="feed.xml",
description="RSS feed filename",
)

def get_versioned_path(self, version: str) -> Path:
"""Get the full path for a versioned feed."""
return self.base_path / version / self.filename

def ensure_directories(self, version: str) -> None:
"""Ensure output directories exist for a given version."""
versioned_dir = self.base_path / version
versioned_dir.mkdir(parents=True, exist_ok=True)

model_config = {"validate_assignment": True}


class RSSConfig(BaseModel):
"""Complete RSS configuration."""

feed: FeedConfig = Field(
default_factory=FeedConfig,
description="Feed metadata configuration",
)
version: VersionConfig = Field(
default_factory=VersionConfig,
description="Version configuration",
)
output: OutputConfig = Field(
default_factory=OutputConfig,
description="Output path configuration",
)

@classmethod
def from_dict(cls, config_dict: dict[str, Any]) -> "RSSConfig":
"""Create RSSConfig from dictionary."""
return cls(
feed=FeedConfig(**config_dict.get("feed", {})),
version=VersionConfig(**config_dict.get("version", {})),
output=OutputConfig(**config_dict.get("output", {})),
)

def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary."""
return {
"feed": self.feed.model_dump(),
"version": self.version.model_dump(),
"output": {
"base_path": str(self.output.base_path),
"filename": self.output.filename,
},
}

model_config = {"validate_assignment": True}


def get_default_config() -> RSSConfig:
"""Get default RSS configuration."""
return RSSConfig()
Loading
Loading