Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
51be775
support filters and strongly typed responses
Sep 5, 2024
ed23bdc
fix: filter regex
Sep 5, 2024
daf67a2
fix: improper replace in filter
Sep 5, 2024
9a6b2e5
fix: check for list or dict in response
Sep 5, 2024
4dacd32
add: cli templates
Sep 13, 2024
83cf75d
feat: get all metadata vars
Sep 16, 2024
9107606
revert metavars
Sep 16, 2024
68963eb
get meta_variables
Sep 17, 2024
730fae5
nit: missing doc
Oct 1, 2024
0a5814a
feat: allow filter on policy packages
Oct 1, 2024
b18d7a2
feat: sdwan monitors
Oct 8, 2024
77ec015
fix: FMG 7.4 support for switch adding in adom
patrickfnielsen Feb 3, 2025
b272b3c
fix: FMG 7.4 add_to_fortigate
patrickfnielsen Feb 3, 2025
e911515
fix: fmg 7.4 support for add_to_fortigate
patrickfnielsen Feb 3, 2025
1b8b0a5
fix (fortiswitch): upgrade to fmg 7.4
Feb 4, 2025
8a18802
feat: replace methods for fortigate and fortiswitch
Feb 7, 2025
1f9e195
fix: handle task states above 5
Feb 8, 2025
9875037
nit: update doc string for fmg 7.4
Feb 11, 2025
9425e27
fix: use rma rule for fgt replace
Feb 11, 2025
9332977
feat: added support for model_cluster
Mar 5, 2025
7163b35
feat: support verbose output
Apr 22, 2025
242c6e6
feat: get matched devices on fortigate
May 1, 2025
d7799c1
fix: path for matched devices
May 1, 2025
8683396
fix: use matched-devices
May 1, 2025
547d821
feat: add log query
May 6, 2025
5aa986a
Merge branch 'BESTSELLER:master' into filter-pythonic-refactor
patrickfnielsen May 27, 2025
8dadd0c
test device blueprint
Jun 2, 2025
53c05af
feat: blueprint support
Jun 2, 2025
084d29e
fix: blueprint docs
Jun 2, 2025
8d6249c
feat: allow settings flags on fgt update
Jun 3, 2025
36deffd
feat: refresh switches
Jun 4, 2025
d7d9dd6
fix: remove duplicate function
Jul 8, 2025
b4e3203
feat: set metadata
Jul 11, 2025
4c85be2
feat: add/update cli templates
Jul 17, 2025
02f3af5
feat: support pre-run
Jul 17, 2025
1af0105
feat: basic firewall object support
Jul 17, 2025
6a361dd
fix: members != member
Jul 17, 2025
16125f5
add blueprint support to fgt cluster
Sep 23, 2025
4fa64c1
feat: add get_detected_devices
Sep 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyfortimanager/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from pyfortimanager.core.api import Api as api
from pyfortimanager.core.fortimanager import FortiManager #noqa
from pyfortimanager.core.filter import F #noqa
192 changes: 86 additions & 106 deletions pyfortimanager/core/api.py
Original file line number Diff line number Diff line change
@@ -1,124 +1,104 @@
from pyfortimanager.models.adoms import ADOMs
from pyfortimanager.models.cli_template_groups import CLI_Template_Groups
from pyfortimanager.models.device_groups import Device_Groups
from pyfortimanager.models.fortiaps_proxy import FortiAPs_Proxy
from pyfortimanager.models.fortiaps import FortiAPs
from pyfortimanager.models.fortigates_proxy import FortiGates_Proxy
from pyfortimanager.models.fortigates import FortiGates
from pyfortimanager.models.fortiswitches_proxy import FortiSwitches_Proxy
from pyfortimanager.models.fortiswitches import FortiSwitches
from pyfortimanager.models.install_wizard import Install_Wizard
from pyfortimanager.models.metadata_variables import MetadataVariables
from pyfortimanager.models.policy_packages import Policy_Packages
from pyfortimanager.models.radius_servers import RADIUS_Servers
from pyfortimanager.models.scripts import Scripts
from pyfortimanager.models.sdwan_templates import SDWAN_Templates
from pyfortimanager.models.system import System


