From 7b563a4ee188eb8c23f9f872f20690cc7a453640 Mon Sep 17 00:00:00 2001 From: Somanath Date: Fri, 22 May 2026 15:46:45 +0530 Subject: [PATCH] feat(python): remove hardcoded credentials and add unit tests - Clear all hardcoded IPs, passwords, SVM/volume/aggregate names from INPUTS/ENV blocks in all 9 Python scripts; values must now be supplied via environment variables or the INPUTS dict at runtime - Add Unit_tests/ suite: 142 tests covering all 9 scripts + OntapClient - Add pytest.ini and requirements-dev.txt for test tooling - Refactor ontap_client: _DEFAULT_TIMEOUT=90, poll_job, wait_snapmirrored, load_env_file, update_auth, __all__ - Reduce cyclomatic complexity across all scripts (all functions A/B grade) - Fix non-ASCII characters in comments (U+2026, U+2192 replaced) - Add type hints and docstrings to all public functions - ruff: 0 warnings; pytest: 142 passed --- Unit_tests/__init__.py | 4 + Unit_tests/conftest.py | 12 + Unit_tests/test_cifs_provision.py | 180 ++++++ Unit_tests/test_cluster_info.py | 65 +++ Unit_tests/test_cluster_setup_basic.py | 186 ++++++ Unit_tests/test_nfs_provision.py | 74 +++ Unit_tests/test_ontap_client.py | 532 ++++++++++++++++++ .../test_snapmirror_cleanup_test_failover.py | 109 ++++ .../test_snapmirror_provision_dest_managed.py | 92 +++ .../test_snapmirror_provision_src_managed.py | 36 ++ Unit_tests/test_snapmirror_test_failover.py | 91 +++ pytest.ini | 3 + python/README.md | 38 +- python/cifs_provision.py | 49 +- python/cluster_info.py | 2 +- python/cluster_setup_basic.py | 88 +-- python/nfs_provision.py | 41 +- python/ontap_client.py | 97 +++- python/snapmirror_cleanup_test_failover.py | 47 +- python/snapmirror_provision_dest_managed.py | 71 +-- python/snapmirror_provision_src_managed.py | 424 ++++++++------ python/snapmirror_test_failover.py | 70 +-- requirements-dev.txt | 2 + 23 files changed, 1898 insertions(+), 415 deletions(-) create mode 100644 Unit_tests/__init__.py create mode 100644 Unit_tests/conftest.py create mode 100644 Unit_tests/test_cifs_provision.py create mode 100644 Unit_tests/test_cluster_info.py create mode 100644 Unit_tests/test_cluster_setup_basic.py create mode 100644 Unit_tests/test_nfs_provision.py create mode 100644 Unit_tests/test_ontap_client.py create mode 100644 Unit_tests/test_snapmirror_cleanup_test_failover.py create mode 100644 Unit_tests/test_snapmirror_provision_dest_managed.py create mode 100644 Unit_tests/test_snapmirror_provision_src_managed.py create mode 100644 Unit_tests/test_snapmirror_test_failover.py create mode 100644 pytest.ini create mode 100644 requirements-dev.txt diff --git a/Unit_tests/__init__.py b/Unit_tests/__init__.py new file mode 100644 index 0000000..a875d57 --- /dev/null +++ b/Unit_tests/__init__.py @@ -0,0 +1,4 @@ +# © 2026 NetApp, Inc. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the NOTICE file in the repo root for trademark and attribution details. +# See the NOTICE file in the repo root for trademark and attribution details. diff --git a/Unit_tests/conftest.py b/Unit_tests/conftest.py new file mode 100644 index 0000000..eab4879 --- /dev/null +++ b/Unit_tests/conftest.py @@ -0,0 +1,12 @@ +# © 2026 NetApp, Inc. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the NOTICE file in the repo root for trademark and attribution details. +"""Pytest configuration — make the python/ directory importable without installing.""" + +from __future__ import annotations + +import sys +from pathlib import Path + +# Add python/ to sys.path so test modules can import ontap_client, nfs_provision, etc. +sys.path.insert(0, str(Path(__file__).parent.parent / "python")) diff --git a/Unit_tests/test_cifs_provision.py b/Unit_tests/test_cifs_provision.py new file mode 100644 index 0000000..82e48e1 --- /dev/null +++ b/Unit_tests/test_cifs_provision.py @@ -0,0 +1,180 @@ +# © 2026 NetApp, Inc. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the NOTICE file in the repo root for trademark and attribution details. +"""Unit tests for cifs_provision helper functions.""" + +from __future__ import annotations + +import os +from pathlib import Path +from unittest.mock import MagicMock + +import cifs_provision +import pytest +from ontap_client import OntapClient, load_env_file + + +class TestLoadEnvFile: + def test_valid_file_sets_env_vars( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + env_file = tmp_path / "test.env" + env_file.write_text("CIFS_FOO=bar\nCIFS_BAZ=qux\n") + monkeypatch.delenv("CIFS_FOO", raising=False) + monkeypatch.delenv("CIFS_BAZ", raising=False) + load_env_file(str(env_file)) + assert os.environ["CIFS_FOO"] == "bar" + assert os.environ["CIFS_BAZ"] == "qux" + + def test_missing_file_exits(self, tmp_path: Path) -> None: + with pytest.raises(SystemExit): + load_env_file(str(tmp_path / "nonexistent.env")) + + def test_malformed_line_exits(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + env_file = tmp_path / "bad.env" + env_file.write_text("CIFS_OK=yes\nNO_EQUALS\n") + monkeypatch.delenv("CIFS_OK", raising=False) + with pytest.raises(SystemExit): + load_env_file(str(env_file)) + + def test_blank_and_comment_lines_skipped( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + env_file = tmp_path / "mixed.env" + env_file.write_text("# comment\n\nCIFS_KEY=value\n") + monkeypatch.delenv("CIFS_KEY", raising=False) + load_env_file(str(env_file)) + assert os.environ["CIFS_KEY"] == "value" + + def test_setdefault_does_not_override_existing( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + env_file = tmp_path / "override.env" + env_file.write_text("CIFS_KEY2=from_file\n") + monkeypatch.setenv("CIFS_KEY2", "already_set") + load_env_file(str(env_file)) + assert os.environ["CIFS_KEY2"] == "already_set" + + +class TestPick: + def test_cli_val_wins(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("SVM_NAME", "from_env") + monkeypatch.setitem(cifs_provision.ENV, "SVM_NAME", "from_env_dict") + assert cifs_provision._pick("from_cli", "SVM_NAME") == "from_cli" + + def test_env_var_second_priority(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("SVM_NAME", "from_env") + monkeypatch.setitem(cifs_provision.ENV, "SVM_NAME", "from_env_dict") + assert cifs_provision._pick(None, "SVM_NAME") == "from_env" + + def test_env_dict_third_priority(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("SVM_NAME", raising=False) + monkeypatch.setitem(cifs_provision.ENV, "SVM_NAME", "from_env_dict") + assert cifs_provision._pick(None, "SVM_NAME") == "from_env_dict" + + def test_falls_back_to_default(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("MISSING_KEY", raising=False) + assert cifs_provision._pick(None, "MISSING_KEY", "fallback") == "fallback" + + def test_empty_string_cli_treated_as_falsy(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("SVM_NAME", "from_env") + assert cifs_provision._pick("", "SVM_NAME") == "from_env" + + +class TestResolveConfig: + def _make_args(self, **overrides): + import argparse + + defaults = { + "env_file": None, + "svm": None, + "volume": None, + "size": None, + "aggregate": "aggr1", + "share_name": None, + "share_comment": None, + "acl_user": None, + "acl_permission": None, + "create_cifs_server": False, + "cifs_server_name": None, + "workgroup": None, + } + defaults.update(overrides) + return argparse.Namespace(**defaults) + + def test_missing_aggregate_exits(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("AGGR_NAME", raising=False) + monkeypatch.setitem(cifs_provision.ENV, "AGGR_NAME", "") + args = self._make_args(aggregate=None) + with pytest.raises(SystemExit): + cifs_provision._resolve_config(args) + + def test_aggregate_from_cli(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("AGGR_NAME", raising=False) + args = self._make_args(aggregate="aggr_from_cli") + cfg, _ = cifs_provision._resolve_config(args) + assert cfg["aggregate"] == "aggr_from_cli" + + def test_create_cifs_server_flag_passed_through(self, monkeypatch: pytest.MonkeyPatch) -> None: + args = self._make_args(create_cifs_server=True) + _, create_cifs_server = cifs_provision._resolve_config(args) + assert create_cifs_server is True + + def test_returns_all_expected_keys(self, monkeypatch: pytest.MonkeyPatch) -> None: + args = self._make_args() + cfg, create_cifs_server = cifs_provision._resolve_config(args) + expected_keys = { + "svm", + "volume", + "size", + "aggregate", + "share_name", + "share_comment", + "acl_user", + "acl_permission", + "cifs_server_name", + "workgroup", + } + assert expected_keys == set(cfg.keys()) + assert isinstance(create_cifs_server, bool) + + +class TestEnsureCifsServer: + def _make_client(self) -> MagicMock: + client = MagicMock(spec=OntapClient) + client.__enter__ = MagicMock(return_value=client) + client.__exit__ = MagicMock(return_value=False) + return client + + def test_server_exists_no_create_called(self) -> None: + client = self._make_client() + client.get.return_value = { + "records": [{"svm": {"name": "vs1"}, "enabled": True}], + "num_records": 1, + } + cifs_provision._ensure_cifs_server(client, "vs1", False, "ONTAP-CIFS", "WORKGROUP") + client.post.assert_not_called() + + def test_no_server_no_flag_exits(self) -> None: + client = self._make_client() + client.get.return_value = {"records": [], "num_records": 0} + with pytest.raises(SystemExit): + cifs_provision._ensure_cifs_server(client, "vs1", False, "ONTAP-CIFS", "WORKGROUP") + + def test_no_server_with_flag_creates_server(self) -> None: + client = self._make_client() + client.get.return_value = {"records": [], "num_records": 0} + client.post.return_value = {} + cifs_provision._ensure_cifs_server(client, "vs1", True, "MY-CIFS", "MYGROUP") + client.post.assert_called_once() + call_args, call_kwargs = client.post.call_args + call_body = call_args[1] if len(call_args) > 1 else call_kwargs.get("body", {}) + assert call_body["name"] == "MY-CIFS" + assert call_body["workgroup"] == "MYGROUP" + + def test_no_server_with_flag_polls_job_when_returned(self) -> None: + client = self._make_client() + client.get.return_value = {"records": [], "num_records": 0} + client.post.return_value = {"job": {"uuid": "job-uuid-1"}} + cifs_provision._ensure_cifs_server(client, "vs1", True, "MY-CIFS", "MYGROUP") + client.poll_job.assert_called_once_with("job-uuid-1") diff --git a/Unit_tests/test_cluster_info.py b/Unit_tests/test_cluster_info.py new file mode 100644 index 0000000..718d0c1 --- /dev/null +++ b/Unit_tests/test_cluster_info.py @@ -0,0 +1,65 @@ +# © 2026 NetApp, Inc. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the NOTICE file in the repo root for trademark and attribution details. +"""Unit tests for cluster_info.main().""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import cluster_info +from ontap_client import OntapClient + + +class TestClusterInfoMain: + def _make_client(self) -> MagicMock: + client = MagicMock(spec=OntapClient) + client.__enter__ = MagicMock(return_value=client) + client.__exit__ = MagicMock(return_value=False) + return client + + def test_fetches_cluster_endpoint(self) -> None: + client = self._make_client() + client.get.side_effect = [ + {"name": "cluster1", "version": {"full": "9.14.1"}}, + {"records": [], "num_records": 0}, + ] + with patch.object(OntapClient, "from_env", return_value=client): + cluster_info.main() + first_call = client.get.call_args_list[0] + assert first_call[0][0] == "/cluster" + + def test_fetches_nodes_endpoint(self) -> None: + client = self._make_client() + client.get.side_effect = [ + {"name": "cluster1", "version": {"full": "9.14.1"}}, + {"records": [], "num_records": 0}, + ] + with patch.object(OntapClient, "from_env", return_value=client): + cluster_info.main() + second_call = client.get.call_args_list[1] + assert second_call[0][0] == "/cluster/nodes" + + def test_handles_node_records(self) -> None: + client = self._make_client() + client.get.side_effect = [ + {"name": "cluster1", "version": {"full": "9.14.1"}}, + { + "records": [ + {"name": "node1", "serial_number": "SN-001"}, + {"name": "node2", "serial_number": "SN-002"}, + ], + "num_records": 2, + }, + ] + with patch.object(OntapClient, "from_env", return_value=client): + cluster_info.main() + + def test_handles_missing_cluster_fields_gracefully(self) -> None: + client = self._make_client() + client.get.side_effect = [ + {}, + {"records": [], "num_records": 0}, + ] + with patch.object(OntapClient, "from_env", return_value=client): + cluster_info.main() diff --git a/Unit_tests/test_cluster_setup_basic.py b/Unit_tests/test_cluster_setup_basic.py new file mode 100644 index 0000000..7d4b467 --- /dev/null +++ b/Unit_tests/test_cluster_setup_basic.py @@ -0,0 +1,186 @@ +# © 2026 NetApp, Inc. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the NOTICE file in the repo root for trademark and attribution details. +"""Unit tests for cluster_setup_basic helper functions.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import cluster_setup_basic as csb +import pytest +from ontap_client import OntapClient + + +class TestEnv: + def test_reads_from_inputs_dict(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(csb.INPUTS, "CLUSTER_NAME", "mycluster") + assert csb._env("CLUSTER_NAME") == "mycluster" + + def test_falls_back_to_os_environ(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(csb.INPUTS, "CLUSTER_NAME", "") + monkeypatch.setenv("CLUSTER_NAME", "envcluster") + assert csb._env("CLUSTER_NAME") == "envcluster" + + def test_inputs_takes_priority_over_env(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(csb.INPUTS, "CLUSTER_NAME", "from_inputs") + monkeypatch.setenv("CLUSTER_NAME", "from_env") + assert csb._env("CLUSTER_NAME") == "from_inputs" + + def test_missing_required_key_exits(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(csb.INPUTS, "CLUSTER_NAME", "") + monkeypatch.delenv("CLUSTER_NAME", raising=False) + with pytest.raises(SystemExit): + csb._env("CLUSTER_NAME", required=True) + + def test_missing_optional_key_returns_empty(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(csb.INPUTS, "CLUSTER_NAME", "") + monkeypatch.delenv("CLUSTER_NAME", raising=False) + result = csb._env("CLUSTER_NAME", required=False) + assert result == "" + + +class TestLoadEnvFileCSB: + def test_valid_file_updates_inputs( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + env_file = tmp_path / "build.env" + env_file.write_text("CLUSTER_NAME=testcluster\nCLUSTER_PASS=secret\n") + monkeypatch.delenv("CLUSTER_NAME", raising=False) + monkeypatch.delenv("CLUSTER_PASS", raising=False) + original = dict(csb.INPUTS) + csb._load_env_file(str(env_file)) + assert csb.INPUTS["CLUSTER_NAME"] == "testcluster" + assert csb.INPUTS["CLUSTER_PASS"] == "secret" + csb.INPUTS.update(original) + + def test_strips_double_quotes_from_values( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + env_file = tmp_path / "quoted.env" + env_file.write_text('CLUSTER_NAME="quoted-name"\n') + monkeypatch.delenv("CLUSTER_NAME", raising=False) + csb._load_env_file(str(env_file)) + assert csb.INPUTS["CLUSTER_NAME"] == "quoted-name" + + def test_strips_single_quotes_from_values( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + env_file = tmp_path / "single.env" + env_file.write_text("CLUSTER_NAME='single-name'\n") + monkeypatch.delenv("CLUSTER_NAME", raising=False) + csb._load_env_file(str(env_file)) + assert csb.INPUTS["CLUSTER_NAME"] == "single-name" + + def test_blank_lines_skipped(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + env_file = tmp_path / "blanks.env" + env_file.write_text("\n\nCLUSTER_NAME=ok\n\n") + monkeypatch.delenv("CLUSTER_NAME", raising=False) + csb._load_env_file(str(env_file)) + assert csb.INPUTS["CLUSTER_NAME"] == "ok" + + def test_comment_lines_skipped(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + env_file = tmp_path / "comments.env" + env_file.write_text("# a comment\nCLUSTER_NAME=fromcomment\n") + monkeypatch.delenv("CLUSTER_NAME", raising=False) + csb._load_env_file(str(env_file)) + assert csb.INPUTS["CLUSTER_NAME"] == "fromcomment" + + def test_malformed_line_without_equals_exits( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + env_file = tmp_path / "noeq.env" + env_file.write_text("CLUSTER_NAME=safe\nNO_EQUALS\n") + monkeypatch.delenv("CLUSTER_NAME", raising=False) + with pytest.raises(SystemExit): + csb._load_env_file(str(env_file)) + + +class TestGetNodes: + def test_returns_result_on_success(self) -> None: + client = MagicMock(spec=OntapClient) + expected = {"records": [{"name": "node1"}], "num_records": 1} + client.get.return_value = expected + result = csb._get_nodes(client, membership="available") + assert result == expected + + def test_uses_node_fields_constant(self) -> None: + client = MagicMock(spec=OntapClient) + client.get.return_value = {"records": [], "num_records": 0} + csb._get_nodes(client) + call_kwargs = client.get.call_args[1] + assert call_kwargs["fields"] == csb._NODE_FIELDS + + def test_raises_on_api_error(self) -> None: + client = MagicMock(spec=OntapClient) + client.get.side_effect = RuntimeError("api error") + with pytest.raises(RuntimeError, match="api error"): + csb._get_nodes(client) + assert client.get.call_count == 1 + + +class TestDiscoverNodes: + def test_returns_node_list_on_success(self) -> None: + client = MagicMock(spec=OntapClient) + expected = {"records": [{"name": "node1"}], "num_records": 1} + with patch.object(csb, "_get_nodes", return_value=expected): + result = csb.discover_nodes(client) + assert result == expected + + def test_retries_then_succeeds(self) -> None: + client = MagicMock(spec=OntapClient) + success = {"records": [{"name": "node1"}], "num_records": 1} + with ( + patch.object(csb, "_get_nodes", side_effect=[RuntimeError("transient"), success]), + patch("cluster_setup_basic.time.sleep"), + ): + result = csb.discover_nodes(client, attempts=3, delay=0) + assert result == success + + def test_raises_runtime_error_after_all_attempts(self) -> None: + client = MagicMock(spec=OntapClient) + with ( + patch.object(csb, "_get_nodes", side_effect=RuntimeError("persistent")), + patch("cluster_setup_basic.time.sleep"), + ): + with pytest.raises(RuntimeError, match="failed after 2 attempts"): + csb.discover_nodes(client, attempts=2, delay=0) + + +class TestDiscoverLocal: + def test_returns_result_when_records_present(self) -> None: + client = MagicMock(spec=OntapClient) + expected = {"records": [{"name": "node1", "uuid": "uuid-1"}], "num_records": 1} + with patch.object(csb, "_get_nodes", return_value=expected): + result = csb.discover_local(client) + assert result == expected + + def test_raises_when_no_records(self) -> None: + client = MagicMock(spec=OntapClient) + with patch.object(csb, "_get_nodes", return_value={"records": [], "num_records": 0}): + with pytest.raises(RuntimeError, match="no local node"): + csb.discover_local(client) + + +class TestDiscoverPartner: + def test_returns_result_when_records_present(self) -> None: + client = MagicMock(spec=OntapClient) + expected = {"records": [{"name": "node2", "uuid": "uuid-2"}], "num_records": 1} + with patch.object(csb, "_get_nodes", return_value=expected): + result = csb.discover_partner(client, local_uuid="uuid-1") + assert result == expected + + def test_passes_exclusion_filter(self) -> None: + client = MagicMock(spec=OntapClient) + expected = {"records": [{"name": "node2"}], "num_records": 1} + with patch.object(csb, "_get_nodes", return_value=expected) as mock_get: + csb.discover_partner(client, local_uuid="uuid-1") + call_kwargs = mock_get.call_args[1] + assert call_kwargs.get("uuid") == "!uuid-1" + + def test_raises_when_no_records(self) -> None: + client = MagicMock(spec=OntapClient) + with patch.object(csb, "_get_nodes", return_value={"records": [], "num_records": 0}): + with pytest.raises(RuntimeError, match="no partner node"): + csb.discover_partner(client, local_uuid="uuid-1") diff --git a/Unit_tests/test_nfs_provision.py b/Unit_tests/test_nfs_provision.py new file mode 100644 index 0000000..7645f0e --- /dev/null +++ b/Unit_tests/test_nfs_provision.py @@ -0,0 +1,74 @@ +# © 2026 NetApp, Inc. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the NOTICE file in the repo root for trademark and attribution details. +"""Unit tests for nfs_provision helper functions.""" + +from __future__ import annotations + +import os +from pathlib import Path + +import pytest +from ontap_client import load_env_file + + +class TestLoadEnvFile: + def test_valid_file_sets_env_vars( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + env_file = tmp_path / "test.env" + env_file.write_text("FOO_NFS=bar\nBAZ_NFS=qux\n") + monkeypatch.delenv("FOO_NFS", raising=False) + monkeypatch.delenv("BAZ_NFS", raising=False) + load_env_file(str(env_file)) + assert os.environ["FOO_NFS"] == "bar" + assert os.environ["BAZ_NFS"] == "qux" + + def test_missing_file_exits(self, tmp_path: Path) -> None: + with pytest.raises(SystemExit): + load_env_file(str(tmp_path / "nonexistent.env")) + + def test_malformed_line_no_equals_exits( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + env_file = tmp_path / "bad.env" + env_file.write_text("VALID=ok\nNO_EQUALS_HERE\n") + monkeypatch.delenv("VALID", raising=False) + with pytest.raises(SystemExit): + load_env_file(str(env_file)) + + def test_blank_lines_are_skipped( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + env_file = tmp_path / "blanks.env" + env_file.write_text("\n\nKEY_NFS2=value\n\n") + monkeypatch.delenv("KEY_NFS2", raising=False) + load_env_file(str(env_file)) + assert os.environ["KEY_NFS2"] == "value" + + def test_comment_lines_are_skipped( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + env_file = tmp_path / "comments.env" + env_file.write_text("# this is a comment\nKEY_NFS3=value\n") + monkeypatch.delenv("KEY_NFS3", raising=False) + load_env_file(str(env_file)) + assert os.environ["KEY_NFS3"] == "value" + + def test_setdefault_does_not_override_existing( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + env_file = tmp_path / "override.env" + env_file.write_text("KEY_NFS4=from_file\n") + monkeypatch.setenv("KEY_NFS4", "already_set") + load_env_file(str(env_file)) + assert os.environ["KEY_NFS4"] == "already_set" + + def test_value_with_equals_sign_handled( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + env_file = tmp_path / "equals.env" + env_file.write_text("KEY_NFS5=a=b=c\n") + monkeypatch.delenv("KEY_NFS5", raising=False) + load_env_file(str(env_file)) + assert os.environ["KEY_NFS5"] == "a=b=c" diff --git a/Unit_tests/test_ontap_client.py b/Unit_tests/test_ontap_client.py new file mode 100644 index 0000000..dcf430b --- /dev/null +++ b/Unit_tests/test_ontap_client.py @@ -0,0 +1,532 @@ +# © 2026 NetApp, Inc. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the NOTICE file in the repo root for trademark and attribution details. +"""Unit tests for ontap_client.OntapClient.""" + +from __future__ import annotations + +from collections.abc import Generator +from unittest.mock import MagicMock, patch + +import pytest +import requests +from ontap_client import OntapApiError, OntapClient + +# --------------------------------------------------------------------------- +# OntapApiError +# --------------------------------------------------------------------------- + + +class TestOntapApiError: + def test_json_detail_stored(self) -> None: + resp = MagicMock(spec=requests.Response) + resp.status_code = 400 + resp.json.return_value = {"message": "Bad request", "code": "123"} + err = OntapApiError(resp) + assert err.status_code == 400 + assert err.detail == {"message": "Bad request", "code": "123"} + assert "400" in str(err) + + def test_text_fallback_when_not_json(self) -> None: + resp = MagicMock(spec=requests.Response) + resp.status_code = 500 + resp.json.side_effect = ValueError("no JSON") + resp.text = "Internal Server Error" + err = OntapApiError(resp) + assert err.detail == "Internal Server Error" + assert "500" in str(err) + + def test_is_exception_subclass(self) -> None: + resp = MagicMock(spec=requests.Response) + resp.status_code = 404 + resp.json.return_value = {} + assert isinstance(OntapApiError(resp), Exception) + + +# --------------------------------------------------------------------------- +# OntapClient.__init__ +# --------------------------------------------------------------------------- + + +class TestOntapClientInit: + def test_base_url_formed_correctly(self) -> None: + client = OntapClient("10.0.0.1", "admin", "pass") + assert client.base_url == "https://10.0.0.1/api" + client.close() + + def test_session_auth_set(self) -> None: + client = OntapClient("10.0.0.1", "admin", "secret") + assert client._session.auth == ("admin", "secret") + client.close() + + def test_verify_ssl_defaults_false(self) -> None: + client = OntapClient("10.0.0.1", "admin", "pass") + assert client._session.verify is False + client.close() + + def test_verify_ssl_can_be_enabled(self) -> None: + client = OntapClient("10.0.0.1", "admin", "pass", verify_ssl=True) + assert client._session.verify is True + client.close() + + def test_default_timeout_stored(self) -> None: + client = OntapClient("10.0.0.1", "admin", "pass") + assert client.timeout == 90 + client.close() + + def test_custom_timeout_stored(self) -> None: + client = OntapClient("10.0.0.1", "admin", "pass", timeout=60) + assert client.timeout == 60 + client.close() + + def test_default_headers_include_accept(self) -> None: + client = OntapClient("10.0.0.1", "admin", "pass") + assert "application/hal+json" in client._session.headers.get("Accept", "") + client.close() + + def test_default_headers_include_content_type(self) -> None: + client = OntapClient("10.0.0.1", "admin", "pass") + assert client._session.headers.get("Content-Type") == "application/json" + client.close() + + +# --------------------------------------------------------------------------- +# OntapClient.from_env +# --------------------------------------------------------------------------- + + +class TestFromEnv: + def test_missing_ontap_host_exits(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("ONTAP_HOST", raising=False) + monkeypatch.delenv("ONTAP_PASS", raising=False) + with pytest.raises(SystemExit): + OntapClient.from_env() + + def test_missing_ontap_pass_exits(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("ONTAP_HOST", "10.0.0.1") + monkeypatch.delenv("ONTAP_PASS", raising=False) + with pytest.raises(SystemExit): + OntapClient.from_env() + + def test_builds_client_from_env(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("ONTAP_HOST", "10.0.0.1") + monkeypatch.setenv("ONTAP_PASS", "secret") + monkeypatch.setenv("ONTAP_USER", "testuser") + client = OntapClient.from_env() + assert client.base_url == "https://10.0.0.1/api" + assert client._session.auth == ("testuser", "secret") + client.close() + + def test_default_user_is_admin(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("ONTAP_HOST", "10.0.0.1") + monkeypatch.setenv("ONTAP_PASS", "secret") + monkeypatch.delenv("ONTAP_USER", raising=False) + client = OntapClient.from_env() + assert client._session.auth == ("admin", "secret") + client.close() + + def test_verify_ssl_true_when_set(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("ONTAP_HOST", "10.0.0.1") + monkeypatch.setenv("ONTAP_PASS", "secret") + monkeypatch.setenv("ONTAP_VERIFY_SSL", "true") + client = OntapClient.from_env() + assert client._session.verify is True + client.close() + + def test_verify_ssl_false_when_not_set(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("ONTAP_HOST", "10.0.0.1") + monkeypatch.setenv("ONTAP_PASS", "secret") + monkeypatch.delenv("ONTAP_VERIFY_SSL", raising=False) + client = OntapClient.from_env() + assert client._session.verify is False + client.close() + + +# --------------------------------------------------------------------------- +# OntapClient._url +# --------------------------------------------------------------------------- + + +class TestUrl: + def setup_method(self) -> None: + self.client = OntapClient("10.0.0.1", "admin", "pass") + + def teardown_method(self) -> None: + self.client.close() + + def test_absolute_path_prefixed_with_base(self) -> None: + assert self.client._url("/cluster") == "https://10.0.0.1/api/cluster" + + def test_relative_path_prefixed_with_base(self) -> None: + assert self.client._url("cluster/nodes") == "https://10.0.0.1/api/cluster/nodes" + + def test_absolute_https_url_returned_unchanged(self) -> None: + url = "https://other.host/api/cluster" + assert self.client._url(url) == url + + +# --------------------------------------------------------------------------- +# OntapClient._request +# --------------------------------------------------------------------------- + + +class TestRequest: + def setup_method(self) -> None: + self.client = OntapClient("10.0.0.1", "admin", "pass") + self.mock_resp = MagicMock(spec=requests.Response) + self.client._session.request = MagicMock(return_value=self.mock_resp) + + def teardown_method(self) -> None: + self.client.close() + + def test_success_returns_json(self) -> None: + self.mock_resp.ok = True + self.mock_resp.status_code = 200 + self.mock_resp.content = b'{"name": "cluster1"}' + self.mock_resp.json.return_value = {"name": "cluster1"} + result = self.client._request("GET", "/cluster") + assert result == {"name": "cluster1"} + + def test_204_returns_empty_dict(self) -> None: + self.mock_resp.ok = True + self.mock_resp.status_code = 204 + self.mock_resp.content = b"" + result = self.client._request("DELETE", "/some/resource") + assert result == {} + + def test_empty_content_returns_empty_dict(self) -> None: + self.mock_resp.ok = True + self.mock_resp.status_code = 200 + self.mock_resp.content = b"" + result = self.client._request("GET", "/cluster") + assert result == {} + + def test_non_ok_raises_ontap_api_error(self) -> None: + self.mock_resp.ok = False + self.mock_resp.status_code = 404 + self.mock_resp.json.return_value = {"message": "Not found"} + with pytest.raises(OntapApiError) as exc_info: + self.client._request("GET", "/missing") + assert exc_info.value.status_code == 404 + + def test_uses_default_timeout(self) -> None: + self.mock_resp.ok = True + self.mock_resp.status_code = 200 + self.mock_resp.content = b"{}" + self.mock_resp.json.return_value = {} + self.client._request("GET", "/cluster") + call_kwargs = self.client._session.request.call_args[1] + assert call_kwargs["timeout"] == self.client.timeout + + def test_url_built_from_path(self) -> None: + self.mock_resp.ok = True + self.mock_resp.status_code = 200 + self.mock_resp.content = b"{}" + self.mock_resp.json.return_value = {} + self.client._request("GET", "/cluster") + call_args = self.client._session.request.call_args[0] + assert call_args[1] == "https://10.0.0.1/api/cluster" + + +# --------------------------------------------------------------------------- +# OntapClient HTTP convenience methods +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def client_with_mock_session() -> Generator[OntapClient, None, None]: + client = OntapClient("10.0.0.1", "admin", "pass") + mock_resp = MagicMock(spec=requests.Response) + mock_resp.ok = True + mock_resp.status_code = 200 + mock_resp.content = b'{"records": []}' + mock_resp.json.return_value = {"records": []} + client._session.request = MagicMock(return_value=mock_resp) + yield client + client.close() + + +class TestHttpMethods: + def test_get_adds_fields_param(self, client_with_mock_session: OntapClient) -> None: + client_with_mock_session.get("/cluster/nodes", fields="name,serial_number") + params = client_with_mock_session._session.request.call_args[1]["params"] + assert params["fields"] == "name,serial_number" + + def test_get_no_fields_key_when_omitted(self, client_with_mock_session: OntapClient) -> None: + client_with_mock_session.get("/cluster/nodes") + params = client_with_mock_session._session.request.call_args[1]["params"] + assert "fields" not in params + + def test_get_passes_extra_params(self, client_with_mock_session: OntapClient) -> None: + client_with_mock_session.get("/cluster/nodes", membership="available") + params = client_with_mock_session._session.request.call_args[1]["params"] + assert params["membership"] == "available" + + def test_post_uses_post_method(self, client_with_mock_session: OntapClient) -> None: + client_with_mock_session.post("/cluster", {"name": "c1"}) + method = client_with_mock_session._session.request.call_args[0][0] + assert method == "POST" + + def test_post_sends_json_body(self, client_with_mock_session: OntapClient) -> None: + body = {"name": "c1", "password": "secret"} + client_with_mock_session.post("/cluster", body) + json_arg = client_with_mock_session._session.request.call_args[1]["json"] + assert json_arg == body + + def test_patch_uses_patch_method(self, client_with_mock_session: OntapClient) -> None: + client_with_mock_session.patch("/cluster/nodes/uuid1", {"state": "up"}) + method = client_with_mock_session._session.request.call_args[0][0] + assert method == "PATCH" + + def test_patch_sends_json_body(self, client_with_mock_session: OntapClient) -> None: + body = {"state": "up"} + client_with_mock_session.patch("/cluster/nodes/uuid1", body) + json_arg = client_with_mock_session._session.request.call_args[1]["json"] + assert json_arg == body + + def test_delete_uses_delete_method(self, client_with_mock_session: OntapClient) -> None: + client_with_mock_session.delete("/volumes/uuid1") + method = client_with_mock_session._session.request.call_args[0][0] + assert method == "DELETE" + + +# --------------------------------------------------------------------------- +# OntapClient.poll_job +# --------------------------------------------------------------------------- + +_JOB_UUID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" + + +class TestPollJob: + def setup_method(self) -> None: + self.client = OntapClient("10.0.0.1", "admin", "pass") + + def teardown_method(self) -> None: + self.client.close() + + def test_success_state_returns_job(self) -> None: + self.client.get = MagicMock(return_value={"state": "success", "message": "done"}) + result = self.client.poll_job(_JOB_UUID) + assert result["state"] == "success" + + def test_failure_state_raises_runtime_error(self) -> None: + self.client.get = MagicMock(return_value={"state": "failure", "message": "boom"}) + with pytest.raises(RuntimeError, match="failed"): + self.client.poll_job(_JOB_UUID) + + def test_failure_includes_job_message(self) -> None: + self.client.get = MagicMock(return_value={"state": "failure", "message": "disk error"}) + with pytest.raises(RuntimeError, match="disk error"): + self.client.poll_job(_JOB_UUID) + + def test_polls_until_success(self) -> None: + responses = [ + {"state": "running"}, + {"state": "running"}, + {"state": "success"}, + ] + self.client.get = MagicMock(side_effect=responses) + with ( + patch("ontap_client.time.sleep"), + patch("ontap_client.time.monotonic", return_value=0), + ): + result = self.client.poll_job(_JOB_UUID, interval=1, timeout=300) + assert result["state"] == "success" + assert self.client.get.call_count == 3 + + def test_timeout_raises_timeout_error(self) -> None: + self.client.get = MagicMock(return_value={"state": "running"}) + with ( + patch("ontap_client.time.sleep"), + patch("ontap_client.time.monotonic", side_effect=[0, 400]), + ): + with pytest.raises(TimeoutError): + self.client.poll_job(_JOB_UUID, interval=1, timeout=300) + + def test_connection_error_retries_then_succeeds(self) -> None: + self.client.get = MagicMock( + side_effect=[ + requests.exceptions.ConnectionError("disconnected"), + {"state": "success"}, + ] + ) + with ( + patch("ontap_client.time.sleep"), + patch("ontap_client.time.monotonic", return_value=0), + ): + result = self.client.poll_job(_JOB_UUID, interval=1, timeout=300) + assert result["state"] == "success" + + def test_connection_error_past_deadline_raises_timeout(self) -> None: + self.client.get = MagicMock( + side_effect=requests.exceptions.ConnectionError("disconnected") + ) + with ( + patch("ontap_client.time.sleep"), + patch("ontap_client.time.monotonic", side_effect=[0, 400]), + ): + with pytest.raises(TimeoutError): + self.client.poll_job(_JOB_UUID, interval=1, timeout=300) + + def test_polls_correct_job_url(self) -> None: + self.client.get = MagicMock(return_value={"state": "success"}) + self.client.poll_job(_JOB_UUID) + call_args = self.client.get.call_args[0][0] + assert _JOB_UUID in call_args + + +# --------------------------------------------------------------------------- +# Context manager +# --------------------------------------------------------------------------- + + +class TestContextManager: + def test_enter_returns_self(self) -> None: + client = OntapClient("10.0.0.1", "admin", "pass") + assert client.__enter__() is client + client.close() + + def test_exit_closes_session(self) -> None: + client = OntapClient("10.0.0.1", "admin", "pass") + close_mock = MagicMock() + client._session.close = close_mock + with client: + pass + close_mock.assert_called_once() + + def test_with_statement_usage(self) -> None: + with OntapClient("10.0.0.1", "admin", "pass") as client: + assert isinstance(client, OntapClient) + + +# --------------------------------------------------------------------------- +# OntapClient.update_auth +# --------------------------------------------------------------------------- + + +class TestUpdateAuth: + def test_updates_session_auth(self) -> None: + client = OntapClient("10.0.0.1", "admin", "oldpass") + client.update_auth("newuser", "newpass") + assert client._session.auth == ("newuser", "newpass") + client.close() + + def test_replaces_original_credentials(self) -> None: + client = OntapClient("10.0.0.1", "admin", "original") + assert client._session.auth == ("admin", "original") + client.update_auth("operator", "rotated") + assert client._session.auth == ("operator", "rotated") + client.close() + + +# --------------------------------------------------------------------------- +# OntapClient.wait_snapmirrored +# --------------------------------------------------------------------------- + + +class TestWaitSnapmirrored: + def setup_method(self) -> None: + self.client = OntapClient("10.0.0.1", "admin", "pass") + + def teardown_method(self) -> None: + self.client.close() + + def test_returns_immediately_when_snapmirrored(self) -> None: + self.client.get = MagicMock(return_value={"state": "snapmirrored", "healthy": True}) + result = self.client.wait_snapmirrored("rel-uuid-1", interval=1, max_wait=60) + assert result["state"] == "snapmirrored" + assert self.client.get.call_count == 1 + + def test_polls_until_snapmirrored(self) -> None: + self.client.get = MagicMock( + side_effect=[ + {"state": "transferring"}, + {"state": "transferring"}, + {"state": "snapmirrored"}, + ] + ) + with patch("ontap_client.time.sleep"): + result = self.client.wait_snapmirrored("rel-uuid-1", interval=1, max_wait=600) + assert result["state"] == "snapmirrored" + assert self.client.get.call_count == 3 + + def test_raises_on_timeout(self) -> None: + self.client.get = MagicMock(return_value={"state": "transferring"}) + with patch("ontap_client.time.sleep"): + with pytest.raises(RuntimeError, match="Timed out"): + self.client.wait_snapmirrored("rel-uuid-1", interval=2, max_wait=1) + + def test_queries_correct_relationship_url(self) -> None: + self.client.get = MagicMock(return_value={"state": "snapmirrored"}) + self.client.wait_snapmirrored("my-rel-uuid", interval=1, max_wait=60) + call_url = self.client.get.call_args[0][0] + assert "my-rel-uuid" in call_url + + +# --------------------------------------------------------------------------- +# load_env_file +# --------------------------------------------------------------------------- + + +class TestLoadEnvFile: + def test_loads_key_value_pairs( + self, tmp_path: pytest.TempdirFactory, monkeypatch: pytest.MonkeyPatch + ) -> None: + from ontap_client import load_env_file + + env_file = tmp_path / ".env" + env_file.write_text("MY_KEY=my_value\nOTHER=123\n") + monkeypatch.delenv("MY_KEY", raising=False) + monkeypatch.delenv("OTHER", raising=False) + load_env_file(str(env_file)) + import os + + assert os.environ["MY_KEY"] == "my_value" + assert os.environ["OTHER"] == "123" + + def test_strips_surrounding_quotes( + self, tmp_path: pytest.TempdirFactory, monkeypatch: pytest.MonkeyPatch + ) -> None: + from ontap_client import load_env_file + + env_file = tmp_path / ".env" + env_file.write_text("QUOTED=\"hello world\"\nSINGLE='val'\n") + monkeypatch.delenv("QUOTED", raising=False) + monkeypatch.delenv("SINGLE", raising=False) + load_env_file(str(env_file)) + import os + + assert os.environ["QUOTED"] == "hello world" + assert os.environ["SINGLE"] == "val" + + def test_existing_env_vars_not_overwritten( + self, tmp_path: pytest.TempdirFactory, monkeypatch: pytest.MonkeyPatch + ) -> None: + from ontap_client import load_env_file + + env_file = tmp_path / ".env" + env_file.write_text("PRESET_KEY=file_value\n") + monkeypatch.setenv("PRESET_KEY", "existing_value") + load_env_file(str(env_file)) + import os + + assert os.environ["PRESET_KEY"] == "existing_value" + + def test_skips_blank_lines_and_comments( + self, tmp_path: pytest.TempdirFactory, monkeypatch: pytest.MonkeyPatch + ) -> None: + from ontap_client import load_env_file + + env_file = tmp_path / ".env" + env_file.write_text("# comment\n\nVALID_KEY=yes\n") + monkeypatch.delenv("VALID_KEY", raising=False) + load_env_file(str(env_file)) + import os + + assert os.environ["VALID_KEY"] == "yes" + + def test_exits_when_file_not_found(self) -> None: + from ontap_client import load_env_file + + with pytest.raises(SystemExit): + load_env_file("/nonexistent/path/.env") diff --git a/Unit_tests/test_snapmirror_cleanup_test_failover.py b/Unit_tests/test_snapmirror_cleanup_test_failover.py new file mode 100644 index 0000000..7c81772 --- /dev/null +++ b/Unit_tests/test_snapmirror_cleanup_test_failover.py @@ -0,0 +1,109 @@ +# © 2026 NetApp, Inc. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the NOTICE file in the repo root for trademark and attribution details. +"""Unit tests for shared helpers in snapmirror_cleanup_test_failover.""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import pytest +import snapmirror_cleanup_test_failover as sm_clean +from ontap_client import OntapClient + + +class TestEnv: + def test_reads_from_inputs_dict(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(sm_clean.INPUTS, "CLUSTER_A", "10.5.0.1") + assert sm_clean._env("CLUSTER_A") == "10.5.0.1" + + def test_falls_back_to_os_environ(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(sm_clean.INPUTS, "CLUSTER_A", "") + monkeypatch.setenv("CLUSTER_A", "10.5.0.2") + assert sm_clean._env("CLUSTER_A") == "10.5.0.2" + + def test_missing_required_key_exits(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(sm_clean.INPUTS, "CLUSTER_A", "") + monkeypatch.delenv("CLUSTER_A", raising=False) + with pytest.raises(SystemExit): + sm_clean._env("CLUSTER_A") + + def test_returns_default_when_missing(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(sm_clean.INPUTS, "CLUSTER_A", "") + monkeypatch.delenv("CLUSTER_A", raising=False) + assert sm_clean._env("CLUSTER_A", default="fallback") == "fallback" + + +class TestPickClusterByRelationship: + def _vol_client(self, records: list[dict]) -> MagicMock: + client = MagicMock(spec=OntapClient) + client.get.return_value = {"records": records, "num_records": len(records)} + client.__enter__ = MagicMock(return_value=client) + client.__exit__ = MagicMock(return_value=False) + return client + + def test_picks_cluster_a_when_it_has_relationship(self) -> None: + rel = { + "uuid": "rel-uuid-1", + "source": {"path": "vs0:vol_rw_01"}, + "destination": {"path": "vs1:vol_rw_01_dest"}, + "state": "snapmirrored", + "healthy": True, + } + a_client = self._vol_client([rel]) + with patch("snapmirror_cleanup_test_failover.OntapClient", return_value=a_client): + cluster, found_rel = sm_clean._pick_cluster_by_relationship( + "10.0.0.1", "10.0.0.2", "admin", "pass", "vs0", "vol_rw_01" + ) + assert cluster == "10.0.0.1" + assert found_rel["uuid"] == "rel-uuid-1" + + def test_falls_through_to_cluster_b(self) -> None: + rel = { + "uuid": "rel-uuid-2", + "source": {"path": "vs0:vol_rw_01"}, + "state": "snapmirrored", + } + a_client = self._vol_client([]) + b_client = self._vol_client([rel]) + with patch( + "snapmirror_cleanup_test_failover.OntapClient", side_effect=[a_client, b_client] + ): + cluster, found_rel = sm_clean._pick_cluster_by_relationship( + "10.0.0.1", "10.0.0.2", "admin", "pass", "vs0", "vol_rw_01" + ) + assert cluster == "10.0.0.2" + assert found_rel["uuid"] == "rel-uuid-2" + + def test_exits_when_neither_cluster_has_relationship(self) -> None: + no_rel_client = self._vol_client([]) + with patch("snapmirror_cleanup_test_failover.OntapClient", return_value=no_rel_client): + with pytest.raises(SystemExit): + sm_clean._pick_cluster_by_relationship( + "10.0.0.1", "10.0.0.2", "admin", "pass", "vs0", "vol_rw_01" + ) + + def test_skips_unreachable_cluster_and_continues(self) -> None: + rel = {"uuid": "rel-uuid-b", "source": {"path": "vs0:vol"}, "state": "snapmirrored"} + a_client = MagicMock(spec=OntapClient) + a_client.__enter__ = MagicMock(return_value=a_client) + a_client.__exit__ = MagicMock(return_value=False) + a_client.get.side_effect = ConnectionError("unreachable") + b_client = self._vol_client([rel]) + with patch( + "snapmirror_cleanup_test_failover.OntapClient", side_effect=[a_client, b_client] + ): + cluster, found_rel = sm_clean._pick_cluster_by_relationship( + "10.0.0.1", "10.0.0.2", "admin", "pass", "vs0", "vol" + ) + assert cluster == "10.0.0.2" + + def test_passes_source_path_filter(self) -> None: + rel = {"uuid": "rel-uuid-x", "source": {"path": "vs0:myvol"}, "state": "snapmirrored"} + a_client = self._vol_client([rel]) + with patch("snapmirror_cleanup_test_failover.OntapClient", return_value=a_client): + sm_clean._pick_cluster_by_relationship( + "10.0.0.1", "10.0.0.2", "admin", "pass", "vs0", "myvol" + ) + call_kwargs = a_client.get.call_args[1] + assert call_kwargs.get("source.path") == "vs0:myvol" diff --git a/Unit_tests/test_snapmirror_provision_dest_managed.py b/Unit_tests/test_snapmirror_provision_dest_managed.py new file mode 100644 index 0000000..bffdf47 --- /dev/null +++ b/Unit_tests/test_snapmirror_provision_dest_managed.py @@ -0,0 +1,92 @@ +# © 2026 NetApp, Inc. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the NOTICE file in the repo root for trademark and attribution details. +"""Unit tests for shared helpers in snapmirror_provision_dest_managed.""" + +from __future__ import annotations + +from unittest.mock import MagicMock + +import pytest +import snapmirror_provision_dest_managed as sm_dst +from ontap_client import OntapClient + + +class TestEnv: + def test_reads_from_inputs_dict(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(sm_dst.INPUTS, "SOURCE_HOST", "10.1.0.1") + assert sm_dst._env("SOURCE_HOST") == "10.1.0.1" + + def test_falls_back_to_os_environ(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(sm_dst.INPUTS, "SOURCE_HOST", "") + monkeypatch.setenv("SOURCE_HOST", "10.1.0.2") + assert sm_dst._env("SOURCE_HOST") == "10.1.0.2" + + def test_missing_required_key_exits(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(sm_dst.INPUTS, "SOURCE_HOST", "") + monkeypatch.delenv("SOURCE_HOST", raising=False) + with pytest.raises(SystemExit): + sm_dst._env("SOURCE_HOST") + + def test_returns_default_when_missing(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(sm_dst.INPUTS, "SOURCE_HOST", "") + monkeypatch.delenv("SOURCE_HOST", raising=False) + assert sm_dst._env("SOURCE_HOST", default="default_val") == "default_val" + + +class TestGetIcLifIps: + def test_returns_intercluster_ips(self) -> None: + client = MagicMock(spec=OntapClient) + client.get.return_value = { + "records": [ + {"ip": {"address": "10.0.0.10"}, "services": ["intercluster-core"]}, + {"ip": {"address": "10.0.0.11"}, "services": ["data-nfs"]}, + ] + } + ips = sm_dst._get_ic_lif_ips(client) + assert ips == ["10.0.0.10"] + + def test_returns_empty_when_no_ic_lifs(self) -> None: + client = MagicMock(spec=OntapClient) + client.get.return_value = { + "records": [{"ip": {"address": "10.0.0.11"}, "services": ["data-nfs"]}] + } + assert sm_dst._get_ic_lif_ips(client) == [] + + def test_returns_empty_on_empty_records(self) -> None: + client = MagicMock(spec=OntapClient) + client.get.return_value = {"records": []} + assert sm_dst._get_ic_lif_ips(client) == [] + + def test_skips_records_with_no_ip_address(self) -> None: + client = MagicMock(spec=OntapClient) + client.get.return_value = {"records": [{"ip": {}, "services": ["intercluster-core"]}]} + assert sm_dst._get_ic_lif_ips(client) == [] + + +class TestCheckIcLifPreconditions: + def test_exits_when_no_src_ips(self) -> None: + src = MagicMock(spec=OntapClient) + dst = MagicMock(spec=OntapClient) + with pytest.raises(SystemExit): + sm_dst._check_ic_lif_preconditions(src, dst, [], ["10.0.0.1"]) + + def test_exits_when_no_dst_ips(self) -> None: + src = MagicMock(spec=OntapClient) + dst = MagicMock(spec=OntapClient) + with pytest.raises(SystemExit): + sm_dst._check_ic_lif_preconditions(src, dst, ["10.0.0.1"], []) + + def test_no_error_when_same_subnet(self) -> None: + src = MagicMock(spec=OntapClient) + dst = MagicMock(spec=OntapClient) + sm_dst._check_ic_lif_preconditions(src, dst, ["10.0.0.1"], ["10.0.0.2"]) + + def test_warns_when_different_subnets(self, caplog: pytest.LogCaptureFixture) -> None: + import logging + + src = MagicMock(spec=OntapClient) + dst = MagicMock(spec=OntapClient) + with caplog.at_level(logging.WARNING, logger="snapmirror_provision_dest_managed"): + sm_dst._check_ic_lif_preconditions(src, dst, ["10.0.0.1"], ["192.168.1.1"]) + assert any("subnet" in msg.lower() for msg in caplog.messages) diff --git a/Unit_tests/test_snapmirror_provision_src_managed.py b/Unit_tests/test_snapmirror_provision_src_managed.py new file mode 100644 index 0000000..df874e2 --- /dev/null +++ b/Unit_tests/test_snapmirror_provision_src_managed.py @@ -0,0 +1,36 @@ +# © 2026 NetApp, Inc. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the NOTICE file in the repo root for trademark and attribution details. +"""Unit tests for shared helpers in snapmirror_provision_src_managed.""" + +from __future__ import annotations + +import pytest +import snapmirror_provision_src_managed as sm_src + + +class TestEnv: + def test_reads_from_inputs_dict(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(sm_src.INPUTS, "SOURCE_HOST", "10.0.0.1") + assert sm_src._env("SOURCE_HOST") == "10.0.0.1" + + def test_falls_back_to_os_environ(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(sm_src.INPUTS, "SOURCE_HOST", "") + monkeypatch.setenv("SOURCE_HOST", "10.0.0.2") + assert sm_src._env("SOURCE_HOST") == "10.0.0.2" + + def test_inputs_takes_priority_over_environ(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(sm_src.INPUTS, "SOURCE_HOST", "from_inputs") + monkeypatch.setenv("SOURCE_HOST", "from_env") + assert sm_src._env("SOURCE_HOST") == "from_inputs" + + def test_missing_required_key_exits(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(sm_src.INPUTS, "SOURCE_HOST", "") + monkeypatch.delenv("SOURCE_HOST", raising=False) + with pytest.raises(SystemExit): + sm_src._env("SOURCE_HOST") + + def test_returns_default_when_not_required(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(sm_src.INPUTS, "SOURCE_HOST", "") + monkeypatch.delenv("SOURCE_HOST", raising=False) + assert sm_src._env("SOURCE_HOST", default="fallback") == "fallback" diff --git a/Unit_tests/test_snapmirror_test_failover.py b/Unit_tests/test_snapmirror_test_failover.py new file mode 100644 index 0000000..e76c754 --- /dev/null +++ b/Unit_tests/test_snapmirror_test_failover.py @@ -0,0 +1,91 @@ +# © 2026 NetApp, Inc. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the NOTICE file in the repo root for trademark and attribution details. +"""Unit tests for shared helpers in snapmirror_test_failover.""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import pytest +import snapmirror_test_failover as sm_tf +from ontap_client import OntapClient + + +class TestEnv: + def test_reads_from_inputs_dict(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(sm_tf.INPUTS, "CLUSTER_A", "10.0.1.1") + assert sm_tf._env("CLUSTER_A") == "10.0.1.1" + + def test_falls_back_to_os_environ(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(sm_tf.INPUTS, "CLUSTER_A", "") + monkeypatch.setenv("CLUSTER_A", "10.0.1.2") + assert sm_tf._env("CLUSTER_A") == "10.0.1.2" + + def test_missing_required_key_exits(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(sm_tf.INPUTS, "CLUSTER_A", "") + monkeypatch.delenv("CLUSTER_A", raising=False) + with pytest.raises(SystemExit): + sm_tf._env("CLUSTER_A") + + def test_returns_default_when_not_required(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(sm_tf.INPUTS, "CLUSTER_A", "") + monkeypatch.delenv("CLUSTER_A", raising=False) + assert sm_tf._env("CLUSTER_A", default="x") == "x" + + +class TestPickCluster: + def _make_client(self, records: list[dict]) -> MagicMock: + client = MagicMock(spec=OntapClient) + client.get.return_value = {"records": records, "num_records": len(records)} + client.__enter__ = MagicMock(return_value=client) + client.__exit__ = MagicMock(return_value=False) + return client + + def test_picks_cluster_a_when_it_has_dp_volume(self) -> None: + vol = {"name": "vol_rw_01_dest", "uuid": "v-uuid", "svm": {"name": "vs1"}} + with patch("snapmirror_test_failover.OntapClient") as MockClient: + instance = self._make_client([vol]) + MockClient.return_value = instance + cluster, found_vol = sm_tf._pick_cluster( + "10.0.0.1", "10.0.0.2", "admin", "pass", "vol_rw_01" + ) + assert cluster == "10.0.0.1" + assert found_vol["name"] == "vol_rw_01_dest" + + def test_falls_through_to_cluster_b(self) -> None: + vol = {"name": "vol_rw_01_dest", "uuid": "v-uuid", "svm": {"name": "vs1"}} + a_client = self._make_client([]) + b_client = self._make_client([vol]) + with patch("snapmirror_test_failover.OntapClient", side_effect=[a_client, b_client]): + cluster, found_vol = sm_tf._pick_cluster( + "10.0.0.1", "10.0.0.2", "admin", "pass", "vol_rw_01" + ) + assert cluster == "10.0.0.2" + + def test_exits_when_no_cluster_has_dp_volume(self) -> None: + no_vol_client = self._make_client([]) + with patch("snapmirror_test_failover.OntapClient", return_value=no_vol_client): + with pytest.raises(SystemExit): + sm_tf._pick_cluster("10.0.0.1", "10.0.0.2", "admin", "pass", "vol_rw_01") + + def test_uses_wildcard_filter_in_auto_mode(self) -> None: + no_vol_client = self._make_client([]) + with patch("snapmirror_test_failover.OntapClient", return_value=no_vol_client): + with pytest.raises(SystemExit): + sm_tf._pick_cluster("10.0.0.1", "10.0.0.2", "admin", "pass", "*") + call_kwargs = no_vol_client.get.call_args[1] + assert call_kwargs.get("name") == "*_dest" + + def test_skips_unreachable_cluster_and_continues(self) -> None: + vol = {"name": "vol_01_dest", "uuid": "v-uuid", "svm": {"name": "vs1"}} + a_client = MagicMock(spec=OntapClient) + a_client.__enter__ = MagicMock(return_value=a_client) + a_client.__exit__ = MagicMock(return_value=False) + a_client.get.side_effect = ConnectionError("unreachable") + b_client = self._make_client([vol]) + with patch("snapmirror_test_failover.OntapClient", side_effect=[a_client, b_client]): + cluster, found_vol = sm_tf._pick_cluster( + "10.0.0.1", "10.0.0.2", "admin", "pass", "vol_01" + ) + assert cluster == "10.0.0.2" diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..d47cc87 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +testpaths = tests +addopts = -v --tb=short diff --git a/python/README.md b/python/README.md index 7146da6..e23f25e 100644 --- a/python/README.md +++ b/python/README.md @@ -10,10 +10,9 @@ async jobs), see the To compare this approach with Ansible or Terraform, see [Choosing an approach](../docs/choosing-an-approach.md). -> **Note:** These scripts are runnable illustrations, not a tested library. -> There are no unit tests by design - CI validates only lint and formatting -> via Ruff. When you adapt a script for production, add tests appropriate -> to your environment. +> **Note:** These scripts are runnable illustrations. Unit tests live in +> `Unit_tests/` and can be run with `pytest Unit_tests/`. CI validates lint +> and formatting via Ruff in addition to running the test suite. --- @@ -46,7 +45,7 @@ export ONTAP_USER=admin # default: admin export ONTAP_PASS=your_password ``` -Or use an env file: +Or use an env file and pass it to scripts that support `--env-file`: ```bash # cluster.env @@ -56,7 +55,20 @@ ONTAP_PASS=your_password ``` ```bash +# Linux / macOS set -a && source cluster.env && set +a + +# Windows PowerShell +Get-Content cluster.env | ForEach-Object { + if ($_ -match '^([^#][^=]*)=(.*)$') { [System.Environment]::SetEnvironmentVariable($Matches[1].Trim(), $Matches[2].Trim()) } +} +``` + +Scripts that accept `--env-file` (e.g. `cluster_setup_basic.py`) can also load +the file directly: + +```bash +python cluster_setup_basic.py --env-file cluster.env ``` > SSL verification is disabled by default to support environments that use @@ -100,7 +112,13 @@ All flags can also be set via environment variables (`SVM_NAME`, `VOLUME_NAME`, |---|---| | `ontap_client.py` | Reusable ONTAP REST client (session management, auth, polling, error handling) | | `cluster_info.py` | Get cluster version + node list | +| `cluster_setup_basic.py` | Create a new ONTAP cluster from two pre-cluster nodes | | `nfs_provision.py` | Create NFS volume with export policy | +| `cifs_provision.py` | Create CIFS/SMB share (optionally create CIFS server) | +| `snapmirror_provision_src_managed.py` | Provision a SnapMirror relationship from the source cluster | +| `snapmirror_provision_dest_managed.py` | Provision a SnapMirror relationship from the destination cluster | +| `snapmirror_test_failover.py` | Create a FlexClone of the SnapMirror destination for test failover | +| `snapmirror_cleanup_test_failover.py` | Delete the FlexClone created by a test failover | | `requirements.txt` | Python dependencies | ## Code Patterns @@ -109,8 +127,14 @@ These scripts demonstrate several patterns you can reuse: - **`OntapClient.from_env()`** - builds a configured client from environment variables so credentials never appear in code -- **`client.poll_job(uuid)`** - polls an async ONTAP job until completion with - configurable interval and timeout +- **`client.poll_job(uuid)`** - polls an async ONTAP job until completion; + accepts keyword args `interval` (seconds between polls, default 5) and + `timeout` (max seconds to wait, default 300); raises `RuntimeError` on + job failure and `TimeoutError` on timeout +- **`client.wait_snapmirrored(rel_uuid)`** - polls a SnapMirror relationship + until its state reaches `snapmirrored`; accepts `interval` and `max_wait` +- **`client.update_auth(username, password)`** - replaces session credentials + mid-workflow (used by `cluster_setup_basic.py` after cluster creation) - **Context manager** - `with OntapClient.from_env() as client:` ensures the HTTP session is properly closed - **Structured logging** - all output goes through `logging`, not `print()`, diff --git a/python/cifs_provision.py b/python/cifs_provision.py index 4dfe231..c497186 100644 --- a/python/cifs_provision.py +++ b/python/cifs_provision.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python3 # © 2026 NetApp, Inc. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 # See the NOTICE file in the repo root for trademark and attribution details. @@ -27,9 +26,8 @@ import logging import os import sys -from pathlib import Path -from ontap_client import OntapApiError, OntapClient +from ontap_client import OntapApiError, OntapClient, load_env_file logging.basicConfig( level=logging.INFO, @@ -38,15 +36,15 @@ logger = logging.getLogger(__name__) ENV = { - "ONTAP_HOST": "", # cluster management IP — set here or via ONTAP_HOST env var + "ONTAP_HOST": "", # cluster management IP — set via ONTAP_HOST env var "ONTAP_USER": "admin", "ONTAP_PASS": "", # never hardcode — set via ONTAP_PASS env var - "SVM_NAME": "vs1", - "VOLUME_NAME": "vol_002", + "SVM_NAME": "", + "VOLUME_NAME": "", "VOLUME_SIZE": "100MB", "AGGR_NAME": "", # required — set via --aggregate or AGGR_NAME env var "CLIENT_MATCH": "0.0.0.0/0", # required — set via --client-match or CLIENT_MATCH env var - "SHARE_NAME": "cifs_share_demo", + "SHARE_NAME": "cifs_share", "SHARE_COMMENT": "Provisioned by orchestrio", "ACL_USER": "Everyone", "ACL_PERMISSION": "full_control", @@ -55,23 +53,6 @@ } -def _load_env_file(path: str) -> None: - """Load KEY=VALUE pairs from an env file into os.environ (dotenv style).""" - p = Path(path) - if not p.is_file(): - logger.error("Env file not found: %s", path) - sys.exit(1) - for lineno, raw in enumerate(p.read_text().splitlines(), start=1): - line = raw.strip() - if not line or line.startswith("#"): - continue - if "=" not in line: - logger.error("Env file %s line %d: expected KEY=VALUE, got: %s", path, lineno, line) - sys.exit(1) - key, _, value = line.partition("=") - os.environ.setdefault(key.strip(), value.strip()) - - def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser(description="Provision a CIFS share") p.add_argument( @@ -114,10 +95,15 @@ def _pick(cli_val: str | None, env_key: str, default: str = "") -> str: return cli_val or os.environ.get(env_key) or ENV.get(env_key, "") or default -def _resolve_config(args: argparse.Namespace) -> dict[str, str | bool]: - """Load env file and CLI args, then return the resolved configuration dict.""" +def _resolve_config(args: argparse.Namespace) -> tuple[dict[str, str], bool]: + """Load env file and CLI args, then return (str_config, create_cifs_server). + + Returns a tuple of: + - A ``dict[str, str]`` with all string config values. + - A ``bool`` indicating whether to auto-create the CIFS server. + """ if args.env_file: - _load_env_file(args.env_file) + load_env_file(args.env_file) for key, value in ENV.items(): if value and key not in os.environ: @@ -128,7 +114,7 @@ def _resolve_config(args: argparse.Namespace) -> dict[str, str | bool]: logger.error("--aggregate is required (or set AGGR_NAME in env / --env-file)") sys.exit(1) - return { + cfg: dict[str, str] = { "svm": _pick(args.svm, "SVM_NAME", "vs0"), "volume": _pick(args.volume, "VOLUME_NAME", "cifs_test_env"), "size": _pick(args.size, "VOLUME_SIZE", "100MB"), @@ -137,10 +123,10 @@ def _resolve_config(args: argparse.Namespace) -> dict[str, str | bool]: "share_comment": _pick(args.share_comment, "SHARE_COMMENT", "Provisioned by orchestrio"), "acl_user": _pick(args.acl_user, "ACL_USER", "Everyone"), "acl_permission": _pick(args.acl_permission, "ACL_PERMISSION", "full_control"), - "create_cifs_server": args.create_cifs_server, "cifs_server_name": _pick(args.cifs_server_name, "CIFS_SERVER_NAME", "ONTAP-CIFS"), "workgroup": _pick(args.workgroup, "CIFS_WORKGROUP", "WORKGROUP"), } + return cfg, bool(args.create_cifs_server) def _ensure_cifs_server( @@ -302,7 +288,8 @@ def _verify_and_log_acls(client: OntapClient, svm_uuid: str, share_name: str) -> def main() -> None: - cfg = _resolve_config(parse_args()) + """Resolve configuration, then orchestrate CIFS server, volume, share, and ACL setup.""" + cfg, create_cifs_server = _resolve_config(parse_args()) svm = cfg["svm"] volume = cfg["volume"] size = cfg["size"] @@ -314,7 +301,7 @@ def main() -> None: with OntapClient.from_env() as client: _ensure_cifs_server( - client, svm, cfg["create_cifs_server"], cfg["cifs_server_name"], cfg["workgroup"] + client, svm, create_cifs_server, cfg["cifs_server_name"], cfg["workgroup"] ) job_result = _ensure_volume_ntfs(client, svm, volume, size, aggregate) diff --git a/python/cluster_info.py b/python/cluster_info.py index a93a215..1d9545c 100644 --- a/python/cluster_info.py +++ b/python/cluster_info.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python3 # © 2026 NetApp, Inc. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 # See the NOTICE file in the repo root for trademark and attribution details. @@ -34,6 +33,7 @@ def main() -> None: + """Retrieve cluster version and print all node names with serial numbers.""" with OntapClient.from_env() as client: # Step 1 — cluster version cluster = client.get("/cluster", fields="version") diff --git a/python/cluster_setup_basic.py b/python/cluster_setup_basic.py index 3c7c250..c4c757c 100644 --- a/python/cluster_setup_basic.py +++ b/python/cluster_setup_basic.py @@ -1,8 +1,7 @@ -#!/usr/bin/env python3 # © 2026 NetApp, Inc. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 # See the NOTICE file in the repo root for trademark and attribution details. -"""Create a storage cluster from two pre-cluster nodes. +"""Create a storage cluster from two pre-cluster nodes (ONTAP 9 unified). Steps: 1. discover_nodes — GET /api/cluster/nodes (membership=available, retry 3x/30s) 2. discover_local — isolate the local node (has management_interfaces != null) @@ -33,9 +32,8 @@ import os import sys import time -from pathlib import Path -from ontap_client import OntapClient +from ontap_client import OntapClient, load_env_file logging.basicConfig( level=logging.INFO, @@ -59,26 +57,18 @@ } # --------------------------------------------------------------------------- -_NODE_FIELDS_SETS = [ - # newest (9.19+) - ( - "name,model,state,ha,version,serial_number,membership," - "cluster_interfaces,management_interfaces,metrocluster,disaggregated,san_optimized" - ), - # 9.18 without disaggregated - ( - "name,model,state,ha,version,serial_number,membership," - "cluster_interfaces,management_interfaces,metrocluster,san_optimized" - ), - # 9.14 and older — minimal safe set - ( - "name,model,state,ha,version,serial_number,membership," - "cluster_interfaces,management_interfaces,metrocluster" - ), -] +# ONTAP 9 unified — node discovery fields +_NODE_FIELDS = ( + "name,model,state,ha,version,serial_number,membership," + "cluster_interfaces,management_interfaces,metrocluster" +) def _env(key: str, required: bool = True) -> str: + """Return the value for *key* from INPUTS or os.environ. + + Logs an error and exits if *required* is True and the value is empty. + """ val = INPUTS.get(key) or os.environ.get(key, "") if required and not val: logger.error( @@ -95,23 +85,11 @@ def _env(key: str, required: bool = True) -> str: def _get_nodes(client: OntapClient, **kwargs) -> dict: - """GET /cluster/nodes, trying progressively reduced field sets for older ONTAP versions.""" - last_exc: Exception | None = None - for fields in _NODE_FIELDS_SETS: - try: - return client.get("/cluster/nodes", fields=fields, **kwargs) - except Exception as exc: - if "262197" in str(exc): - logger.warning( - "discover: field unsupported on this version, retrying with reduced fields" - ) - last_exc = exc - continue - raise - raise last_exc # type: ignore[misc] + """GET /cluster/nodes with the standard ONTAP 9 unified field set.""" + return client.get("/cluster/nodes", fields=_NODE_FIELDS, **kwargs) -def discover_nodes(client: OntapClient, attempts: int = 3, delay: int = 30) -> dict: +def discover_nodes(client: OntapClient, attempts: int = 3, delay: int = 30) -> dict: # type: ignore[return] """Step 1 — discover available nodes, retry up to 3 times.""" for attempt in range(1, attempts + 1): try: @@ -212,27 +190,19 @@ def create_cluster(client: OntapClient, local: dict, partner: dict) -> dict: def track_job(client: OntapClient, job_uuid: str) -> dict: - """Step 5 — poll job until state != running (switch to cluster password first).""" - # After cluster creation the node switches to cluster mode — use CLUSTER_PASS - client._session.auth = (_env("ONTAP_USER"), _env("CLUSTER_PASS")) + """Step 5 — switch to cluster credentials then poll the job until completion. - while True: - result = client.get( - f"/cluster/jobs/{job_uuid}", - fields=("code,description,end_time,error,message,start_time,state,uuid"), - ) - state = result.get("state", "unknown") - logger.info("track_job — state=%s", state) - if state != "running": - if state != "success": - raise RuntimeError( - f"Cluster job ended with state='{state}': {result.get('error')}" - ) - return result - time.sleep(10) + After ``POST /cluster`` the node transitions to full cluster mode and + requires the new cluster-admin password. Authentication is updated via + :meth:`~ontap_client.OntapClient.update_auth` before polling begins. + """ + # After cluster creation the node switches to cluster mode — use CLUSTER_PASS + client.update_auth(_env("ONTAP_USER"), _env("CLUSTER_PASS")) + return client.poll_job(job_uuid, interval=10, timeout=1800) def main() -> None: + """Orchestrate all five cluster-setup steps and log the resulting cluster URL.""" host = _env("ONTAP_HOST") user = _env("ONTAP_USER") passwd = os.environ.get("ONTAP_PASS", "") # empty on pre-cluster nodes @@ -258,13 +228,11 @@ def main() -> None: def _load_env_file(path: str) -> None: - """Load KEY=VALUE pairs from a .env file into the INPUTS dict.""" - for line in Path(path).read_text(encoding="utf-8").splitlines(): - line = line.strip() - if not line or line.startswith("#") or "=" not in line: - continue - key, _, value = line.partition("=") - INPUTS[key.strip()] = value.strip().strip('"').strip("'") + """Load KEY=VALUE pairs from a .env file into both os.environ and the INPUTS dict.""" + load_env_file(path) + for key in list(INPUTS): + if val := os.environ.get(key): + INPUTS[key] = val if __name__ == "__main__": diff --git a/python/nfs_provision.py b/python/nfs_provision.py index b8b6d13..a6ab5cd 100644 --- a/python/nfs_provision.py +++ b/python/nfs_provision.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python3 # © 2026 NetApp, Inc. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 # See the NOTICE file in the repo root for trademark and attribution details. @@ -43,9 +42,8 @@ import logging import os import sys -from pathlib import Path -from ontap_client import OntapClient +from ontap_client import OntapClient, load_env_file logging.basicConfig( level=logging.INFO, @@ -54,34 +52,17 @@ logger = logging.getLogger(__name__) ENV = { - "ONTAP_HOST": "", # cluster management IP — set here or via ONTAP_HOST env var + "ONTAP_HOST": "", # cluster management IP — set via ONTAP_HOST env var "ONTAP_USER": "admin", "ONTAP_PASS": "", # never hardcode — set via ONTAP_PASS env var - "SVM_NAME": "vs1", - "VOLUME_NAME": "vol_001", + "SVM_NAME": "", + "VOLUME_NAME": "", "VOLUME_SIZE": "100MB", - "AGGR_NAME": "sti232_vsim_sr091o_aggr1", # required — set via --aggregate or AGGR_NAME env var + "AGGR_NAME": "", # required — set via --aggregate or AGGR_NAME env var "CLIENT_MATCH": "0.0.0.0/0", } -def _load_env_file(path: str) -> None: - """Load KEY=VALUE pairs from an env file into os.environ (dotenv style).""" - p = Path(path) - if not p.is_file(): - logger.error("Env file not found: %s", path) - sys.exit(1) - for lineno, raw in enumerate(p.read_text().splitlines(), start=1): - line = raw.strip() - if not line or line.startswith("#"): - continue - if "=" not in line: - logger.error("Env file %s line %d: expected KEY=VALUE, got: %s", path, lineno, line) - sys.exit(1) - key, _, value = line.partition("=") - os.environ.setdefault(key.strip(), value.strip()) - - def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser(description="Provision an NFS volume") p.add_argument( @@ -104,7 +85,14 @@ def _pick(arg: str | None, env_key: str, default: str = "") -> str: def _resolve_config(args: argparse.Namespace) -> tuple[str, str, str, str, str]: - """Push ENV defaults into os.environ then resolve final values from all sources.""" + """Load env file (if provided) and resolve final values from all sources. + + Also pushes ENV block defaults into os.environ so ``OntapClient.from_env()`` + can pick up ``ONTAP_HOST`` / ``ONTAP_PASS`` from the ENV block when no real + env vars are set. + """ + if args.env_file: + load_env_file(args.env_file) for key, value in ENV.items(): if value and key not in os.environ: os.environ[key] = value @@ -220,9 +208,8 @@ def _assign_policy(client: OntapClient, volume_uuid: str, policy_name: str) -> N def main() -> None: + """Parse args, resolve config, then provision volume, export policy, and client rule.""" args = parse_args() - if args.env_file: - _load_env_file(args.env_file) svm, volume, size, aggregate, client_match = _resolve_config(args) if not aggregate: logger.error("--aggregate is required (or set AGGR_NAME in env / --env-file)") diff --git a/python/ontap_client.py b/python/ontap_client.py index 1f9e41e..15d0cb1 100644 --- a/python/ontap_client.py +++ b/python/ontap_client.py @@ -26,6 +26,8 @@ logger = logging.getLogger("ontap_client") +__all__ = ["OntapClient", "OntapApiError", "load_env_file"] + # All examples in this repo disable SSL verification to support environments # that use self-signed certificates. We recommend setting # ONTAP_VERIFY_SSL=true once CA-signed certificates are in place. The @@ -33,7 +35,7 @@ # is disabled. urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) -_DEFAULT_TIMEOUT = 30 +_DEFAULT_TIMEOUT = 90 _DEFAULT_HEADERS = { "Accept": "application/hal+json", "Content-Type": "application/json", @@ -99,6 +101,15 @@ def __exit__(self, *exc: object) -> None: def close(self) -> None: self._session.close() + def update_auth(self, username: str, password: str) -> None: + """Replace the HTTP Basic-Auth credentials on the underlying session. + + Use this when the cluster switches authentication context mid-workflow + (e.g. after ``POST /cluster`` when the node moves from pre-cluster mode + to full cluster mode and requires the new cluster admin password). + """ + self._session.auth = (username, password) + # -- Factory ------------------------------------------------------------ @classmethod @@ -140,7 +151,15 @@ def _request(self, method: str, path: str, **kwargs: Any) -> dict[str, Any]: url = self._url(path) logger.debug("%s %s", method, url) - resp = self._session.request(method, url, **kwargs) + try: + resp = self._session.request(method, url, **kwargs) + except requests.exceptions.Timeout as exc: + raise RuntimeError( + f"{method} {url} timed out after {kwargs['timeout']} s — " + "the cluster may be busy or unreachable. " + "Increase the timeout via OntapClient(..., timeout=) if needed." + ) from exc + if not resp.ok: raise OntapApiError(resp) @@ -151,7 +170,6 @@ def _request(self, method: str, path: str, **kwargs: Any) -> dict[str, Any]: def get(self, path: str, *, fields: str = "", **params: str) -> dict[str, Any]: if fields: params["fields"] = fields - params.setdefault("return_timeout", "120") return self._request("GET", path, params=params) def post(self, path: str, body: dict[str, Any]) -> dict[str, Any]: @@ -206,3 +224,76 @@ def poll_job( raise TimeoutError(f"Job {job_uuid} did not complete within {timeout}s") time.sleep(interval) + + def wait_snapmirrored( + self, + rel_uuid: str, + *, + interval: int = 15, + max_wait: int = 1800, + ) -> dict[str, Any]: + """Poll a SnapMirror relationship until its state becomes ``snapmirrored``. + + Args: + rel_uuid: UUID of the SnapMirror relationship to watch. + interval: Seconds between polls (default 15). + max_wait: Maximum total seconds to wait before raising (default 1800). + + Returns: + The final relationship record when state == ``snapmirrored``. + + Raises: + :class:`RuntimeError` if ``max_wait`` is exceeded. + """ + elapsed = 0 + while elapsed < max_wait: + result = self.get( + f"/snapmirror/relationships/{rel_uuid}", + fields="state,lag_time,healthy", + ) + state = result.get("state", "unknown") + logger.info("Relationship %s — state: %s", rel_uuid, state) + if state == "snapmirrored": + return result + time.sleep(interval) + elapsed += interval + raise RuntimeError(f"Timed out waiting for relationship {rel_uuid} to reach snapmirrored") + + +# --------------------------------------------------------------------------- +# Shared utilities +# --------------------------------------------------------------------------- + + +def load_env_file(path: str) -> None: + """Load ``KEY=VALUE`` pairs from a file into :data:`os.environ` (dotenv style). + + Rules: + - Blank lines and lines starting with ``#`` are ignored. + - Values are set via :func:`os.environ.setdefault` so existing env vars + take precedence. + - Surrounding single or double quotes on values are stripped. + + Args: + path: Path to the env file. The script exits with an error message if + the file does not exist or contains a malformed line. + """ + from pathlib import Path # local import to avoid top-level dependency + + p = Path(path) + if not p.is_file(): + logger.error("Env file not found: %s", path) + import sys + + sys.exit(1) + for lineno, raw in enumerate(p.read_text(encoding="utf-8").splitlines(), start=1): + line = raw.strip() + if not line or line.startswith("#"): + continue + if "=" not in line: + logger.error("Env file %s line %d: expected KEY=VALUE, got: %s", path, lineno, line) + import sys + + sys.exit(1) + key, _, value = line.partition("=") + os.environ.setdefault(key.strip(), value.strip().strip('"').strip("'")) diff --git a/python/snapmirror_cleanup_test_failover.py b/python/snapmirror_cleanup_test_failover.py index 55f5ff2..256d52a 100644 --- a/python/snapmirror_cleanup_test_failover.py +++ b/python/snapmirror_cleanup_test_failover.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python3 # © 2026 NetApp, Inc. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 # See the NOTICE file in the repo root for trademark and attribution details. @@ -53,17 +52,21 @@ # USER INPUTS — fill in your values here before running # --------------------------------------------------------------------------- INPUTS = { - "CLUSTER_A": "", # first cluster management IP — never hardcode - "CLUSTER_B": "", # second cluster management IP — never hardcode + "CLUSTER_A": "", # first cluster management IP — set via CLUSTER_A env var + "CLUSTER_B": "", # second cluster management IP — set via CLUSTER_B env var "DEST_USER": "admin", - "DEST_PASS": "", # set via DEST_PASS env var + "DEST_PASS": "", # set via DEST_PASS env var — never hardcode "SOURCE_VOLUME": "", # source volume name (e.g. vol_rw_01) - "SOURCE_SVM": "vs0", # source SVM name + "SOURCE_SVM": "", # source SVM name } # --------------------------------------------------------------------------- def _env(key: str, default: str = "") -> str: + """Return the value for *key* from INPUTS or os.environ. + + Logs an error and exits if the resolved value is empty and no *default* is given. + """ val = INPUTS.get(key) or os.environ.get(key, default) if not val: logger.error( @@ -74,16 +77,6 @@ def _env(key: str, default: str = "") -> str: return val -def _poll_job(client: OntapClient, job_uuid: str, interval: int = 10) -> dict: - while True: - result = client.get(f"/cluster/jobs/{job_uuid}", fields="state,message,error,code") - state = result.get("state", "unknown") - logger.info(" job %s — state=%s", job_uuid, state) - if state != "running": - return result - time.sleep(interval) - - def _pick_cluster_by_relationship( cluster_a: str, cluster_b: str, @@ -96,13 +89,12 @@ def _pick_cluster_by_relationship( source_path = f"{source_svm}:{source_volume}" for host in (cluster_a, cluster_b): try: - client = OntapClient(host, user, passwd, verify_ssl=False, timeout=20) - resp = client.get( - "/snapmirror/relationships", - fields="uuid,source.path,destination.path,state,healthy", - **{"source.path": source_path, "max_records": "1"}, - ) - client.close() + with OntapClient(host, user, passwd, verify_ssl=False, timeout=20) as client: + resp = client.get( + "/snapmirror/relationships", + fields="uuid,source.path,destination.path,state,healthy", + **{"source.path": source_path, "max_records": "1"}, + ) if resp.get("num_records", 0) >= 1: return host, resp["records"][0] except Exception as exc: @@ -156,7 +148,7 @@ def _remove_smas_and_bring_online( ) job_uuid = resp.get("job", {}).get("uuid") if job_uuid: - _poll_job(client, job_uuid) + client.poll_job(job_uuid) except Exception as exc: logger.warning("delete_smas_rel %s — %s (continuing)", smas_uuid, exc) if smas_resp.get("num_records", 0) == 0: @@ -168,7 +160,7 @@ def _remove_smas_and_bring_online( ) job_uuid = resp.get("job", {}).get("uuid") if job_uuid: - _poll_job(client, job_uuid) + client.poll_job(job_uuid) except Exception as exc: logger.warning("bring_online — %s (continuing)", exc) @@ -184,7 +176,7 @@ def _unmount_clone(client: OntapClient, clone_uuid: str) -> None: ) job_uuid = resp.get("job", {}).get("uuid") if job_uuid: - _poll_job(client, job_uuid) + client.poll_job(job_uuid) return except Exception as exc: logger.warning("unmount_clone attempt %d/6 — %s", attempt, exc) @@ -204,7 +196,7 @@ def _offline_clone(client: OntapClient, clone_uuid: str) -> None: ) job_uuid = resp.get("job", {}).get("uuid") if job_uuid: - _poll_job(client, job_uuid) + client.poll_job(job_uuid) except Exception as exc: logger.warning("offline_clone — %s", exc) @@ -218,7 +210,7 @@ def _delete_and_confirm_clone( resp = client.delete(f"/storage/volumes/{clone_uuid}?return_timeout=120") job_uuid = resp.get("job", {}).get("uuid") if job_uuid: - _poll_job(client, job_uuid) + client.poll_job(job_uuid) except Exception as exc: logger.warning("delete_clone — %s", exc) confirm = client.get( @@ -238,6 +230,7 @@ def _delete_and_confirm_clone( def main() -> None: + """Find the tagged FlexClone from a test failover and delete it through all cleanup phases.""" cluster_a = _env("CLUSTER_A") cluster_b = _env("CLUSTER_B") dest_user = _env("DEST_USER") diff --git a/python/snapmirror_provision_dest_managed.py b/python/snapmirror_provision_dest_managed.py index 4fa4c96..81d25c6 100644 --- a/python/snapmirror_provision_dest_managed.py +++ b/python/snapmirror_provision_dest_managed.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python3 # © 2026 NetApp, Inc. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 # See the NOTICE file in the repo root for trademark and attribution details. @@ -52,7 +51,7 @@ import sys import time -from ontap_client import OntapClient +from ontap_client import OntapApiError, OntapClient logging.basicConfig( level=logging.INFO, @@ -67,18 +66,22 @@ "SOURCE_HOST": "", # source cluster management IP — set via SOURCE_HOST env var "SOURCE_USER": "admin", "SOURCE_PASS": "", # set via SOURCE_PASS env var — never hardcode - "SOURCE_SVM": "svm1", # source SVM name - "SOURCE_VOLUME": "vol_py1", # source RW volume name + "SOURCE_SVM": "", # source SVM name + "SOURCE_VOLUME": "", # source RW volume name "DEST_HOST": "", # destination cluster management IP — set via DEST_HOST env var "DEST_USER": "admin", "DEST_PASS": "", # set via DEST_PASS env var — never hardcode - "DEST_SVM": "vs0", # destination SVM name + "DEST_SVM": "", # destination SVM name "SM_POLICY": "Asynchronous", # SnapMirror policy (optional) } # --------------------------------------------------------------------------- def _env(key: str, default: str = "") -> str: + """Return the value for *key* from INPUTS or os.environ. + + Logs an error and exits if the resolved value is empty and no *default* is given. + """ val = INPUTS.get(key) or os.environ.get(key, default) if not val: logger.error( @@ -89,34 +92,6 @@ def _env(key: str, default: str = "") -> str: return val -def _poll_job(client: OntapClient, job_uuid: str, interval: int = 10) -> dict: - while True: - result = client.get(f"/cluster/jobs/{job_uuid}", fields="state,message,error,code") - state = result.get("state", "unknown") - logger.info(" job %s — state=%s", job_uuid, state) - if state != "running": - return result - time.sleep(interval) - - -def _wait_snapmirrored( - client: OntapClient, rel_uuid: str, interval: int = 15, max_wait: int = 1800 -) -> dict: - elapsed = 0 - while elapsed < max_wait: - result = client.get( - f"/snapmirror/relationships/{rel_uuid}", - fields="state,lag_time,healthy", - ) - state = result.get("state", "unknown") - logger.info(" relationship %s — state=%s", rel_uuid, state) - if state == "snapmirrored": - return result - time.sleep(interval) - elapsed += interval - raise RuntimeError(f"Timed out waiting for relationship {rel_uuid} to reach snapmirrored") - - def _get_ic_lif_ips(client: OntapClient) -> list[str]: """Return intercluster LIF IPs from the given cluster.""" resp = client.get( @@ -146,7 +121,7 @@ def _check_ic_lif_preconditions( logger.error( "PRE-CONDITION FAILED | Source cluster has no intercluster LIFs.\n" " SnapMirror requires at least one IC LIF on each cluster.\n" - " Create one via System Manager: Network → IP Interfaces → Add → Role: Intercluster\n" + " Create one via System Manager: Network -> IP Interfaces -> Add -> Role: Intercluster\n" " Or via CLI: network interface create -role intercluster -home-port e0d " "-address -netmask " ) @@ -155,7 +130,7 @@ def _check_ic_lif_preconditions( logger.error( "PRE-CONDITION FAILED | Dest cluster has no intercluster LIFs.\n" " SnapMirror requires at least one IC LIF on each cluster.\n" - " Create one via System Manager: Network → IP Interfaces → Add → Role: Intercluster\n" + " Create one via System Manager: Network -> IP Interfaces -> Add -> Role: Intercluster\n" " Or via CLI: network interface create -role intercluster -home-port e0d " "-address -netmask " ) @@ -342,7 +317,7 @@ def _create_svm_peer_relationship( ) peer_job = resp.get("job", {}).get("uuid", "") if peer_job: - _poll_job(dst, peer_job) + dst.poll_job(peer_job) logger.info("SVM PEER | created '%s' <-> '%s'", dest_svm, source_svm) except Exception as exc: exc_s = str(exc) @@ -412,11 +387,20 @@ def _phase_a_source_preflight( src_cluster.get("name"), src_cluster.get("version", {}).get("full"), ) - src_vol_resp = src.get( - "/storage/volumes", - fields="name,uuid,state,type,space.size", - **{"max_records": "1", "name": source_volume, "svm.name": source_svm}, - ) + try: + src_vol_resp = src.get( + "/storage/volumes", + fields="name,uuid,state,type,space.size", + **{"max_records": "1", "name": source_volume, "svm.name": source_svm}, + ) + except OntapApiError as exc: + logger.error( + "ABORTED — could not query source volume '%s' on %s: %s", + source_volume, + source_host, + exc, + ) + sys.exit(1) if src_vol_resp.get("num_records", 0) == 0: logger.error( "ABORTED — source volume '%s' not found on %s", @@ -472,7 +456,7 @@ def _phase_d_setup_relationship( ) job_uuid = create_resp.get("job", {}).get("uuid") if job_uuid: - _poll_job(dst, job_uuid) + dst.poll_job(job_uuid) except Exception as exc: logger.info("create_and_initialize_relationship — %s (may already exist)", exc) @@ -528,6 +512,7 @@ def _phase_d_setup_relationship( def main() -> None: + """Orchestrate all six phases (A–F) to provision a SnapMirror relationship from the destination cluster.""" source_host = _env("SOURCE_HOST") source_user = _env("SOURCE_USER") source_pass = _env("SOURCE_PASS") @@ -616,7 +601,7 @@ def main() -> None: ) logger.info("=== Phase E: Convergence polling ===") - _wait_snapmirrored(dst, rel_uuid) + dst.wait_snapmirrored(rel_uuid) logger.info("=== Phase F: Final validation ===") final = dst.get( diff --git a/python/snapmirror_provision_src_managed.py b/python/snapmirror_provision_src_managed.py index 481d8fe..f1ff4bd 100644 --- a/python/snapmirror_provision_src_managed.py +++ b/python/snapmirror_provision_src_managed.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python3 # © 2026 NetApp, Inc. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 # See the NOTICE file in the repo root for trademark and attribution details. @@ -42,9 +41,8 @@ import logging import os import sys -import time -from ontap_client import OntapClient +from ontap_client import OntapApiError, OntapClient logging.basicConfig( level=logging.INFO, @@ -56,21 +54,25 @@ # USER INPUTS — fill in your values here before running # --------------------------------------------------------------------------- INPUTS = { - "SOURCE_HOST": "", # source cluster management IP — never hardcode + "SOURCE_HOST": "", # source cluster management IP — set via SOURCE_HOST env var "SOURCE_USER": "admin", - "SOURCE_PASS": "", # set via SOURCE_PASS env var - "SOURCE_SVM": "vs0", # source SVM name + "SOURCE_PASS": "", # set via SOURCE_PASS env var — never hardcode + "SOURCE_SVM": "", # source SVM name "SOURCE_VOLUME": "", # source RW volume name - "DEST_HOST": "", # destination cluster management IP — never hardcode + "DEST_HOST": "", # destination cluster management IP — set via DEST_HOST env var "DEST_USER": "admin", - "DEST_PASS": "", # set via DEST_PASS env var - "DEST_SVM": "vs1", # destination SVM name + "DEST_PASS": "", # set via DEST_PASS env var — never hardcode + "DEST_SVM": "", # destination SVM name "SM_POLICY": "Asynchronous", # SnapMirror policy (optional) } # --------------------------------------------------------------------------- def _env(key: str, default: str = "") -> str: + """Return the value for *key* from INPUTS or os.environ. + + Logs an error and exits if the resolved value is empty and no *default* is given. + """ val = INPUTS.get(key) or os.environ.get(key, default) if not val: logger.error( @@ -81,35 +83,249 @@ def _env(key: str, default: str = "") -> str: return val -def _poll_job(client: OntapClient, job_uuid: str, interval: int = 10) -> dict: - while True: - result = client.get(f"/cluster/jobs/{job_uuid}", fields="state,message,error,code") - state = result.get("state", "unknown") - logger.info(" job %s — state=%s", job_uuid, state) - if state != "running": - return result - time.sleep(interval) - - -def _wait_snapmirrored( - client: OntapClient, rel_uuid: str, interval: int = 15, max_wait: int = 1800 +def _phase_a_source_preflight( + src: OntapClient, source_svm: str, source_volume: str, source_host: str ) -> dict: - elapsed = 0 - while elapsed < max_wait: - result = client.get( - f"/snapmirror/relationships/{rel_uuid}", - fields="state,lag_time,healthy", + """Verify source cluster connectivity and validate the source volume. + + Returns the source volume record. Aborts if missing or DP type. + """ + src_cluster = src.get("/cluster", fields="name,version") + logger.info( + "SOURCE CLUSTER | name=%s | ontap=%s", + src_cluster.get("name"), + src_cluster.get("version", {}).get("full"), + ) + src_vol_resp = src.get( + "/storage/volumes", + fields="name,uuid,state,type,space.size", + **{"max_records": "1", "name": source_volume, "svm.name": source_svm}, + ) + if src_vol_resp.get("num_records", 0) == 0: + logger.error( + "ABORTED — source volume '%s' not found on %s", + source_volume, + source_host, + ) + sys.exit(1) + src_vol = src_vol_resp["records"][0] + if src_vol.get("type") == "dp": + logger.error("ABORTED — source volume is type=dp; specify the RW volume") + sys.exit(1) + logger.info( + "SOURCE VOLUME | name=%s | uuid=%s | state=%s | type=%s | size=%s", + src_vol["name"], + src_vol["uuid"], + src_vol["state"], + src_vol["type"], + src_vol.get("space", {}).get("size"), + ) + return src_vol + + +def _phase_b_dest_preflight(dst: OntapClient) -> tuple[str, str]: + """Query the dest cluster for its peer name and best available aggregate. + + Returns ``(peer_name, aggr_name)``. + """ + dst_cluster = dst.get("/cluster", fields="name,version") + logger.info( + "DEST CLUSTER | name=%s | ontap=%s", + dst_cluster.get("name"), + dst_cluster.get("version", {}).get("full"), + ) + peer_resp = dst.get( + "/cluster/peers", + fields="name,status.state", + **{"max_records": "1"}, + ) + peer_name = peer_resp.get("records", [{}])[0].get("name", "") + logger.info("CLUSTER PEER | name=%s", peer_name) + + aggr_resp = dst.get( + "/storage/aggregates", + fields="name,space.block_storage.available", + state="online", + **{"max_records": "1", "order_by": "space.block_storage.available desc"}, + ) + aggr_name = aggr_resp.get("records", [{}])[0].get("name", "") + logger.info("DEST AGGREGATE | name=%s", aggr_name) + return peer_name, aggr_name + + +def _phase_c_dest_volume_setup( + dst: OntapClient, + dest_svm: str, + dest_volume: str, + aggr_name: str, + src_vol: dict, +) -> None: + """Create the destination DP volume if it does not already exist.""" + check_dest = dst.get( + "/storage/volumes", + fields="name,uuid,state,type", + **{"max_records": "1", "name": dest_volume, "svm.name": dest_svm}, + ) + if check_dest.get("num_records", 0) == 0: + logger.info("Creating dest DP volume '%s' on '%s'...", dest_volume, aggr_name) + try: + dst.post( + "/storage/volumes?return_timeout=120", + body={ + "name": dest_volume, + "type": "dp", + "svm": {"name": dest_svm}, + "aggregates": [{"name": aggr_name}], + "size": str(src_vol.get("space", {}).get("size", "")), + }, + ) + except Exception as exc: + logger.warning("create_dest_volume — %s (may already exist)", exc) + else: + logger.info("Dest volume '%s' already exists — skipping create", dest_volume) + + dst_vol_resp = dst.get( + "/storage/volumes", + fields="name,uuid,state,type", + **{"max_records": "1", "name": dest_volume, "svm.name": dest_svm}, + ) + dst_vol = dst_vol_resp.get("records", [{}])[0] + logger.info( + "DEST VOLUME | name=%s | uuid=%s | state=%s | type=%s", + dst_vol.get("name"), + dst_vol.get("uuid"), + dst_vol.get("state"), + dst_vol.get("type"), + ) + + +def _phase_d_setup_relationship( + dst: OntapClient, + source_svm: str, + source_volume: str, + dest_svm: str, + dest_volume: str, + peer_name: str, + sm_policy: str, +) -> None: + """Create and initialize the SnapMirror relationship (idempotent).""" + existing = dst.get( + "/snapmirror/relationships", + fields="uuid,state,healthy", + **{"destination.path": f"{dest_svm}:{dest_volume}", "max_records": "1"}, + ) + logger.info("RELATIONSHIP CHECK | existing=%d", existing.get("num_records", 0)) + try: + create_resp = dst.post( + "/snapmirror/relationships?return_timeout=120", + body={ + "source": { + "path": f"{source_svm}:{source_volume}", + "cluster": {"name": peer_name}, + }, + "destination": {"path": f"{dest_svm}:{dest_volume}"}, + "policy": {"name": sm_policy}, + }, ) - state = result.get("state", "unknown") - logger.info(" relationship %s — state=%s", rel_uuid, state) - if state == "snapmirrored": - return result - time.sleep(interval) - elapsed += interval - raise RuntimeError(f"Timed out waiting for relationship {rel_uuid} to reach snapmirrored") + job_uuid = create_resp.get("job", {}).get("uuid") + if job_uuid: + dst.poll_job(job_uuid) + except Exception as exc: + logger.warning("create_and_initialize_relationship — %s (may already exist)", exc) + + +def _abort_ic_lif_unreachable( + src: OntapClient, + dst: OntapClient, + source_host: str, + dest_host: str, + exc: OntapApiError, +) -> None: + """Log a detailed error about unreachable IC LIFs and abort.""" + src_ips = [ + r["ip"]["address"] + for r in src.get( + "/network/ip/interfaces", + fields="ip.address,services", + **{"max_records": "50"}, + ).get("records", []) + if any("intercluster" in str(s) for s in r.get("services", [])) + ] + dst_ips = [ + r["ip"]["address"] + for r in dst.get( + "/network/ip/interfaces", + fields="ip.address,services", + **{"max_records": "50"}, + ).get("records", []) + if any("intercluster" in str(s) for s in r.get("services", [])) + ] + logger.error( + "ABORTED — SnapMirror initialize failed: source volume not reachable.\n" + " ONTAP error : %s\n" + " Likely cause: TCP 11104/11105 is blocked between IC LIFs.\n" + " src IC LIFs : %s\n" + " dst IC LIFs : %s\n" + " Note : SnapMirror transfers are always initiated by the DEST\n" + " cluster. With SOURCE=%s and DEST=%s, the dest cluster\n" + " (%s) must reach the source (%s) on TCP 11104/11105.\n" + " Fix : Open TCP 11104/11105 in that direction, or swap\n" + " SOURCE/DEST so the working direction is used.", + exc, + src_ips, + dst_ips, + source_host, + dest_host, + dest_host, + source_host, + ) + sys.exit(1) + + +def _phase_e_convergence_polling( + src: OntapClient, + dst: OntapClient, + source_host: str, + dest_host: str, + dest_svm: str, + dest_volume: str, +) -> str: + """Kick off the initial transfer and wait until the relationship is snapmirrored. + + Returns the relationship UUID. + """ + rel_resp = dst.get( + "/snapmirror/relationships", + fields="uuid,source.path,destination.path,state,lag_time,healthy,policy.name", + **{"destination.path": f"{dest_svm}:{dest_volume}", "max_records": "1"}, + ) + rel = rel_resp.get("records", [{}])[0] + rel_uuid = rel.get("uuid", "") + logger.info( + "RELATIONSHIP FOUND | uuid=%s | state=%s | healthy=%s", + rel_uuid, + rel.get("state"), + rel.get("healthy"), + ) + try: + dst.post( + f"/snapmirror/relationships/{rel_uuid}/transfers?return_timeout=120", + body={}, + ) + except OntapApiError as exc: + err = exc.detail.get("error", {}) if isinstance(exc.detail, dict) else {} + if err.get("code") == "13303812" and "not found" in err.get("message", ""): + _abort_ic_lif_unreachable(src, dst, source_host, dest_host, exc) + logger.warning("initialize_relationship — %s (may already be initialized)", exc) + except Exception as exc: + logger.warning("initialize_relationship — %s (may already be initialized)", exc) + + dst.wait_snapmirrored(rel_uuid) + return rel_uuid def main() -> None: + """Orchestrate all six phases (A–F) to provision a SnapMirror relationship from the source cluster.""" source_host = _env("SOURCE_HOST") source_user = _env("SOURCE_USER") source_pass = _env("SOURCE_PASS") @@ -129,152 +345,24 @@ def main() -> None: with src, dst: logger.info("=== Phase A: Source pre-flight ===") - src_cluster = src.get("/cluster", fields="name,version") - logger.info( - "SOURCE CLUSTER | name=%s | ontap=%s", - src_cluster.get("name"), - src_cluster.get("version", {}).get("full"), - ) - - src_vol_resp = src.get( - "/storage/volumes", - fields="name,uuid,state,type,space.size", - **{"max_records": "1", "name": source_volume, "svm.name": source_svm}, - ) - if src_vol_resp.get("num_records", 0) == 0: - logger.error( - "ABORTED — source volume '%s' not found on %s", - source_volume, - source_host, - ) - sys.exit(1) - src_vol = src_vol_resp["records"][0] - if src_vol.get("type") == "dp": - logger.error("ABORTED — source volume is type=dp; specify the RW volume") - sys.exit(1) - logger.info( - "SOURCE VOLUME | name=%s | uuid=%s | state=%s | type=%s | size=%s", - src_vol["name"], - src_vol["uuid"], - src_vol["state"], - src_vol["type"], - src_vol.get("space", {}).get("size"), - ) + src_vol = _phase_a_source_preflight(src, source_svm, source_volume, source_host) logger.info("=== Phase B: Dest pre-flight ===") - dst_cluster = dst.get("/cluster", fields="name,version") - logger.info( - "DEST CLUSTER | name=%s | ontap=%s", - dst_cluster.get("name"), - dst_cluster.get("version", {}).get("full"), - ) - - peer_resp = dst.get( - "/cluster/peers", - fields="name,status.state", - **{"max_records": "1"}, - ) - peer_name = peer_resp.get("records", [{}])[0].get("name", "") - logger.info("CLUSTER PEER | name=%s", peer_name) - - aggr_resp = dst.get( - "/storage/aggregates", - fields="name,space.block_storage.available", - state="online", - **{"max_records": "1", "order_by": "space.block_storage.available desc"}, - ) - aggr_name = aggr_resp.get("records", [{}])[0].get("name", "") - logger.info("DEST AGGREGATE | name=%s", aggr_name) + peer_name, aggr_name = _phase_b_dest_preflight(dst) logger.info("=== Phase C: Dest volume setup ===") - check_dest = dst.get( - "/storage/volumes", - fields="name,uuid,state,type", - **{"max_records": "1", "name": dest_volume, "svm.name": dest_svm}, - ) - if check_dest.get("num_records", 0) == 0: - logger.info("Creating dest DP volume '%s' on '%s'…", dest_volume, aggr_name) - try: - dst.post( - "/storage/volumes?return_timeout=120", - body={ - "name": dest_volume, - "type": "dp", - "svm": {"name": dest_svm}, - "aggregates": [{"name": aggr_name}], - "size": str(src_vol.get("space", {}).get("size", "")), - }, - ) - except Exception as exc: - logger.warning("create_dest_volume — %s (may already exist)", exc) - else: - logger.info("Dest volume '%s' already exists — skipping create", dest_volume) - - dst_vol_resp = dst.get( - "/storage/volumes", - fields="name,uuid,state,type", - **{"max_records": "1", "name": dest_volume, "svm.name": dest_svm}, - ) - dst_vol = dst_vol_resp.get("records", [{}])[0] - logger.info( - "DEST VOLUME | name=%s | uuid=%s | state=%s | type=%s", - dst_vol.get("name"), - dst_vol.get("uuid"), - dst_vol.get("state"), - dst_vol.get("type"), - ) + _phase_c_dest_volume_setup(dst, dest_svm, dest_volume, aggr_name, src_vol) logger.info("=== Phase D: Relationship setup ===") - existing = dst.get( - "/snapmirror/relationships", - fields="uuid,state,healthy", - **{"destination.path": f"{dest_svm}:{dest_volume}", "max_records": "1"}, + _phase_d_setup_relationship( + dst, source_svm, source_volume, dest_svm, dest_volume, peer_name, sm_policy ) - logger.info("RELATIONSHIP CHECK | existing=%d", existing.get("num_records", 0)) - - try: - create_resp = dst.post( - "/snapmirror/relationships?return_timeout=120", - body={ - "source": { - "path": f"{source_svm}:{source_volume}", - "cluster": {"name": peer_name}, - }, - "destination": {"path": f"{dest_svm}:{dest_volume}"}, - "policy": {"name": sm_policy}, - }, - ) - job_uuid = create_resp.get("job", {}).get("uuid") - if job_uuid: - _poll_job(dst, job_uuid) - except Exception as exc: - logger.warning("create_and_initialize_relationship — %s (may already exist)", exc) logger.info("=== Phase E: Convergence polling ===") - rel_resp = dst.get( - "/snapmirror/relationships", - fields="uuid,source.path,destination.path,state,lag_time,healthy,policy.name", - **{"destination.path": f"{dest_svm}:{dest_volume}", "max_records": "1"}, - ) - rel = rel_resp.get("records", [{}])[0] - rel_uuid = rel.get("uuid", "") - logger.info( - "RELATIONSHIP FOUND | uuid=%s | state=%s | healthy=%s", - rel_uuid, - rel.get("state"), - rel.get("healthy"), + rel_uuid = _phase_e_convergence_polling( + src, dst, source_host, dest_host, dest_svm, dest_volume ) - try: - dst.post( - f"/snapmirror/relationships/{rel_uuid}/transfers?return_timeout=120", - body={}, - ) - except Exception as exc: - logger.warning("initialize_relationship — %s (may already be initialized)", exc) - - _wait_snapmirrored(dst, rel_uuid) - logger.info("=== Phase F: Final validation ===") final = dst.get( f"/snapmirror/relationships/{rel_uuid}", diff --git a/python/snapmirror_test_failover.py b/python/snapmirror_test_failover.py index 48b2aba..f8b0e5f 100644 --- a/python/snapmirror_test_failover.py +++ b/python/snapmirror_test_failover.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python3 # © 2026 NetApp, Inc. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 # See the NOTICE file in the repo root for trademark and attribution details. @@ -41,7 +40,6 @@ import logging import os import sys -import time from ontap_client import OntapClient @@ -55,16 +53,20 @@ # USER INPUTS — fill in your values here before running # --------------------------------------------------------------------------- INPUTS = { - "CLUSTER_A": "", # first cluster management IP — never hardcode - "CLUSTER_B": "", # second cluster management IP — never hardcode + "CLUSTER_A": "", # first cluster management IP — set via CLUSTER_A env var + "CLUSTER_B": "", # second cluster management IP — set via CLUSTER_B env var "DEST_USER": "admin", - "DEST_PASS": "", # set via DEST_PASS env var + "DEST_PASS": "", # set via DEST_PASS env var — never hardcode "SOURCE_VOLUME": "", # source volume name, or * to auto-detect } # --------------------------------------------------------------------------- def _env(key: str, default: str = "") -> str: + """Return the value for *key* from INPUTS or os.environ. + + Logs an error and exits if the resolved value is empty and no *default* is given. + """ val = INPUTS.get(key) or os.environ.get(key, default) if not val: logger.error( @@ -75,34 +77,6 @@ def _env(key: str, default: str = "") -> str: return val -def _poll_job(client: OntapClient, job_uuid: str, interval: int = 10) -> dict: - while True: - result = client.get(f"/cluster/jobs/{job_uuid}", fields="state,message,error,code") - state = result.get("state", "unknown") - logger.info(" job %s — state=%s", job_uuid, state) - if state != "running": - return result - time.sleep(interval) - - -def _wait_snapmirrored( - client: OntapClient, rel_uuid: str, interval: int = 15, max_wait: int = 1800 -) -> dict: - elapsed = 0 - while elapsed < max_wait: - result = client.get( - f"/snapmirror/relationships/{rel_uuid}", - fields="state,lag_time,healthy", - ) - state = result.get("state", "unknown") - logger.info(" relationship %s — state=%s", rel_uuid, state) - if state == "snapmirrored": - return result - time.sleep(interval) - elapsed += interval - raise RuntimeError(f"Timed out waiting for relationship {rel_uuid} to reach snapmirrored") - - def _pick_cluster( cluster_a: str, cluster_b: str, user: str, passwd: str, vol_name_filter: str ) -> tuple[str, dict]: @@ -114,18 +88,17 @@ def _pick_cluster( for host in (cluster_a, cluster_b): try: - client = OntapClient(host, user, passwd, verify_ssl=False, timeout=20) - resp = client.get( - "/storage/volumes", - fields="name,create_time,uuid,svm.name,state,space.size", - **{ - "type": "dp", - "name": dest_filter, - "order_by": "create_time desc", - "max_records": "1", - }, - ) - client.close() + with OntapClient(host, user, passwd, verify_ssl=False, timeout=20) as client: + resp = client.get( + "/storage/volumes", + fields="name,create_time,uuid,svm.name,state,space.size", + **{ + "type": "dp", + "name": dest_filter, + "order_by": "create_time desc", + "max_records": "1", + }, + ) if resp.get("num_records", 0) >= 1: best_cluster = host best_vol = resp["records"][0] @@ -141,6 +114,7 @@ def _pick_cluster( def main() -> None: + """Auto-detect target cluster, create a FlexClone for test failover, then resync SnapMirror.""" cluster_a = _env("CLUSTER_A") cluster_b = _env("CLUSTER_B") dest_user = _env("DEST_USER") @@ -225,7 +199,7 @@ def main() -> None: ) job_uuid = clone_resp.get("job", {}).get("uuid") if job_uuid: - _poll_job(client, job_uuid) + client.poll_job(job_uuid) except Exception as exc: logger.warning("create_test_clone — %s (may already exist)", exc) @@ -278,11 +252,11 @@ def main() -> None: ) job_uuid = resync_resp.get("job", {}).get("uuid") if job_uuid: - _poll_job(client, job_uuid, interval=10) + client.poll_job(job_uuid) except Exception as exc: logger.warning("resync_sm_relationship — %s", exc) - _wait_snapmirrored(client, rel_uuid) + client.wait_snapmirrored(rel_uuid) logger.info("=== TEST FAILOVER COMPLETE — SnapMirror resynced ===") diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..8ad5959 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,2 @@ +pytest>=8.0 +pytest-mock>=3.12