Issue: Implement Structured Logging Using structlog #37

@csmangum

Description

We need to implement a consistent and structured logging pattern across our project using the structlog library. This will improve our ability to debug, monitor, and analyze our application's behavior.

Objectives

  1. Replace the current logging system with structlog.
  2. Implement a consistent logging pattern across all modules.
  3. Configure log formatting for different environments (development, production).
  4. Add context to logs for better traceability.

Tasks

  1. Install and configure structlog

    • Add structlog to the project dependencies.
    • Create a logging_config.py file to centralize logging configuration.
  2. Create a custom logger factory

    • Implement a function to create structured loggers.
    • Configure the logger to include timestamp, log level, and other relevant metadata.
  3. Replace existing logging calls

    • Update all current logging imports to use the new structured logger.
    • Replace logger.info(), logger.debug(), etc., calls with structlog equivalents.
  4. Add context to logs

    • Implement a context manager or decorator to add relevant context to log entries (e.g., function name, class name, request ID).
  5. Configure log formatting

    • Set up JSON formatting for production logs.
    • Use a more readable format for development logs.
  6. Update the Reporter class

    • Modify the Reporter class to use structlog.
    • Ensure all existing functionality is maintained.
  7. Implement log sampling

    • Configure log sampling for high-volume log events to reduce noise.
  8. Add log correlation

    • Implement a system to correlate logs across different parts of the application (e.g., using unique identifiers for each request or operation).
  9. Update tests

    • Modify existing tests to work with the new logging system.
    • Add new tests to verify logging behavior.
  10. Documentation

    • Update the project documentation to reflect the new logging system.
    • Provide examples of how to use the new structured logging in different scenarios.

Code Changes

The main changes will be in the following files:

pyology/reporter.py
import structlog
import sys
...

class Reporter:
    """
    A class to report events and log messages during the simulation.

    Methods
    -------
    info(message: str) -> None:
        Log an info message.
    debug(message: str) -> None:
        Log a debug message.
    log_event(message: str) -> None:
        Log an event message.
    log_warning(message: str) -> None:
        Log a warning message.
    log_error(message: str) -> None:
        Log an error message.
    error(message: str) -> None:
        Log an error message (alias for log_error).
    log_atp_production(step: str, atp_produced: float) -> None:
        Log the ATP production for a specific step.
    report_simulation_results(results: dict) -> None:
        Report the simulation results.
    """

    def __init__(self):
        self.logger = structlog.get_logger(__name__)
        self.atp_production_log = []

    def info(self, message: str) -> None:
        """
        Log an info message.
        """
        self.logger.info(message)

    def debug(self, message: str) -> None:
        """
        Log a debug message.
        """
        self.logger.debug(message)

    def log_event(self, message: str) -> None:
        """
        Log an event message.
        """
        self.logger.info(message)

    def log_warning(self, message: str) -> None:
        """
        Log a warning message.
        """
        self.logger.warning(message)

    def log_error(self, message: str) -> None:
        """
        Log an error message.
        """
        self.logger.error(message)

    def log_atp_production(self, step: str, atp_produced: float) -> None:
        """
        Log the ATP production for a specific step.
        """
        self.atp_production_log.append((step, atp_produced))
        self.log_event(f"ATP produced in {step}: {atp_produced}")

    def report_simulation_results(self, results: dict) -> None:
        """
        Report the simulation results.
        """
        self.log_event(
            f"Simulation completed in {results['simulation_time']:.2f} seconds"
        )
        self.log_event(f"Total ATP produced: {results['total_atp_produced']:.2f}")
        self.log_event(f"Glucose processed: {results['glucose_processed']:.2f}")
        self.log_event(f"Glucose consumed: {results['glucose_consumed']:.2f}")
        self.log_event(f"Pyruvate produced: {results['pyruvate_produced']:.2f}")
        self.log_event(f"Oxygen remaining: {results['oxygen_remaining']:.2f}")
        self.log_event(f"Final cytoplasm ATP: {results['final_cytoplasm_atp']:.2f}")
        self.log_event(
            f"Final mitochondrion ATP: {results['final_mitochondrion_atp']:.2f}"
        )
        self.log_event(
            f"2-Phosphoglycerate remaining: {results['final_phosphoglycerate_2']:.2f}"
        )
        self.log_event(
            f"Phosphoenolpyruvate produced: {results['final_phosphoenolpyruvate']:.2f}"
        )

        self.log_event("\nATP Production Breakdown:")
        for step, atp in self.atp_production_log:
            self.log_event(f"  {step}: {atp:.2f}")

        self.atp_production_log.clear()  # Clear the log for the next simulation

    # Add this new method
    def error(self, message: str) -> None:
        """
        Log an error message (alias for log_error).
        """
        self.log_error(message)
utils/tracking.py
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

from pyology.organelle import Organelle
from pyology.reporter import Reporter

from .command_data import CommandData
...

@dataclass
class CommandExecutionResult:
    result: Any
    initial_values: Dict[str, Any]
    final_values: Dict[str, Any]
    validation_results: Dict[str, Any]
...

def execute_command(
    organelle: "Organelle",
    command_data: CommandData,
    logger: Reporter,
    debug: bool = True,
) -> CommandExecutionResult:
    """
    Executes a command on an object based on the provided CommandData object,
    tracks specified attributes, logs the results, and performs optional validations.

    Parameters
    ----------
    organelle : Organelle
        The organelle whose tracked attributes are logged and validated.
    command_data : CommandData
        The CommandData object containing all necessary information for command execution.
    logger : Reporter
        The logger object to use for logging.
    debug : bool, optional
        Whether to enable debug logging (default is True).

    Returns
    -------
    CommandExecutionResult
        A dataclass containing:
        - result: The result of the executed command
        - initial_values: Initial values of tracked attributes
        - final_values: Final values of tracked attributes
        - validation_results: Results of performed validations
    """
    obj = command_data.obj
    command = command_data.command
    tracked_attributes = command_data.tracked_attributes
    args = command_data.args
    kwargs = command_data.kwargs
    validations = command_data.validations

    initial_values = _log_attribute_values(
        logger, organelle, tracked_attributes, "Initial", debug
    )
    # Execute the command
    try:
        if callable(command):
            result = command(obj, *args, **kwargs)
        else:
            result = getattr(obj, command)(*args, **kwargs)
    except Exception as e:
        logger.error(f"Error executing command '{command}': {str(e)}")
        raise

    final_values = _log_attribute_values(
        logger, organelle, tracked_attributes, "Final", debug
    )

    validation_results = _log_validation_results(
        logger, organelle, initial_values, final_values, validations
    )

    # Prepare and return results
    return CommandExecutionResult(
        result=result,
        initial_values=initial_values,
        final_values=final_values,
        validation_results=validation_results,
    )
...

def _log_attribute_values(
    logger: Reporter,
    organelle: "Organelle",
    tracked_attributes: List[str],
    stage: str,
    debug: bool = True,
) -> Dict[str, Any]:
    """
    Log the values of the tracked attributes.

    Parameters
    ----------
    logger : Reporter
        The logger object to use for logging.
    organelle : "Organelle"
        The organelle to log the attribute values of.
    tracked_attributes : List[str]
        The attributes to log.
    stage : str
        The stage of the simulation to log the attribute values for.
    debug : bool, optional
        Whether to enable debug logging (default is True).

    Returns
    -------
    Dict[str, Any]
        A dictionary containing the attribute values.
    """
    values = {}
    for attr in tracked_attributes:
        try:
            quantity = organelle.get_metabolite_quantity(attr)
            values[attr] = quantity
        except AttributeError:
            logger.error(
                f"Object does not have 'get_metabolite_quantity' method. Skipping '{attr}'."
            )
        except ValueError as e:
            logger.warning(
                f"Error getting quantity for metabolite '{attr}': {str(e)}. Skipping."
            )
        except Exception as e:
            logger.error(
                f"Unexpected error occurred while getting quantity for metabolite '{attr}': {str(e)}. Skipping."
            )

    if debug:
        logger.debug(f"{stage} values: {values}")

    return values
...

def _log_validation_results(
    logger: Reporter,
    organelle: "Organelle",
    initial_values: Dict[str, Any],
    final_values: Dict[str, Any],
    validations: List[Callable[[Any, Dict[str, Any], Dict[str, Any]], bool]],
) -> Dict[str, Any]:
    """
    Log the validation results.

    #! Need to validate this process

    Parameters
    ----------
    logger : Reporter
        The logger object to use for logging.
    organelle : "Organelle"
        The organelle to validate.
    initial_values : Dict[str, Any]
        The initial values of the tracked attributes.
    final_values : Dict[str, Any]
        The final values of the tracked attributes.
    validations : List[Callable[[Any, Dict[str, Any], Dict[str, Any]], bool]]
        The validations to perform. Each validation should take three arguments:
        the object, the initial values, and the final values.

    Returns
    -------
    Dict[str, Any]
        A dictionary containing the validation results.
    """
    # Perform validations
    validation_results = {}
    for idx, validation in enumerate(validations, start=1):
        try:
            validation_result = validation(organelle, initial_values, final_values)
            validation_results[f"validation_{idx}"] = validation_result
            if not validation_result:
                logger.warning(f"Validation {idx} failed: {validation.__name__}")
        except Exception as e:
            logger.error(f"Error during validation {validation.__name__}: {str(e)}")
            validation_results[f"validation_{idx}"] = f"error: {str(e)}"
    return validation_results
pyology/glycolysis.py
import structlog
from typing import TYPE_CHECKING, Tuple

from utils.command_data import CommandData
from utils.tracking import execute_command

from .common_reactions import GlycolysisReactions
from .energy_calculations import (
    calculate_energy_state,
    calculate_total_adenine_nucleotides,
)
from .exceptions import GlycolysisError, ReactionError
from .pathway import Pathway

if TYPE_CHECKING:
    from .organelle import Organelle


class Glycolysis(Pathway):
    """
    Class representing the glycolysis pathway.
    """

    time_step = 1
    reactions = GlycolysisReactions

    def __init__(self, debug=False):
        self.debug = debug
        self.logger = structlog.get_logger(__name__)

    def run(
        self, organelle: "Organelle", glucose_units: float
    ) -> Tuple[float, float, float]:
        """
        Executes the glycolysis pathway for a given number of glucose units.
        """
        try:
            if glucose_units <= 0:
                raise GlycolysisError("The number of glucose units must be positive.")

            initial_energy = calculate_energy_state(organelle, self.logger)
            initial_adenine = calculate_total_adenine_nucleotides(organelle, self.logger)

            # Investment phase
            investment_results = execute_command(
                organelle,
                CommandData(
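                    # execute_command calls command(obj, *args, **kwargs),
                    # so obj must be the organelle the phase acts on.
                    obj=organelle,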
                    command=Glycolysis.investment_phase,
                    tracked_attributes=["ATP", "ADP", "AMP", "glucose"],
                    args=(glucose_units,),
                ),
                logger=self.logger,
            )

            # Yield phase
            yield_results = execute_command(
                organelle,
                CommandData(
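                    # execute_command calls command(obj, *args, **kwargs),
                    # so obj must be the organelle the phase acts on.
                    obj=organelle,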
                    command=Glycolysis.yield_phase,
                    tracked_attributes=["ATP", "ADP", "AMP"],
                    args=(glucose_units * 2,),
                ),
                logger=self.logger,
            )

            pyruvate_produced = yield_results.result

            self.logger.info(
                "Glycolysis completed.", pyruvate_produced=pyruvate_produced
            )
            self.logger.info(
                "Final metabolite levels.", metabolites=organelle.metabolites.quantities
            )

            final_energy = calculate_energy_state(organelle, self.logger)
            final_adenine = calculate_total_adenine_nucleotides(organelle, self.logger)

            # Check energy conservation
            energy_difference = final_energy - initial_energy
            if abs(energy_difference) > 1e-6:
                self.logger.warning(
                    "Energy not conserved in glycolysis.",
                    energy_difference=energy_difference,
                )

            # Check adenine nucleotide conservation
            adenine_difference = final_adenine - initial_adenine
            if abs(adenine_difference) > 1e-6:
                self.logger.warning(
                    "Adenine nucleotides not conserved in glycolysis.",
                    adenine_difference=adenine_difference,
                )

            return final_energy, final_adenine, pyruvate_produced

        except Exception as e:
            self.logger.error("Error during glycolysis.", error=str(e))
            raise GlycolysisError(f"Glycolysis failed: {str(e)}") from e

    @classmethod
    def investment_phase(cls, organelle: "Organelle", glucose_units: float) -> None:
        """
        Perform the investment phase of glycolysis.
        """
        logger = structlog.get_logger(__name__)
        logger.info("Starting investment phase.", glucose_units=glucose_units)
        initial_atp = organelle.get_metabolite_quantity("ATP")

        for i in range(int(glucose_units)):
            logger.info("Processing glucose unit.", unit=i + 1, total=glucose_units)
            try:
                cls.reactions.hexokinase.transform(organelle=organelle)
                cls.reactions.phosphoglucose_isomerase.transform(organelle=organelle)
                cls.reactions.phosphofructokinase.transform(organelle=organelle)
                cls.reactions.aldolase.transform(organelle=organelle)
                cls.reactions.triose_phosphate_isomerase.transform(organelle=organelle)
            except ReactionError as e:
                logger.error(
                    "Investment phase failed.", unit=i + 1, error=str(e)
                )
                raise GlycolysisError(f"Investment phase failed: {str(e)}") from e

        final_atp = organelle.get_metabolite_quantity("ATP")
        logger.info(
            "ATP consumed in investment phase.", atp_consumed=initial_atp - final_atp
        )

    @classmethod
    def yield_phase(cls, organelle: "Organelle", g3p_units: float) -> None:
        """
        Perform the yield phase of glycolysis.
        """
        logger = structlog.get_logger(__name__)
        logger.info("Starting yield phase.", g3p_units=g3p_units)
        initial_atp = organelle.get_metabolite_quantity("ATP")

        for i in range(int(g3p_units)):
            logger.info("Processing G3P unit.", unit=i + 1, total=g3p_units)
            try:
                cls.reactions.glyceraldehyde_3_phosphate_dehydrogenase.transform(
                    organelle=organelle
                )
                cls.reactions.phosphoglycerate_kinase.transform(organelle=organelle)
                cls.reactions.phosphoglycerate_mutase.transform(organelle=organelle)
                cls.reactions.enolase.transform(organelle=organelle)
                cls.reactions.pyruvate_kinase.transform(organelle=organelle)
            except ReactionError as e:
                logger.error(
                    "Yield phase failed.", unit=i + 1, error=str(e)
                )
                raise GlycolysisError(f"Yield phase failed: {str(e)}") from e

        final_atp = organelle.get_metabolite_quantity("ATP")
        logger.info(
            "ATP produced in yield phase.", atp_produced=final_atp - initial_atp
        )
pyology/krebs_cycle.py
import structlog
...

class KrebsCycle(Pathway):
    ...
    def run(
        self, organelle: "Organelle", acetyl_coa_units: float
    ) -> Tuple[float, float, float]:
        """
        Executes the Krebs Cycle pathway for a given number of acetyl-CoA units.
        """
        logger = structlog.get_logger(__name__)
        try:
            if acetyl_coa_units <= 0:
                raise KrebsCycleError(
                    "The number of acetyl-CoA units must be positive."
                )

            initial_energy = calculate_energy_state(organelle, logger)
            initial_adenine = calculate_total_adenine_nucleotides(organelle, logger)

            co2_produced = 0
            for i in range(int(acetyl_coa_units)):
                cycle_results = execute_command(
                    organelle,
                    CommandData(
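                        # execute_command calls command(obj, *args, **kwargs),
                        # so obj must be the organelle that self.cycle acts on.
                        obj=organelle,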
                        command=self.cycle,
                        tracked_attributes=[
                            "ATP",
                            "ADP",
                            "AMP",
                            "Acetyl_CoA",
                            "NAD",
                            "NADH",
                            "FAD",
                            "FADH2",
                            "CO2",
                        ],
                        args=(),
                        kwargs={},
                    ),
                    logger=logger,
                )
                co2_produced += cycle_results.result
            logger.info("Krebs Cycle completed.", co2_produced=co2_produced)
            logger.info("Final metabolite levels.", metabolites=organelle.metabolites.quantities)

            final_energy = calculate_energy_state(organelle, logger)
            final_adenine = calculate_total_adenine_nucleotides(organelle, logger)

            # Check energy conservation
            energy_difference = final_energy - initial_energy
            if abs(energy_difference) > 1e-6:
                logger.warning(
                    "Energy not conserved in Krebs Cycle.",
                    energy_difference=energy_difference,
                )

            # Check adenine nucleotide conservation
            adenine_difference = final_adenine - initial_adenine
            if abs(adenine_difference) > 1e-6:
                logger.warning(
                    "Adenine nucleotides not conserved in Krebs Cycle.",
                    adenine_difference=adenine_difference,
                )

            return final_energy, final_adenine, co2_produced

        except Exception as e:
            logger.error("Error during Krebs Cycle.", error=str(e))
            raise KrebsCycleError(f"Krebs Cycle failed: {str(e)}") from e

    def cycle(self, organelle: "Organelle") -> int:
        """
        Perform one complete cycle of the Krebs Cycle.
        """
        logger = structlog.get_logger(__name__)
        logger.info("Starting Krebs Cycle")
        co2_produced = 0

        try:
            self.reactions.citrate_synthase.transform(organelle=organelle)
            self.reactions.aconitase.transform(organelle=organelle)
            self.reactions.isocitrate_dehydrogenase.transform(organelle=organelle)
            co2_produced += 1
            self.reactions.alpha_ketoglutarate_dehydrogenase.transform(
                organelle=organelle
            )
            co2_produced += 1
            self.reactions.succinyl_coa_synthetase.transform(organelle=organelle)
            self.reactions.succinate_dehydrogenase.transform(organelle=organelle)
            self.reactions.fumarase.transform(organelle=organelle)
            self.reactions.malate_dehydrogenase.transform(organelle=organelle)

        except ReactionError as e:
            logger.error("Krebs Cycle failed.", error=str(e))
            raise KrebsCycleError(f"Krebs Cycle failed: {str(e)}") from e
        logger.info("Krebs Cycle completed.", co2_produced=co2_produced)
        return co2_produced

Example Implementation

Here's a basic example of how the new logging setup might look:

import structlog

def get_logger(name):
    return structlog.get_logger(name)

# In a module:
logger = get_logger(__name__)

def some_function():
    logger.info("Processing data", data_size=1000, status="started")
    # ... processing ...
    logger.info("Data processed", status="completed", duration=5.2)

Acceptance Criteria

  • All logging calls in the project use structlog.
  • Logs include relevant context (e.g., function name, class, timestamp).
  • Log format is JSON in production and human-readable in development.
  • Existing functionality of the Reporter class is maintained.
  • All tests pass with the new logging system.
  • Documentation is updated to reflect the new logging pattern.