class Api(object):
"""Base API class.
"""
import requests
from dataclasses import field
from typing import Generic, List, Optional, TypeVar, Union
from pydantic import BaseModel as PBaseModel

def __init__(self, host: str, token: str, adom: str = "root", verify: bool = True, proxy_timeout: int = 60, **kwargs):
self.host = host
self.token = token
self.adom = adom
self.verify = verify
self.proxy_timeout = proxy_timeout

@property
def adoms(self):
"""Endpoints related to ADOM management.
"""
return ADOMs(api=self)
class FMGObject(PBaseModel):
pass

@property
def cli_template_groups(self):
"""Endpoints related to CLI Template Groups.
"""
return CLI_Template_Groups(api=self)

@property
def device_groups(self):
"""Endpoints related to Device Groups.
"""
return Device_Groups(api=self)
T = TypeVar("FMGObject")
class FMGResponse(Generic[T]):
"""Response to a request

@property
def fortiaps_proxy(self):
"""Endpoints related to FortiAP proxy calls on a FortiGate.
"""
return FortiAPs_Proxy(api=self)
Attributes:
data (dict|List[FMGObject]): response data
status (int): status code
success (bool): True on success
message (optional[str]): error message if an error occured, else None
"""

@property
def fortiaps(self):
"""Endpoints related to FortiAP management.
"""
return FortiAPs(api=self)
data: Union[dict, List[T]] = field(default_factory=dict) # data from FMG
status: int = 0 # status code of the request
success: bool = False # True on successful request
message: Optional[str] = None # error message if an error was found

def __bool__(self) -> bool:
return self.success

def first(self) -> Optional[Union[T, dict]]:
"""Return first data or None if result is empty"""
if isinstance(self.data, dict):
if isinstance(self.data.get("data"), list):
return self.data.get("data")[0] if self.data.get("data") else None
else:
return self.data
elif isinstance(self.data, list) and self.data: # non-empty list
return self.data[0]

return None


class BaseModel:
"""API class for FortiManager login management and post requests.
"""

@property
def fortigates_proxy(self):
"""Endpoints related to proxy calls on a FortiGate.
"""
return FortiGates_Proxy(api=self)
def __init__(self, api, **kwargs):
self.api = api
self.base_url = f"{self.api.host}/jsonrpc"

@property
def fortigates(self):
"""Endpoints related to FortiGate management.
"""
return FortiGates(api=self)

def post(self, method: str, params: dict, fmg_type: T = dict()) -> FMGResponse[T]:
"""Sends a POST request to the FortiManager API.

@property
def fortiswitches_proxy(self):
"""Endpoints related to FortiSwitch proxy calls on a FortiGate.
"""
return FortiSwitches_Proxy(api=self)
Args:
method (str): get, exec, add, set, update, delete.
params (dict): Payload data to send with the request.
fmg_type (T): Type of the FMGObject to return, defaults to dict.

@property
def fortiswitches(self):
"""Endpoints related to FortiSwitch management.
Returns:
FMGResponse: FMG Response status, message and data.
"""
return FortiSwitches(api=self)

@property
def install_wizard(self):
"""Endpoints related to the Install Wizard.
"""
return Install_Wizard(api=self)
headers = {
"Authorization": f"Bearer {self.api.token}"
}

@property
def metadata_variables(self):
"""Endpoints related to Metadata Variables.
"""
return MetadataVariables(api=self)
data = {
"method": method.lower(),
"verbose": 1 if self.api.verbose else 0,
"params": [params]
}

@property
def policy_packages(self):
"""Endpoints related to Policy Packages.
"""
return Policy_Packages(api=self)
result = FMGResponse[T]()
response = requests.post(url=self.base_url, json=data, verify=self.api.verify, headers=headers)

@property
def radius_servers(self):
"""Endpoints related to RADIUS_Servers.
"""
return RADIUS_Servers(api=self)
api_result = {}
if response.status_code == 200:
api_result = response.json()["result"][0]
result.success = True
else:
result.success = False

@property
def scripts(self):
"""Endpoints related to Scripts.
"""
return Scripts(api=self)
# handling empty result list
if not api_result.get("data"):
result.data = []
else:
objects = []
data = api_result.get("data")
if isinstance(data, dict):
objects.append(fmg_type(**data) if not isinstance(fmg_type, dict) else data)
elif isinstance(data, list):
for value in data:
objects.append(fmg_type(**value) if not isinstance(fmg_type, dict) else value)

@property
def sdwan_templates(self):
"""Endpoints related to SD-WAN Templates.
"""
return SDWAN_Templates(api=self)
result.data = objects

@property
def system(self):
"""Endpoints related to the FortiManager system.
"""
return System(api=self)
result.status = api_result.get("status", {}).get("code", 400)

if result.status != 0:
result.message = api_result.get("status", {}).get("message", None)

return result

144 changes: 144 additions & 0 deletions pyfortimanager/core/filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import re
from typing import Literal, List, Union

# Idea from from https://github.com/realvitya/pyfortinet/tree/master

OP = {
"eq": "==",
"neq": "!=",
"lt": "<",
"le": "<=",
"gt": ">",
"ge": ">=",
"or": "&",
"in": "in",
"contain": "contain",
"like": "like",
"not_like": "!like",
"glob": "glob",
"not_glob": "!glob",
}

FiltersType = Union["F", "FilterList", "ComplexFilter"]

class F:
"""Filter class that allows us to define a single filter for an object

