Skip to content

Commit a5105f7

Browse files
committed
added netprobe arg
1 parent a714789 commit a5105f7

File tree

5 files changed

+214
-11
lines changed

5 files changed

+214
-11
lines changed

nodescraper/cli/dynamicparserbuilder.py

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
#
2525
###############################################################################
2626
import argparse
27-
from typing import Optional, Type
27+
from typing import Literal, Optional, Type, get_args, get_origin
2828

2929
from pydantic import BaseModel
3030

@@ -96,19 +96,50 @@ def get_model_arg(cls, type_class_map: dict) -> Optional[Type[BaseModel]]:
9696
None,
9797
)
9898

99+
@classmethod
100+
def get_literal_choices(cls, type_class_map: dict) -> Optional[list]:
101+
"""Get the choices from a Literal type if present
102+
103+
Args:
104+
type_class_map (dict): mapping of type classes
105+
106+
Returns:
107+
Optional[list]: list of valid choices for the Literal type, or None if not a Literal
108+
"""
109+
# Check if Literal is in the type_class_map
110+
literal_type = type_class_map.get(Literal)
111+
if literal_type and literal_type.inner_type is not None:
112+
# The inner_type contains the first literal value, but we need all of them
113+
# We need to get the original annotation to extract all literal values
114+
# For now, return None and we'll handle this differently
115+
return None
116+
return None
117+
99118
def add_argument(
100119
self,
101120
type_class_map: dict,
102121
arg_name: str,
103122
required: bool,
123+
annotation: Optional[Type] = None,
104124
) -> None:
105125
"""Add an argument to a parser with an appropriate type
106126
107127
Args:
108128
type_class_map (dict): type classes for the arg
109129
arg_name (str): argument name
110130
required (bool): whether or not the arg is required
131+
annotation (Optional[Type]): full type annotation for extracting Literal choices
111132
"""
133+
# Check for Literal types and extract choices
134+
literal_choices = None
135+
if Literal in type_class_map and annotation:
136+
# Extract all arguments from the annotation
137+
args = get_args(annotation)
138+
for arg in args:
139+
if get_origin(arg) is Literal:
140+
literal_choices = list(get_args(arg))
141+
break
142+
112143
if list in type_class_map:
113144
type_class = type_class_map[list]
114145
self.parser.add_argument(
@@ -125,6 +156,15 @@ def add_argument(
125156
required=required,
126157
choices=[True, False],
127158
)
159+
elif Literal in type_class_map and literal_choices:
160+
# Add argument with choices for Literal types
161+
self.parser.add_argument(
162+
f"--{arg_name}",
163+
type=str,
164+
required=required,
165+
choices=literal_choices,
166+
metavar=f"{{{','.join(literal_choices)}}}",
167+
)
128168
elif float in type_class_map:
129169
self.parser.add_argument(
130170
f"--{arg_name}", type=float, required=required, metavar=META_VAR_MAP[float]
@@ -166,6 +206,10 @@ def build_model_arg_parser(self, model: type[BaseModel], required: bool) -> list
166206
if type(None) in type_class_map and len(attr_data.type_classes) == 1:
167207
continue
168208

169-
self.add_argument(type_class_map, attr.replace("_", "-"), required)
209+
# Get the full annotation from the model field
210+
field = model.model_fields.get(attr)
211+
annotation = field.annotation if field else None
212+
213+
self.add_argument(type_class_map, attr.replace("_", "-"), required, annotation)
170214

171215
return list(type_map.keys())

nodescraper/plugins/inband/network/collector_args.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@
2424
#
2525
###############################################################################
2626

27-
from typing import Optional
27+
from typing import Literal, Optional
2828

2929
from nodescraper.models import CollectorArgs
3030

3131

3232
class NetworkCollectorArgs(CollectorArgs):
3333
url: Optional[str] = None
34+
netprobe: Optional[Literal["ping", "wget", "curl"]] = None

nodescraper/plugins/inband/network/network_collector.py

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ class NetworkCollector(InBandDataCollector[NetworkDataModel, NetworkCollectorArg
6666
CMD_NEIGHBOR = "ip neighbor show"
6767
CMD_ETHTOOL_TEMPLATE = "ethtool {interface}"
6868
CMD_PING = "ping"
69+
CMD_WGET = "wget"
70+
CMD_CURL = "curl"
6971

7072
# LLDP commands
7173
CMD_LLDPCLI_NEIGHBOR = "lldpcli show neighbor"
@@ -1671,21 +1673,32 @@ def _collect_pensando_nic_info(
16711673
uncollected_commands,
16721674
)
16731675

1674-
def _check_network_connectivity(self, url: str) -> bool:
1675-
"""Check network connectivity by pinging a URL.
1676+
def _check_network_connectivity(self, cmd: str, url: str) -> bool:
1677+
"""Check network connectivity using specified command.
16761678
16771679
Args:
1678-
url: URL or hostname to ping
1680+
cmd: Command to use for connectivity check (ping, wget, or curl)
1681+
url: URL or hostname to check
16791682
16801683
Returns:
16811684
bool: True if network is accessible, False otherwise
16821685
"""
1686+
if cmd not in {"ping", "wget", "curl"}:
1687+
raise ValueError(
1688+
f"Invalid network probe command: '{cmd}'. "
1689+
f"Valid options are: 'ping', 'wget', 'curl'"
1690+
)
16831691

16841692
# Determine ping options based on OS
16851693
ping_option = "-c 1" if self.system_info.os_family == OSFamily.LINUX else "-n 1"
16861694

1687-
# Run ping command
1688-
result = self._run_sut_cmd(f"{self.CMD_PING} {url} {ping_option}")
1695+
# Build command based on cmd parameter using class constants
1696+
if cmd == "ping":
1697+
result = self._run_sut_cmd(f"{self.CMD_PING} {url} {ping_option}")
1698+
elif cmd == "wget":
1699+
result = self._run_sut_cmd(f"{self.CMD_WGET} {url}")
1700+
else: # curl
1701+
result = self._run_sut_cmd(f"{self.CMD_CURL} {url}")
16891702

16901703
if result.exit_code == 0:
16911704
self._log_event(
@@ -1697,7 +1710,7 @@ def _check_network_connectivity(self, url: str) -> bool:
16971710
else:
16981711
self._log_event(
16991712
category=EventCategory.NETWORK,
1700-
description=f"{self.CMD_PING} to {url} failed!",
1713+
description=f"{cmd} to {url} failed!",
17011714
data={"url": url, "not accessible": result.exit_code == 0},
17021715
priority=EventPriority.ERROR,
17031716
)
@@ -1737,7 +1750,19 @@ def collect_data(
17371750

17381751
# Check network connectivity if URL is provided
17391752
if args and args.url:
1740-
network_accessible = self._check_network_connectivity(args.url)
1753+
cmd = args.netprobe if args.netprobe else "ping"
1754+
try:
1755+
network_accessible = self._check_network_connectivity(cmd, args.url)
1756+
except ValueError as e:
1757+
self._log_event(
1758+
category=EventCategory.NETWORK,
1759+
description=str(e),
1760+
data={"netprobe": cmd, "url": args.url},
1761+
priority=EventPriority.ERROR,
1762+
console_log=True,
1763+
)
1764+
# Set network_accessible to None since we couldn't check
1765+
network_accessible = None
17411766

17421767
# Collect interface/address information
17431768
res_addr = self._run_sut_cmd(self.CMD_ADDR)

test/functional/fixtures/network_plugin_config.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
"plugins": {
44
"NetworkPlugin": {
55
"collector_args": {
6-
"url": "mock.example.com"
6+
"url": "mock.example.com",
7+
"netprobe": "ping"
78
},
89
"analysis_args": {}
910
}

test/functional/test_network_plugin.py

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,135 @@ def test_network_plugin_with_url(run_cli_command, network_config_file, tmp_path)
122122
assert result.returncode in [0, 1, 2]
123123
output = result.stdout + result.stderr
124124
assert len(output) > 0
125+
126+
127+
def test_network_plugin_with_netprobe_ping(run_cli_command, tmp_path):
128+
"""Test NetworkPlugin with netprobe set to ping."""
129+
log_path = str(tmp_path / "logs_network_netprobe_ping")
130+
result = run_cli_command(
131+
[
132+
"--log-path",
133+
log_path,
134+
"run-plugins",
135+
"NetworkPlugin",
136+
"--url",
137+
"google.com",
138+
"--netprobe",
139+
"ping",
140+
],
141+
check=False,
142+
)
143+
144+
assert result.returncode in [0, 1, 2]
145+
output = result.stdout + result.stderr
146+
assert len(output) > 0
147+
148+
149+
def test_network_plugin_with_netprobe_wget(run_cli_command, tmp_path):
150+
"""Test NetworkPlugin with netprobe set to wget."""
151+
log_path = str(tmp_path / "logs_network_netprobe_wget")
152+
result = run_cli_command(
153+
[
154+
"--log-path",
155+
log_path,
156+
"run-plugins",
157+
"NetworkPlugin",
158+
"--url",
159+
"google.com",
160+
"--netprobe",
161+
"wget",
162+
],
163+
check=False,
164+
)
165+
166+
assert result.returncode in [0, 1, 2]
167+
output = result.stdout + result.stderr
168+
assert len(output) > 0
169+
170+
171+
def test_network_plugin_with_netprobe_curl(run_cli_command, tmp_path):
172+
"""Test NetworkPlugin with netprobe set to curl."""
173+
log_path = str(tmp_path / "logs_network_netprobe_curl")
174+
result = run_cli_command(
175+
[
176+
"--log-path",
177+
log_path,
178+
"run-plugins",
179+
"NetworkPlugin",
180+
"--url",
181+
"google.com",
182+
"--netprobe",
183+
"curl",
184+
],
185+
check=False,
186+
)
187+
188+
assert result.returncode in [0, 1, 2]
189+
output = result.stdout + result.stderr
190+
assert len(output) > 0
191+
192+
193+
def test_network_plugin_with_invalid_netprobe(run_cli_command, tmp_path):
194+
"""Test NetworkPlugin with invalid netprobe value - should fail at CLI validation."""
195+
log_path = str(tmp_path / "logs_network_invalid_netprobe")
196+
result = run_cli_command(
197+
[
198+
"--log-path",
199+
log_path,
200+
"run-plugins",
201+
"NetworkPlugin",
202+
"--url",
203+
"google.com",
204+
"--netprobe",
205+
"invalid",
206+
],
207+
check=False,
208+
)
209+
210+
# Should fail with exit code 2 (argparse error)
211+
assert result.returncode == 2
212+
output = result.stdout + result.stderr
213+
assert len(output) > 0
214+
assert "invalid choice" in output.lower()
215+
assert "choose from" in output.lower()
216+
217+
218+
def test_network_plugin_with_url_no_netprobe(run_cli_command, tmp_path):
219+
"""Test NetworkPlugin with URL but no netprobe - should default to ping."""
220+
log_path = str(tmp_path / "logs_network_url_default")
221+
result = run_cli_command(
222+
[
223+
"--log-path",
224+
log_path,
225+
"run-plugins",
226+
"NetworkPlugin",
227+
"--url",
228+
"google.com",
229+
],
230+
check=False,
231+
)
232+
233+
assert result.returncode in [0, 1, 2]
234+
output = result.stdout + result.stderr
235+
assert len(output) > 0
236+
237+
238+
def test_network_plugin_with_netprobe_no_url(run_cli_command, tmp_path):
239+
"""Test NetworkPlugin with netprobe but no URL - should skip connectivity check."""
240+
log_path = str(tmp_path / "logs_network_netprobe_no_url")
241+
result = run_cli_command(
242+
[
243+
"--log-path",
244+
log_path,
245+
"run-plugins",
246+
"NetworkPlugin",
247+
"--netprobe",
248+
"ping",
249+
],
250+
check=False,
251+
)
252+
253+
# Should succeed but skip connectivity check
254+
assert result.returncode in [0, 1, 2]
255+
output = result.stdout + result.stderr
256+
assert len(output) > 0

0 commit comments

Comments
 (0)