Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion simvue/api/objects/alert/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,19 @@ class AlertBase(SimvueObject):

@override
@classmethod
def new(cls, *_, **__) -> Self:
def new(
cls,
*,
name: typing.Annotated[str, pydantic.Field(pattern=NAME_REGEX)],
description: str | None,
notification: typing.Literal["none", "email"],
enabled: bool,
allow_duplicates: bool,
offline: bool,
server_url: str | None,
server_token: pydantic.SecretStr | None,
**_,
) -> Self:
raise NotImplementedError

@override
Expand All @@ -42,10 +54,14 @@ def __init__(
**kwargs,
) -> None:
"""Retrieve an alert from the Simvue server by identifier"""
_params: dict[str, str | bool] = kwargs.pop("_params", {}) | {
"deduplicate": not kwargs.get("allow_duplicates", True)
}
super().__init__(
identifier=identifier,
server_url=server_url,
server_token=server_token,
_params=_params,
**kwargs,
)
self._local_only_args += [
Expand Down Expand Up @@ -211,6 +227,8 @@ def set_status(self, run_id: str, status: typing.Literal["ok", "critical"]) -> N

def get_status(self, run_id: str) -> typing.Literal["ok", "critical"]:
"""Retrieve the status of this alert for a given run"""
_offline_run: bool = run_id.startswith("offline")

if not self._offline and run_id.startswith("offline"):
raise ValueError(
f"Cannot retrieve status of online alert '{self.id}' for offline run '{run_id}'"
Expand Down
4 changes: 2 additions & 2 deletions simvue/api/objects/alert/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ def new(
notification: typing.Literal["none", "email"],
pattern: str,
frequency: pydantic.PositiveInt,
server_url: str | None = None,
server_token: pydantic.SecretStr | None = None,
enabled: bool = True,
offline: bool = False,
server_url: str | None = None,
server_token: pydantic.SecretStr | None = None,
**_,
) -> Self:
"""Create a new event-based alert
Expand Down
2 changes: 1 addition & 1 deletion simvue/api/objects/alert/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ def new(
enabled=enabled,
server_url=server_url,
server_token=server_token,
_params={"deduplicate": True},
_read_only=False,
_offline=offline,
)
_alert._params = {"deduplicate": True}
return _alert

@override
Expand Down
7 changes: 4 additions & 3 deletions simvue/api/objects/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ def __init__(
*,
server_url: str | None,
server_token: pydantic.SecretStr | None,
_params: dict[str, str | bool] | None = None,
_read_only: bool = True,
_local: bool = False,
_user_agent: str | None = None,
Expand Down Expand Up @@ -231,7 +232,7 @@ def __init__(
self._user_config.headers if not self._offline else {}
)

self._params: dict[str, str | bool] = {}
self._params: dict[str, str | bool] | None = _params

self._staging: dict[str, typing.Any] = {}

Expand Down Expand Up @@ -655,7 +656,7 @@ def _post_batch(
_response = sv_post(
url=f"{self._base_url}",
headers=self._headers | {"Content-Type": "application/msgpack"},
params=self._params,
params=self._params or {},
data=batch_data,
is_json=True,
)
Expand Down Expand Up @@ -694,7 +695,7 @@ def _post_single(
_response = sv_post(
url=f"{self._base_url}",
headers=self._headers | {"Content-Type": "application/msgpack"},
params=self._params,
params=self._params or {},
data=data or kwargs,
is_json=is_json,
)
Expand Down
8 changes: 6 additions & 2 deletions simvue/api/objects/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,8 +795,12 @@ def on_reconnect(self, id_mapping: dict[str, str]) -> None:
id_mapping: dict[str, str]
A mapping from offline identifier to online identifier.
"""
online_alert_ids: list[str] = list(
set(id_mapping.get(_id) for _id in self._staging.get("alerts", []))
online_alert_ids: list[str | None] = list(
set(
id_mapping.get(_id)
for _id in self._staging.get("alerts", [])
if _id.startswith("offline")
)
)
if not all(online_alert_ids):
raise KeyError("Could not find alert ID in offline to online ID mapping.")
Expand Down
12 changes: 6 additions & 6 deletions simvue/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,10 @@ def _update_alerts(self) -> None:
server_url=self._runner._user_config.server.url,
server_token=self._runner._user_config.server.token,
)
_is_set = _alert.get_status(run_id=self._runner.id)
_is_set: bool = False

if self._runner.mode == "online":
_is_set = _alert.get_status(run_id=self._runner.id) is not None

if process.returncode != 0:
# If the process fails then purge the dispatcher event queue
Expand All @@ -431,11 +434,8 @@ def _update_alerts(self) -> None:
self._runner.log_alert(
identifier=self._alert_ids[proc_id], state="critical"
)
else:
if not _is_set:
self._runner.log_alert(
identifier=self._alert_ids[proc_id], state="ok"
)
elif self._runner.mode == "online" and not _is_set:
self._runner.log_alert(identifier=self._alert_ids[proc_id], state="ok")

_current_time: float = 0
while (
Expand Down
55 changes: 27 additions & 28 deletions simvue/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,7 @@ def __exit__(
) -> None:
logger.debug(
"Automatically closing run '%s' in status %s",
self.id
if self._user_config.run.mode == "online" and self._sv_obj
else "unregistered",
self.id if self.mode == "online" and self._sv_obj else "unregistered",
self._status,
)

Expand All @@ -301,6 +299,11 @@ def duration(self) -> float:
"""Return current run duration"""
return time.time() - self._start_time

@property
def mode(self) -> typing.Literal["offline", "online", "disabled"]:
"""Return whether this run is offline."""
return self._user_config.run.mode

@property
def processes(self) -> list[psutil.Process]:
"""Create an array containing a list of processes"""
Expand Down Expand Up @@ -500,7 +503,7 @@ def _dispatch_callback(
if category == "events":
_events = Events.new(
run=self.id,
offline=self._user_config.run.mode == "offline",
offline=self.mode == "offline",
server_url=self._user_config.server.url,
server_token=self._user_config.server.token,
events=buffer,
Expand All @@ -512,13 +515,13 @@ def _dispatch_callback(
data=buffer,
server_url=self._user_config.server.url,
server_token=self._user_config.server.token,
offline=self._user_config.run.mode == "offline",
offline=self.mode == "offline",
)
return _grid_metrics.commit()
else:
_metrics = Metrics.new(
run=self.id,
offline=self._user_config.run.mode == "offline",
offline=self.mode == "offline",
server_url=self._user_config.server.url,
server_token=self._user_config.server.token,
metrics=buffer,
Expand Down Expand Up @@ -550,7 +553,7 @@ def _start(self) -> bool:
if self._sv_obj.status != "running":
self._sv_obj.status = self._status
_changed = True
if self._user_config.run.mode == "offline":
if self.mode == "offline":
self._sv_obj.started = self._start_time
_changed = True
if _changed:
Expand Down Expand Up @@ -721,7 +724,7 @@ def init(

self._folder = Folder.new(
path=folder,
offline=self._user_config.run.mode == "offline",
offline=self.mode == "offline",
server_url=self._user_config.server.url,
server_token=self._user_config.server.token,
)
Expand All @@ -742,7 +745,7 @@ def init(
if name and not re.match(r"^[a-zA-Z0-9\-\_\s\/\.:]+$", name):
self._error("specified name is invalid")
return False
elif not name and self._user_config.run.mode == "offline":
elif not name and self.mode == "offline":
name = randomname.get_name()

self._status = "running" if running else "created"
Expand Down Expand Up @@ -1219,7 +1222,7 @@ def config(
"Emissions metrics require resource metrics collection - make sure resource metrics are enabled!"
)
return False
if self._user_config.run.mode == "offline":
if self.mode == "offline":
# Create an emissions monitor with no API calls
self._emissions_monitor = CO2Monitor(
intensity_refresh_interval=None,
Expand Down Expand Up @@ -1627,7 +1630,7 @@ def assign_metric_to_grid(
name=grid_name,
grid=axes_ticks,
labels=axes_labels,
offline=self._user_config.run.mode == "offline",
offline=self.mode == "offline",
server_url=self._user_config.server.url,
server_token=self._user_config.server.token,
)
Expand All @@ -1650,7 +1653,7 @@ def assign_metric_to_grid(
try:
_grid_attach = Grid(
identifier=self._grids[grid_name]["id"],
offline=self._user_config.run.mode == "offline",
offline=self.mode == "offline",
server_url=self._user_config.server.url,
server_token=self._user_config.server.token,
)
Expand Down Expand Up @@ -1859,7 +1862,7 @@ def save_object(
allow_pickling=allow_pickle,
storage=self._storage_id,
metadata=metadata,
offline=self._user_config.run.mode == "offline",
offline=self.mode == "offline",
server_url=self._user_config.server.url,
server_token=self._user_config.server.token,
)
Expand Down Expand Up @@ -1931,7 +1934,7 @@ def save_file(
name=name or stored_file_name,
storage=self._storage_id,
file_path=file_path,
offline=self._user_config.run.mode == "offline",
offline=self.mode == "offline",
mime_type=file_type,
metadata=metadata,
snapshot=snapshot,
Expand Down Expand Up @@ -2096,11 +2099,7 @@ def _tidy_run(self) -> None:
self._heartbeat_termination_trigger.set()
self._heartbeat_thread.join()

if (
self._sv_obj
and self._user_config.run.mode == "offline"
and self._status != "created"
):
if self._sv_obj and self.mode == "offline" and self._status != "created":
self._user_config.offline.cache.joinpath(
"runs", f"{self.id}.closed"
).touch()
Expand Down Expand Up @@ -2231,14 +2230,14 @@ def add_alerts(
names = names or []

if names and not ids:
if self._user_config.run.mode == "offline":
if self.mode == "offline":
self._error(
"Cannot retrieve alerts based on names in offline mode - please use IDs instead."
)
return False
try:
if alerts := Alert.get(
offline=self._user_config.run.mode == "offline",
offline=self.mode == "offline",
server_url=self._user_config.server.url,
server_token=self._user_config.server.token,
):
Expand All @@ -2264,7 +2263,7 @@ def _check_if_alert_exists(self, alert: "AlertBase") -> str | None:
"""Check if an existing alert matches definition."""
# If the alert already exists just add the existing one
for _id, _existing_alert in Alert.get(
offline=self._user_config.run.mode == "offline",
offline=self.mode == "offline",
server_url=self._user_config.server.url,
server_token=self._user_config.server.token,
):
Expand Down Expand Up @@ -2348,7 +2347,7 @@ def create_metric_range_alert(
range_low=range_low,
range_high=range_high,
frequency=frequency or 60,
offline=self._user_config.run.mode == "offline",
offline=self.mode == "offline",
server_url=self._user_config.server.url,
server_token=self._user_config.server.token,
)
Expand Down Expand Up @@ -2438,7 +2437,7 @@ def create_metric_threshold_alert(
frequency=frequency,
aggregation=aggregation,
notification=notification,
offline=self._user_config.run.mode == "offline",
offline=self.mode == "offline",
server_url=self._user_config.server.url,
server_token=self._user_config.server.token,
)
Expand Down Expand Up @@ -2501,7 +2500,7 @@ def create_event_alert(
pattern=pattern,
notification=notification,
frequency=frequency,
offline=self._user_config.run.mode == "offline",
offline=self.mode == "offline",
server_url=self._user_config.server.url,
server_token=self._user_config.server.token,
)
Expand Down Expand Up @@ -2561,7 +2560,7 @@ def create_user_alert(
name=name,
notification=notification,
description=description,
offline=self._user_config.run.mode == "offline",
offline=self.mode == "offline",
server_url=self._user_config.server.url,
server_token=self._user_config.server.token,
)
Expand Down Expand Up @@ -2613,15 +2612,15 @@ def log_alert(
self._error("Please specify alert to update either by ID or by name.")
return False

if name and self._user_config.run.mode == "offline":
if name and self.mode == "offline":
self._error(
"Cannot retrieve alerts based on names in offline mode - please use IDs instead."
)
return False

if name:
try:
if alerts := Alert.get(offline=self._user_config.run.mode == "offline"):
if alerts := Alert.get(offline=self.mode == "offline"):
identifier = next(
(id for id, alert in alerts if alert.name == name), None
)
Expand Down
4 changes: 4 additions & 0 deletions simvue/sender/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,10 @@ def initialise_object(cls, online_id: ObjectID | None, **data) -> AlertType:
if not online_id:
_source: str = data["source"]

# We need to make sure the ID of an existing alert is returned
# the server will return 409 with an ID if this is the case
data["allow_duplicates"] = False

if _source == "events":
return EventsAlert.new(**data)
elif _source == "metrics" and data.get("threshold"):
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/test_run_execute_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def test_monitor_processes(create_plain_run_offline: tuple[Run, dict]):
_run.add_process(f"process_2_{os.environ.get('PYTEST_XDIST_WORKER', 0)}", Command="Get-ChildItem", executable="powershell")
_run.add_process(f"process_3_{os.environ.get('PYTEST_XDIST_WORKER', 0)}", Command="exit 0", executable="powershell")
_sender = Sender(_run._sv_obj._local_staging_file.parents[1], 1, 10, throw_exceptions=True)
_sender.upload(["folders", "runs", "alerts"], )
_sender.upload(["folders", "alerts", "runs"], )


@pytest.mark.executor
Expand Down
Loading