Argument format is {field}={value} or {field}__{operator}={value}
Only one argument can be passed!

Attributes:
negate (bool): If true the filter is negated
source (str): The source is the API attribute we are looking at
op (str): The operator for the search
targets (str): The target is the value we are searching for
"""

negate: bool = False
source: str = ""
op: str = ""
targets: Union[List[Union[int, str]], Union[int, str]]

def __init__(self, **kwargs):
"""Filter initialization"""
if len(kwargs) > 1:
raise ValueError("F only accepts one filter condition at a time!")

# support things like switch-id by using underscore, and then replacing it
rx = re.compile(r'([a-zA-Z0-9])(_)([a-zA-Z0-9])')
kwargs = { rx.sub(r"\g<1>-\g<3>", key):value for key, value in kwargs.items() }

for key, value in kwargs.items():
if "__" in key:
self.source, self.op = key.split("__")
if self.op not in OP:
raise ValueError(f"Unknown operation: '{self.op}' !")
self.op = OP[self.op]
else:
self.source = key
self.op = "=="
self.targets = value

def generate(self) -> List[str]:
"""Generate API filter list"""
out = []
if self.negate:
out.append("!")
out.append(self.source)
out.append(self.op)
if isinstance(self.targets, list):
out.extend(self.targets)
else:
out.append(self.targets)
return out

def __and__(self, other) -> "ComplexFilter":
return ComplexFilter(self, "&&", other)

def __or__(self, other) -> "ComplexFilter":
return ComplexFilter(self, "||", other)

def __invert__(self):
self.negate = not self.negate
return self

def __add__(self, other: Union["F", "FilterList"]):
return FilterList(self, other)


class FilterList:
"""List of F objects"""

members: list[F]

def __init__(self, *members: Union[F, "FilterList"]):
self.members = []
for member in members:
self + member

def __add__(self, other: Union[F, "FilterList"]):
if isinstance(other, F):
self.members.append(other)
elif isinstance(other, FilterList):
self.members.extend(other.members)
else:
raise ValueError(f"Elements '{other}' can't be added to FilterList")
return self

def __and__(self, other) -> "ComplexFilter":
return ComplexFilter(self, "&&", other)

def __or__(self, other) -> "ComplexFilter":
return ComplexFilter(self, "||", other)

def __len__(self):
return len(self.members)

def generate(self) -> List[List[str]]:
"""Generate API filter output"""
return [member.generate() for member in self.members]


class ComplexFilter:
"""Complex handling of filters and their operator"""

def __init__(
self,
a: Union["ComplexFilter", FilterList, F],
op: Literal["||", "&&"],
b: Union["ComplexFilter", FilterList, F],
):
self.a = a
self.op = op
self.b = b

def generate(self) -> list:
"""Generate API filter output"""
out = [self.a.generate(), self.op, self.b.generate()]
return out

def __and__(self, other) -> "ComplexFilter":
return ComplexFilter(self, "&&", other)

def __or__(self, other):
return ComplexFilter(self, "||", other)
Loading