From 68742fbedbbd294dfa168d3ce8f2da500085f2f5 Mon Sep 17 00:00:00 2001 From: Cagri Yonca Date: Tue, 12 May 2026 15:57:19 +0200 Subject: [PATCH] feat: Add werkzeug instrumentation and unittests Signed-off-by: Cagri Yonca --- src/instana/__init__.py | 1 + src/instana/instrumentation/werkzeug.py | 138 ++++ src/instana/instrumentation/wsgi.py | 111 +--- src/instana/util/wsgi_utils.py | 232 +++++++ tests/frameworks/test_werkzeug.py | 624 ++++++++++++++++++ tests/instrumentation/__init__.py | 0 tests/instrumentation/test_werkzeug.py | 154 +++++ tests/instrumentation/test_wsgi_middleware.py | 270 ++++++++ tests/util/__init__.py | 0 tests/util/test_wsgi_utils.py | 309 +++++++++ 10 files changed, 1746 insertions(+), 93 deletions(-) create mode 100644 src/instana/instrumentation/werkzeug.py create mode 100644 src/instana/util/wsgi_utils.py create mode 100644 tests/frameworks/test_werkzeug.py create mode 100644 tests/instrumentation/__init__.py create mode 100644 tests/instrumentation/test_werkzeug.py create mode 100644 tests/instrumentation/test_wsgi_middleware.py create mode 100644 tests/util/__init__.py create mode 100644 tests/util/test_wsgi_utils.py diff --git a/src/instana/__init__.py b/src/instana/__init__.py index dd55ee49..cdd37a6e 100644 --- a/src/instana/__init__.py +++ b/src/instana/__init__.py @@ -184,6 +184,7 @@ def boot_agent() -> None: sqlalchemy, # noqa: F401 starlette, # noqa: F401 urllib3, # noqa: F401 + werkzeug, # noqa: F401 gevent, # noqa: F401 ) from instana.instrumentation.aiohttp import ( diff --git a/src/instana/instrumentation/werkzeug.py b/src/instana/instrumentation/werkzeug.py new file mode 100644 index 00000000..aa6ce620 --- /dev/null +++ b/src/instana/instrumentation/werkzeug.py @@ -0,0 +1,138 @@ +# (C) Copyright IBM Corp. 2026 + +""" +Instana Werkzeug Instrumentation + +This module provides automatic instrumentation for Werkzeug-based applications. +Werkzeug is a comprehensive WSGI web application library used by Flask and other frameworks. + +This module automatically patches Werkzeug applications when imported via autowrapt. +""" + +try: + from typing import Any, Callable + + import wrapt + from opentelemetry import context + + from instana.log import logger + from instana.util.wsgi_utils import ( + build_start_response, + create_span_with_context, + end_span_after_iterating, + ) + + # Autowrapt patching for automatic instrumentation + class _TracedWSGIApp: + """Wrapper that traces WSGI applications.""" + + def __init__(self, app: Callable) -> None: + self.app = app + + def __call__(self, environ: dict[str, Any], start_response: Callable) -> Any: + try: + span, token = create_span_with_context(environ) + wrapped_start_response = build_start_response(span, start_response) + except Exception: + logger.debug("werkzeug setup failed", exc_info=True) + return self.app(environ, start_response) + + try: + iterable = self.app(environ, wrapped_start_response) + return end_span_after_iterating(iterable, span, token) + except Exception as exc: + try: + if span and span.is_recording(): + span.record_exception(exc) + span.end() + if token: + context.detach(token) + except Exception: + logger.debug("werkzeug cleanup failed", exc_info=True) + raise + + def _is_flask_app(app: Any) -> bool: + """ + Check if the application is a Flask app. + + Flask apps have their own instrumentation, so we skip wrapping them + to avoid double instrumentation (2 spans per request). + + Args: + app: The WSGI application to check + + Returns: + True if app is a Flask application, False otherwise + """ + try: + # Check if it's a Flask app by class name + if hasattr(app, "__class__"): + class_name = app.__class__.__name__ + module_name = getattr(app.__class__, "__module__", "") + + # Direct Flask app check + if class_name == "Flask" and "flask" in module_name: + return True + + # Check for Flask app wrapped in middleware + if hasattr(app, "wsgi_app"): + return _is_flask_app(app.wsgi_app) + + return False + except Exception: + logger.debug("Error checking if app is Flask", exc_info=True) + return False + + @wrapt.patch_function_wrapper("werkzeug.serving", "run_simple") + def run_simple_with_instana( + wrapped: Callable, + instance: Any, + args: tuple, + kwargs: dict[str, Any], + ) -> Any: + """ + Patch werkzeug.serving.run_simple to wrap WSGI applications. + + Skips Flask applications as they have their own instrumentation. + """ + try: + # run_simple(hostname, port, application, ...) + if len(args) >= 3: + hostname, port, application = args[0], args[1], args[2] + + # Skip Flask apps (they have their own instrumentation) + if _is_flask_app(application): + logger.debug( + f"Skipping Werkzeug instrumentation for Flask app at {hostname}:{port}" + ) + return wrapped(*args, **kwargs) + + # Wrap non-Flask WSGI apps + instrumented_app = _TracedWSGIApp(application) + logger.debug(f"Werkzeug app wrapped: {hostname}:{port}") + args = (hostname, port, instrumented_app) + args[3:] + elif "application" in kwargs: + application = kwargs["application"] + + # Skip Flask apps (they have their own instrumentation) + if _is_flask_app(application): + logger.debug( + "Skipping Werkzeug instrumentation for Flask app (kwargs)" + ) + return wrapped(*args, **kwargs) + + # Wrap non-Flask WSGI apps + instrumented_app = _TracedWSGIApp(application) + kwargs["application"] = instrumented_app + logger.debug("Werkzeug app wrapped (kwargs)") + except Exception: + logger.debug("Failed to wrap Werkzeug app", exc_info=True) + + return wrapped(*args, **kwargs) + + logger.debug("Instrumenting werkzeug") + +except ImportError: + pass + +# Made with Bob diff --git a/src/instana/instrumentation/wsgi.py b/src/instana/instrumentation/wsgi.py index 2af36143..fe75c262 100644 --- a/src/instana/instrumentation/wsgi.py +++ b/src/instana/instrumentation/wsgi.py @@ -5,114 +5,39 @@ Instana WSGI Middleware """ -from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Tuple +from typing import Any, Callable -from opentelemetry import context, trace -from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry import context -from instana.propagators.format import Format -from instana.singletons import agent, get_tracer -from instana.util.secrets import strip_secrets_from_query -from instana.util.traceutils import extract_custom_headers - -if TYPE_CHECKING: - from instana.span.span import InstanaSpan +from instana.util.wsgi_utils import ( + build_start_response, + create_span_with_context, + end_span_after_iterating, +) class InstanaWSGIMiddleware(object): """Instana WSGI middleware""" - def __init__(self, app: object) -> None: + def __init__(self, app: Callable) -> None: self.app = app - def __call__(self, environ: Dict[str, Any], start_response: Callable) -> object: - env = environ - tracer = get_tracer() - - # Extract context and start span - parent_context = tracer.extract(Format.HTTP_HEADERS, env) - span = tracer.start_span("wsgi", context=parent_context) - - # Attach context - this makes the span current - ctx = trace.set_span_in_context(span) - token = context.attach(ctx) - - # Extract custom headers from request - extract_custom_headers(span, env, format=True) - - # Set request attributes - _set_request_attributes(span, env) - - def new_start_response( - status: str, - headers: List[Tuple[object, ...]], - exc_info: Optional[Exception] = None, - ) -> object: - """Modified start response with additional headers.""" - extract_custom_headers(span, headers) - - tracer.inject(span.context, Format.HTTP_HEADERS, headers) - - headers_str = [ - (header[0], str(header[1])) - if not isinstance(header[1], str) - else header - for header in headers - ] - - # Set status code attribute - sc = status.split(" ")[0] - if int(sc) >= 500: - span.mark_as_errored() - - span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, sc) - - return start_response(status, headers_str, exc_info) - + def __call__(self, environ: dict[str, Any], start_response: Callable) -> object: try: - iterable = self.app(environ, new_start_response) - - # Wrap the iterable to ensure span ends after iteration completes - return _end_span_after_iterating(iterable, span, token) + span, token = create_span_with_context(environ) + wrapped_start_response = build_start_response( + span, start_response, status_as_string=True + ) + except Exception: + return self.app(environ, start_response) + try: + iterable = self.app(environ, wrapped_start_response) + return end_span_after_iterating(iterable, span, token) except Exception as exc: - # If exception occurs before iteration completes, end span and detach token if span and span.is_recording(): span.record_exception(exc) span.end() if token: context.detach(token) raise exc - - -def _end_span_after_iterating( - iterable: Iterable[object], span: "InstanaSpan", token: object -) -> Iterable[object]: - try: - yield from iterable - finally: - # Ensure iterable cleanup (important for generators) - if hasattr(iterable, "close"): - iterable.close() - - # End span and detach token after iteration completes - if span and span.is_recording(): - span.end() - if token: - context.detach(token) - - -def _set_request_attributes(span: "InstanaSpan", env: Dict[str, Any]) -> None: - if "PATH_INFO" in env: - span.set_attribute("http.path", env["PATH_INFO"]) - if "QUERY_STRING" in env and len(env["QUERY_STRING"]): - scrubbed_params = strip_secrets_from_query( - env["QUERY_STRING"], - agent.options.secrets_matcher, - agent.options.secrets_list, - ) - span.set_attribute("http.params", scrubbed_params) - if "REQUEST_METHOD" in env: - span.set_attribute(SpanAttributes.HTTP_METHOD, env["REQUEST_METHOD"]) - if "HTTP_HOST" in env: - span.set_attribute(SpanAttributes.HTTP_HOST, env["HTTP_HOST"]) diff --git a/src/instana/util/wsgi_utils.py b/src/instana/util/wsgi_utils.py new file mode 100644 index 00000000..f6746379 --- /dev/null +++ b/src/instana/util/wsgi_utils.py @@ -0,0 +1,232 @@ +# (C) Copyright IBM Corp. 2026 + +""" +Shared WSGI Instrumentation Utilities + +This module provides common utilities for WSGI instrumentation used by +both werkzeug.py and wsgi.py modules to avoid code duplication. +""" + +from typing import TYPE_CHECKING, Any, Callable, Iterable, Optional + +from opentelemetry import context, trace +from opentelemetry.semconv.trace import SpanAttributes + +from instana.log import logger +from instana.propagators.format import Format +from instana.singletons import agent, get_tracer +from instana.util.secrets import strip_secrets_from_query +from instana.util.traceutils import extract_custom_headers + +if TYPE_CHECKING: + from instana.span.span import InstanaSpan + + +def create_span_with_context(environ: dict[str, Any]) -> tuple["InstanaSpan", Any]: + """ + Create and configure a span with context for the request. + + Args: + environ: WSGI environment dictionary + + Returns: + Tuple of (span, context_token) + """ + tracer = get_tracer() + parent_context = tracer.extract(Format.HTTP_HEADERS, environ) + span = tracer.start_span("wsgi", context=parent_context) + + ctx = trace.set_span_in_context(span) + token = context.attach(ctx) + + extract_custom_headers(span, environ, format=True) + set_request_attributes(span, environ) + + return span, token + + +def build_start_response( + span: "InstanaSpan", + start_response: Callable, + status_as_string: bool = False, +) -> Callable: + """ + Create an instrumented start_response callable. + + Args: + span: The active span + start_response: Original WSGI start_response callable + status_as_string: If True, set status code as string (for wsgi.py compatibility) + + Returns: + Wrapped start_response callable + """ + + def new_start_response( + status: str, + headers: list[tuple[str, str]], + exc_info: Optional[tuple[Any, Any, Any]] = None, + ) -> Callable: + """Modified start_response with trace context injection.""" + try: + extract_custom_headers(span, headers) + tracer = get_tracer() + tracer.inject( + span.context, + Format.HTTP_HEADERS, + headers, + ) + + status_code = parse_status_code(status) + if status_code is not None: + if status_code >= 500: + span.mark_as_errored() + span.set_attribute( + SpanAttributes.HTTP_STATUS_CODE, + str(status_code) if status_as_string else status_code, + ) + + return start_response( + status, + normalize_headers(headers), + exc_info, + ) + except Exception: + logger.debug("Error in WSGI start_response wrapper", exc_info=True) + return start_response(status, headers, exc_info) + + return new_start_response + + +def normalize_headers( + headers: list[tuple[str, Any]], +) -> list[tuple[str, str]]: + """ + Ensure all header values are strings for WSGI compliance. + + Args: + headers: List of (name, value) tuples + + Returns: + List of (name, str_value) tuples + """ + return [ + (name, value if isinstance(value, str) else str(value)) + for name, value in headers + ] + + +def parse_status_code(status: str) -> Optional[int]: + """ + Safely parse the HTTP status code from a WSGI status string. + + Args: + status: WSGI status string (e.g., "200 OK") + + Returns: + Status code as integer, or None if parsing fails + """ + try: + return int(status.split()[0]) + except (AttributeError, IndexError, TypeError, ValueError): + return None + + +def end_span_after_iterating( + iterable: Iterable[bytes], + span: "InstanaSpan", + token: Any, +) -> Iterable[bytes]: + """ + Generator that yields from the iterable and ensures span cleanup. + + Args: + iterable: The response iterable from the application + span: The active span + token: The context token + + Yields: + Response chunks from the iterable + """ + try: + yield from iterable + finally: + # Ensure iterable cleanup (important for generators) + if hasattr(iterable, "close"): + try: + iterable.close() # type: ignore + except Exception: + logger.debug("Error closing iterable", exc_info=True) + + # End span and detach token after iteration completes + if span and span.is_recording(): + span.end() + if token: + context.detach(token) # type: ignore + + +def scrub_query_params(query_string: str) -> Optional[str]: + """ + Scrub secrets from query string parameters. + + Args: + query_string: The query string to scrub + + Returns: + Scrubbed query string if agent is available, otherwise returns + the original query_string for debugging purposes + """ + if agent is not None: + return strip_secrets_from_query( + query_string, + agent.options.secrets_matcher, # type: ignore + agent.options.secrets_list, # type: ignore + ) + return query_string + + +def set_request_attributes(span: "InstanaSpan", environ: dict[str, Any]) -> None: + """ + Extract and set HTTP attributes from the WSGI environ. + + Args: + span: The active span + environ: WSGI environment dictionary + """ + try: + # Set HTTP method + if "REQUEST_METHOD" in environ: + span.set_attribute(SpanAttributes.HTTP_METHOD, environ["REQUEST_METHOD"]) + + # Set HTTP path + if "PATH_INFO" in environ: + span.set_attribute("http.path", environ["PATH_INFO"]) + + # Set HTTP query parameters (with secrets scrubbed) + if environ.get("QUERY_STRING", "").strip(): + scrubbed_params = scrub_query_params(environ["QUERY_STRING"]) + if scrubbed_params is not None: + span.set_attribute("http.params", scrubbed_params) + + # Set HTTP host + if "HTTP_HOST" in environ: + span.set_attribute( + SpanAttributes.HTTP_HOST, + environ["HTTP_HOST"], + ) + + # Set HTTP URL (without query string to avoid exposing secrets) + if "wsgi.url_scheme" in environ: + scheme = environ["wsgi.url_scheme"] + host = environ.get("HTTP_HOST", "") + script_name = environ.get("SCRIPT_NAME", "") + path = environ.get("PATH_INFO", "") + + url = f"{scheme}://{host}{script_name}{path}" + span.set_attribute(SpanAttributes.HTTP_URL, url) + + except Exception: + logger.debug("Error setting request attributes", exc_info=True) + + +# Made with Bob diff --git a/tests/frameworks/test_werkzeug.py b/tests/frameworks/test_werkzeug.py new file mode 100644 index 00000000..a48f885a --- /dev/null +++ b/tests/frameworks/test_werkzeug.py @@ -0,0 +1,624 @@ +# (c) Copyright IBM Corp. 2026 + +""" +Tests for Werkzeug instrumentation. +""" + +import threading +import time +from typing import Any, Callable, Generator, Optional +from unittest.mock import MagicMock + +import pytest +import requests +from werkzeug.wrappers import Request, Response + +from instana.instrumentation.werkzeug import _TracedWSGIApp +from instana.util.wsgi_utils import ( + normalize_headers as _normalize_headers, + parse_status_code as _parse_status_code, +) +from instana.singletons import get_tracer +from instana.util.ids import hex_id +from tests.helpers import get_first_span_by_filter + + +def simple_wsgi_app(environ: dict[str, Any], start_response: Callable) -> list: + """Simple WSGI app for testing.""" + request = Request(environ) + path = request.path + + if path == "/": + response = Response("Hello World") + elif path == "/error": + response = Response("Internal Server Error", status=500) + elif path == "/exception": + raise RuntimeError("Test exception") + elif path.startswith("/hello/"): + name = path.split("/")[-1] + response = Response(f"Hello, {name}!") + elif path == "/query": + response = Response(f"Query: {request.query_string.decode()}") + else: + response = Response("Not Found", status=404) + + return response(environ, start_response) + + +class TestWerkzeugInstrumentation: + """Tests for Werkzeug autowrapt instrumentation.""" + + @pytest.fixture(autouse=True) + def _resource(self) -> Generator[None, None, None]: + """SetUp and TearDown""" + self.tracer = get_tracer() + self.recorder = self.tracer.span_processor + self.recorder.clear_spans() + + yield + + self.recorder.clear_spans() + + def _make_request( + self, + app: Callable, + path: str = "/", + method: str = "GET", + headers: Optional[dict[str, str]] = None, + ) -> tuple[str, list, bytes]: + """Helper to make WSGI requests and capture response.""" + environ = { + "REQUEST_METHOD": method, + "PATH_INFO": path, + "QUERY_STRING": "", + "SERVER_NAME": "localhost", + "SERVER_PORT": "80", + "HTTP_HOST": "localhost", + "wsgi.url_scheme": "http", + "wsgi.input": MagicMock(), + "wsgi.errors": MagicMock(), + "wsgi.multithread": False, + "wsgi.multiprocess": True, + "wsgi.run_once": False, + } + + if "?" in path: + path, query = path.split("?", 1) + environ["PATH_INFO"] = path + environ["QUERY_STRING"] = query + + if headers: + for key, value in headers.items(): + environ[f"HTTP_{key.upper().replace('-', '_')}"] = value + + response_data = [] + response_status = [] + response_headers = [] + + def start_response(status: str, headers: list, exc_info=None): + response_status.append(status) + response_headers.extend(headers) + return lambda x: response_data.append(x) + + result = app(environ, start_response) + # Consume the iterable + for chunk in result: + response_data.append(chunk) + + return response_status[0], response_headers, b"".join(response_data) + + def test_traced_wsgi_app_basic_request(self) -> None: + """Test _TracedWSGIApp wrapper with basic request.""" + wrapped_app = _TracedWSGIApp(simple_wsgi_app) + status, _, body = self._make_request(wrapped_app, "/") + + assert status == "200 OK" + assert b"Hello World" in body + + spans = self.recorder.queued_spans() + assert len(spans) == 1 + + span = spans[0] + assert span.n == "wsgi" + assert span.data["http"]["method"] == "GET" + assert span.data["http"]["path"] == "/" + assert span.data["http"]["status"] == 200 + assert span.data["http"]["host"] == "localhost" + assert not span.ec + + def test_traced_wsgi_app_with_query_params(self) -> None: + """Test _TracedWSGIApp with query parameters.""" + wrapped_app = _TracedWSGIApp(simple_wsgi_app) + status, _, __ = self._make_request(wrapped_app, "/query?foo=bar&baz=qux") + + assert status == "200 OK" + + spans = self.recorder.queued_spans() + assert len(spans) == 1 + + span = spans[0] + assert span.n == "wsgi" + assert span.data["http"]["method"] == "GET" + assert span.data["http"]["path"] == "/query" + assert span.data["http"]["params"] == "foo=bar&baz=qux" + assert span.data["http"]["status"] == 200 + + def test_traced_wsgi_app_secret_scrubbing(self) -> None: + """Test that secrets are scrubbed from query parameters.""" + wrapped_app = _TracedWSGIApp(simple_wsgi_app) + status, _, __ = self._make_request( + wrapped_app, "/query?foo=bar&password=secret123&key=value" + ) + + assert status == "200 OK" + + spans = self.recorder.queued_spans() + assert len(spans) == 1 + + span = spans[0] + assert span.n == "wsgi" + assert "password" in span.data["http"]["params"] + assert "secret123" not in span.data["http"]["params"] + assert "" in span.data["http"]["params"] + + def test_traced_wsgi_app_500_error(self) -> None: + """Test 500 error response handling.""" + wrapped_app = _TracedWSGIApp(simple_wsgi_app) + status, _, __ = self._make_request(wrapped_app, "/error") + + assert status == "500 INTERNAL SERVER ERROR" + + spans = self.recorder.queued_spans() + assert len(spans) == 1 + + span = spans[0] + assert span.n == "wsgi" + assert span.data["http"]["status"] == 500 + assert span.ec == 1 + + def test_traced_wsgi_app_exception_handling(self) -> None: + """Test exception handling in wrapped app.""" + wrapped_app = _TracedWSGIApp(simple_wsgi_app) + + with pytest.raises(RuntimeError, match="Test exception"): + self._make_request(wrapped_app, "/exception") + + spans = self.recorder.queued_spans() + assert len(spans) == 1 + + span = spans[0] + assert span.n == "wsgi" + assert span.ec == 1 + + def test_traced_wsgi_app_404_not_found(self) -> None: + """Test 404 not found response.""" + wrapped_app = _TracedWSGIApp(simple_wsgi_app) + status, _, __ = self._make_request(wrapped_app, "/nonexistent") + + assert status == "404 NOT FOUND" + + spans = self.recorder.queued_spans() + assert len(spans) == 1 + + span = spans[0] + assert span.n == "wsgi" + assert span.data["http"]["status"] == 404 + assert not span.ec # 404 is not an error from instrumentation perspective + + def test_traced_wsgi_app_trace_context_propagation(self) -> None: + """Test trace context propagation through headers.""" + wrapped_app = _TracedWSGIApp(simple_wsgi_app) + + with self.tracer.start_as_current_span("test") as parent_span: + span_context = parent_span.get_span_context() + headers = { + "X-INSTANA-T": hex_id(span_context.trace_id), + "X-INSTANA-S": hex_id(span_context.span_id), + } + status, response_headers, _ = self._make_request( + wrapped_app, "/", headers=headers + ) + + assert status == "200 OK" + + # Check response headers contain trace context + header_dict = dict(response_headers) + assert "X-INSTANA-T" in header_dict + assert "X-INSTANA-S" in header_dict + assert "X-INSTANA-L" in header_dict + + spans = self.recorder.queued_spans() + assert len(spans) == 2 + + # Find the test span and wsgi span + def span_filter_1(span): + return span.n == "sdk" and span.data["sdk"]["name"] == "test" + + test_span = get_first_span_by_filter(spans, span_filter_1) + assert test_span + + def span_filter_2(span): + return span.n == "wsgi" + + wsgi_span = get_first_span_by_filter(spans, span_filter_2) + assert wsgi_span + + # Verify parent-child relationship + assert test_span.t == wsgi_span.t + assert test_span.s == wsgi_span.p + + # Verify response headers + assert header_dict["X-INSTANA-T"] == hex_id(wsgi_span.t) + assert header_dict["X-INSTANA-S"] == hex_id(wsgi_span.s) + + def test_traced_wsgi_app_post_request(self) -> None: + """Test POST request instrumentation.""" + wrapped_app = _TracedWSGIApp(simple_wsgi_app) + self._make_request(wrapped_app, "/", method="POST") + + spans = self.recorder.queued_spans() + assert len(spans) == 1 + + span = spans[0] + assert span.n == "wsgi" + assert span.data["http"]["method"] == "POST" + assert span.data["http"]["path"] == "/" + + def test_traced_wsgi_app_multiple_requests(self) -> None: + """Test multiple sequential requests produce isolated spans.""" + wrapped_app = _TracedWSGIApp(simple_wsgi_app) + + self._make_request(wrapped_app, "/") + self._make_request(wrapped_app, "/hello/World") + self._make_request(wrapped_app, "/hello/Test") + + spans = self.recorder.queued_spans() + assert len(spans) == 3 + + # All should be wsgi spans with unique IDs + for span in spans: + assert span.n == "wsgi" + assert not span.ec + assert span.data["http"]["status"] == 200 + + # Each request must produce a unique span and trace + assert spans[0].s != spans[1].s != spans[2].s + assert spans[0].t != spans[1].t != spans[2].t + + def test_traced_wsgi_app_wraps_application(self) -> None: + """Test that _TracedWSGIApp properly wraps an application.""" + wrapped_app = _TracedWSGIApp(simple_wsgi_app) + + assert wrapped_app.app is simple_wsgi_app + assert callable(wrapped_app) + + def test_traced_wsgi_app_preserves_app_behavior(self) -> None: + """Test that wrapped app behaves like original.""" + wrapped_app = _TracedWSGIApp(simple_wsgi_app) + + # Make a request through wrapped app + status, _, body = self._make_request(wrapped_app, "/hello/Test") + + assert status == "200 OK" + assert b"Hello, Test!" in body + + # Verify span was created + spans = self.recorder.queued_spans() + assert len(spans) == 1 + assert spans[0].n == "wsgi" + + def test_run_simple_wrapper_logic(self) -> None: + """Test the wrapping logic in run_simple_with_instana.""" + # Test that the wrapper correctly identifies and wraps the app + + # Test with positional args + args = ("localhost", 5000, simple_wsgi_app) + if len(args) >= 3: + _, __, application = args[0], args[1], args[2] + instrumented_app = _TracedWSGIApp(application) + assert isinstance(instrumented_app, _TracedWSGIApp) + assert instrumented_app.app is simple_wsgi_app + + # Test with kwargs + kwargs = {"application": simple_wsgi_app} + if "application" in kwargs: + application = kwargs["application"] + instrumented_app = _TracedWSGIApp(application) + assert isinstance(instrumented_app, _TracedWSGIApp) + assert instrumented_app.app is simple_wsgi_app + + def test_query_params_without_agent(self) -> None: + """Test query params when agent is None.""" + from instana.util.wsgi_utils import scrub_query_params + from unittest.mock import patch + + # Mock agent as None - should return original query string + with patch("instana.util.wsgi_utils.agent", None): + result = scrub_query_params("foo=bar&password=secret") + assert result == "foo=bar&password=secret" + + def test_traced_wsgi_app_init(self) -> None: + """Test _TracedWSGIApp initialization.""" + wrapped_app = _TracedWSGIApp(simple_wsgi_app) + assert wrapped_app.app is simple_wsgi_app + assert hasattr(wrapped_app, "app") + + def test_traced_wsgi_app_span_creation_failure(self) -> None: + """Test exception handling when span creation fails.""" + from unittest.mock import patch + + wrapped_app = _TracedWSGIApp(simple_wsgi_app) + + # Mock create_span_with_context to raise an exception + with patch( + "instana.instrumentation.werkzeug.create_span_with_context", + side_effect=Exception("Span creation failed"), + ): + status, _, body = self._make_request(wrapped_app, "/") + + # App should still work, falling back to unwrapped behavior + assert status == "200 OK" + assert b"Hello World" in body + + # No spans should be created due to the failure + spans = self.recorder.queued_spans() + assert len(spans) == 0 + + def test_werkzeug_run_simple_integration(self) -> None: + """Integration test: Start actual werkzeug server and verify instrumentation.""" + from werkzeug.serving import run_simple + import socket + + # Find a free port + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + s.listen(1) + port = s.getsockname()[1] + + server_started = threading.Event() + server_error = [] + + def run_server(): + try: + # Signal that server is starting + server_started.set() + # Run werkzeug server (this will be instrumented) + run_simple( + "127.0.0.1", + port, + simple_wsgi_app, + use_reloader=False, + use_debugger=False, + threaded=True, + ) + except Exception as e: + server_error.append(e) + + # Start server in background thread + server_thread = threading.Thread(target=run_server, daemon=True) + server_thread.start() + + # Wait for server to start + server_started.wait(timeout=2) + time.sleep(0.5) # Give server time to bind + + try: + # Make HTTP request to the server + response = requests.get(f"http://127.0.0.1:{port}/", timeout=2) + assert response.status_code == 200 + assert b"Hello World" in response.content + + # Give time for span to be recorded + time.sleep(0.2) + + # Verify span was created + spans = self.recorder.queued_spans() + assert len(spans) >= 1 + + # Find the wsgi span + wsgi_spans = [s for s in spans if s.n == "wsgi"] + assert len(wsgi_spans) >= 1 + + span = wsgi_spans[0] + assert span.data["http"]["method"] == "GET" + assert span.data["http"]["path"] == "/" + assert span.data["http"]["status"] == 200 + assert not span.ec + + finally: + # Server will be stopped when thread exits (daemon thread) + pass + + def test_werkzeug_run_simple_integration_kwargs(self) -> None: + """Integration test: Start werkzeug server with kwargs and verify instrumentation.""" + from werkzeug.serving import run_simple + import socket + + # Find a free port + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + s.listen(1) + port = s.getsockname()[1] + + server_started = threading.Event() + + def run_server(): + try: + server_started.set() + # Run with application as kwarg (tests lines 69-73) + run_simple( + hostname="127.0.0.1", + port=port, + application=simple_wsgi_app, + use_reloader=False, + use_debugger=False, + threaded=True, + ) + except Exception: + pass + + server_thread = threading.Thread(target=run_server, daemon=True) + server_thread.start() + + server_started.wait(timeout=2) + time.sleep(0.5) + + try: + response = requests.get(f"http://127.0.0.1:{port}/", timeout=2) + assert response.status_code == 200 + + time.sleep(0.2) + + spans = self.recorder.queued_spans() + wsgi_spans = [s for s in spans if s.n == "wsgi"] + assert len(wsgi_spans) >= 1 + + span = wsgi_spans[0] + assert span.data["http"]["status"] == 200 + + finally: + pass + + def test_is_flask_app_detection(self) -> None: + """Test _is_flask_app correctly identifies Flask applications.""" + from instana.instrumentation.werkzeug import _is_flask_app + + # Mock a Flask app - the class name should be "Flask" + class Flask: + pass + + # Set the module to simulate flask.app + Flask.__module__ = "flask.app" + + flask_app = Flask() + assert _is_flask_app(flask_app) is True + + def test_is_flask_app_with_wrapped_wsgi_app(self) -> None: + """Test _is_flask_app detects Flask apps wrapped in middleware.""" + from instana.instrumentation.werkzeug import _is_flask_app + + # Mock a Flask app - class name should be "Flask" + class Flask: + pass + + Flask.__module__ = "flask.app" + + # Mock middleware wrapping Flask app + class MockMiddleware: + def __init__(self): + self.wsgi_app = Flask() + + wrapped_app = MockMiddleware() + assert _is_flask_app(wrapped_app) is True + + def test_is_flask_app_non_flask(self) -> None: + """Test _is_flask_app returns False for non-Flask apps.""" + from instana.instrumentation.werkzeug import _is_flask_app + + # Regular WSGI app + assert _is_flask_app(simple_wsgi_app) is False + + # Mock non-Flask app + class MockApp: + __name__ = "NotFlask" + __module__ = "some.module" + + assert _is_flask_app(MockApp()) is False + + def test_run_simple_skips_flask_app_positional_args(self) -> None: + """Test run_simple_with_instana skips Flask apps (positional args).""" + from instana.instrumentation.werkzeug import _TracedWSGIApp, _is_flask_app + from unittest.mock import patch, MagicMock + + # Mock a Flask app - class name should be "Flask" + class Flask: + def __call__(self, environ, start_response): + return simple_wsgi_app(environ, start_response) + + Flask.__module__ = "flask.app" + flask_app = Flask() + + # Verify our mock is detected as Flask + assert _is_flask_app(flask_app), "Flask app not detected" + + # Patch make_server to prevent actual server start but allow instrumentation to run + with patch("werkzeug.serving.make_server") as mock_make_server: + mock_server = MagicMock() + mock_server.serve_forever = MagicMock() + mock_make_server.return_value = mock_server + + from werkzeug.serving import run_simple + + # Call run_simple with Flask app + run_simple( + "localhost", 5000, flask_app, use_reloader=False, use_debugger=False + ) + + # Verify make_server was called with original Flask app (not wrapped) + mock_make_server.assert_called_once() + call_args = mock_make_server.call_args[0] + # Flask app should NOT be wrapped in _TracedWSGIApp + assert call_args[2] is flask_app + assert not isinstance(call_args[2], _TracedWSGIApp) + + def test_run_simple_skips_flask_app_kwargs(self) -> None: + """Test run_simple_with_instana skips Flask apps (kwargs).""" + from instana.instrumentation.werkzeug import _TracedWSGIApp, _is_flask_app + from unittest.mock import patch, MagicMock + + # Mock a Flask app - class name should be "Flask" + class Flask: + def __call__(self, environ, start_response): + return simple_wsgi_app(environ, start_response) + + Flask.__module__ = "flask.app" + flask_app = Flask() + + # Verify our mock is detected as Flask + assert _is_flask_app(flask_app), "Flask app not detected" + + # Patch make_server to prevent actual server start but allow instrumentation to run + with patch("werkzeug.serving.make_server") as mock_make_server: + mock_server = MagicMock() + mock_server.serve_forever = MagicMock() + mock_make_server.return_value = mock_server + + from werkzeug.serving import run_simple + + # Call run_simple with Flask app using kwargs + run_simple( + hostname="localhost", + port=5000, + application=flask_app, + use_reloader=False, + use_debugger=False, + ) + + # Verify make_server was called with original Flask app (not wrapped) + mock_make_server.assert_called_once() + call_args = mock_make_server.call_args[0] + # Flask app should NOT be wrapped in _TracedWSGIApp (3rd positional arg) + assert call_args[2] is flask_app + assert not isinstance(call_args[2], _TracedWSGIApp) + + +def test_parse_status_code_handles_valid_and_invalid_values() -> None: + """Test safe parsing of WSGI status strings.""" + assert _parse_status_code("200 OK") == 200 + assert _parse_status_code("404") == 404 + assert _parse_status_code("") is None + assert _parse_status_code("INVALID") is None + assert _parse_status_code(" OK") is None + assert _parse_status_code(None) is None # type: ignore[arg-type] + + +def test_normalize_headers_converts_non_string_values() -> None: + """Test response header normalization.""" + headers = [("Content-Length", 123), ("Content-Type", "text/plain")] + assert _normalize_headers(headers) == [ + ("Content-Length", "123"), + ("Content-Type", "text/plain"), + ] + + +# Made with Bob diff --git a/tests/instrumentation/__init__.py b/tests/instrumentation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/instrumentation/test_werkzeug.py b/tests/instrumentation/test_werkzeug.py new file mode 100644 index 00000000..dfc30b48 --- /dev/null +++ b/tests/instrumentation/test_werkzeug.py @@ -0,0 +1,154 @@ +# (C) Copyright IBM Corp. 2026 + +""" +Tests for Werkzeug instrumentation. + +Verifies that Flask apps are skipped to avoid double instrumentation. +""" + +import unittest +from unittest.mock import Mock, patch + +from instana.instrumentation.werkzeug import _is_flask_app + + +class TestWerkzeugInstrumentation(unittest.TestCase): + """Test Werkzeug instrumentation behavior.""" + + def test_is_flask_app_detects_flask(self): + """Test that _is_flask_app correctly identifies Flask apps.""" + # Create a mock Flask app + mock_flask_app = Mock() + mock_flask_app.__class__.__name__ = "Flask" + mock_flask_app.__class__.__module__ = "flask.app" + + self.assertTrue(_is_flask_app(mock_flask_app)) + + def test_is_flask_app_detects_wrapped_flask(self): + """Test that _is_flask_app detects Flask apps wrapped in middleware.""" + # Create a mock Flask app + mock_flask_app = Mock() + mock_flask_app.__class__.__name__ = "Flask" + mock_flask_app.__class__.__module__ = "flask.app" + + # Wrap it in middleware + mock_wrapper = Mock() + mock_wrapper.__class__.__name__ = "DispatcherMiddleware" + mock_wrapper.wsgi_app = mock_flask_app + + self.assertTrue(_is_flask_app(mock_wrapper)) + + def test_is_flask_app_rejects_non_flask(self): + """Test that _is_flask_app rejects non-Flask WSGI apps.""" + # Create a mock non-Flask WSGI app + mock_wsgi_app = Mock() + mock_wsgi_app.__class__.__name__ = "Application" + mock_wsgi_app.__class__.__module__ = "myapp" + + self.assertFalse(_is_flask_app(mock_wsgi_app)) + + def test_is_flask_app_handles_none(self): + """Test that _is_flask_app handles None gracefully.""" + self.assertFalse(_is_flask_app(None)) + + def test_is_flask_app_handles_callable(self): + """Test that _is_flask_app handles plain callables.""" + + def simple_wsgi_app(environ, start_response): + return [] + + self.assertFalse(_is_flask_app(simple_wsgi_app)) + + @patch("instana.instrumentation.werkzeug.logger") + def test_is_flask_app_handles_exceptions(self, mock_logger): + """Test that _is_flask_app handles exceptions gracefully.""" + + # Create an object that raises on attribute access + class BrokenApp: + @property + def __class__(self): + raise RuntimeError("Broken!") + + broken_app = BrokenApp() + result = _is_flask_app(broken_app) + + self.assertFalse(result) + mock_logger.debug.assert_called_once() + + +class TestWerkzeugFlaskIntegration(unittest.TestCase): + """Test Werkzeug instrumentation logic with Flask apps.""" + + def test_flask_app_detection_in_args(self): + """Test that Flask apps in args are detected and not wrapped.""" + from instana.instrumentation.werkzeug import _is_flask_app + + # Mock Flask app + mock_flask_app = Mock() + mock_flask_app.__class__.__name__ = "Flask" + mock_flask_app.__class__.__module__ = "flask.app" + + # Verify Flask detection works + self.assertTrue(_is_flask_app(mock_flask_app)) + + def test_non_flask_app_detection(self): + """Test that non-Flask WSGI apps are correctly identified.""" + from instana.instrumentation.werkzeug import _is_flask_app + + # Mock non-Flask WSGI app + mock_wsgi_app = Mock() + mock_wsgi_app.__class__.__name__ = "Application" + mock_wsgi_app.__class__.__module__ = "myapp" + + # Verify non-Flask detection works + self.assertFalse(_is_flask_app(mock_wsgi_app)) + + @patch("instana.instrumentation.werkzeug.logger") + def test_wrapping_logic_skips_flask(self, mock_logger): + """Test the wrapping logic skips Flask apps.""" + from instana.instrumentation.werkzeug import _is_flask_app, _TracedWSGIApp + + # Mock Flask app + mock_flask_app = Mock() + mock_flask_app.__class__.__name__ = "Flask" + mock_flask_app.__class__.__module__ = "flask.app" + + # Simulate the logic in run_simple_with_instana + if _is_flask_app(mock_flask_app): + # Should skip wrapping + wrapped_app = mock_flask_app + else: + # Should wrap + wrapped_app = _TracedWSGIApp(mock_flask_app) + + # Verify Flask app was NOT wrapped + self.assertIs(wrapped_app, mock_flask_app) + self.assertNotIsInstance(wrapped_app, _TracedWSGIApp) + + @patch("instana.instrumentation.werkzeug.logger") + def test_wrapping_logic_wraps_non_flask(self, mock_logger): + """Test the wrapping logic wraps non-Flask apps.""" + from instana.instrumentation.werkzeug import _is_flask_app, _TracedWSGIApp + + # Mock non-Flask WSGI app + mock_wsgi_app = Mock() + mock_wsgi_app.__class__.__name__ = "Application" + mock_wsgi_app.__class__.__module__ = "myapp" + + # Simulate the logic in run_simple_with_instana + if _is_flask_app(mock_wsgi_app): + # Should skip wrapping + wrapped_app = mock_wsgi_app + else: + # Should wrap + wrapped_app = _TracedWSGIApp(mock_wsgi_app) + + # Verify non-Flask app WAS wrapped + self.assertIsNot(wrapped_app, mock_wsgi_app) + self.assertIsInstance(wrapped_app, _TracedWSGIApp) + + +if __name__ == "__main__": + unittest.main() + +# Made with Bob diff --git a/tests/instrumentation/test_wsgi_middleware.py b/tests/instrumentation/test_wsgi_middleware.py new file mode 100644 index 00000000..03101443 --- /dev/null +++ b/tests/instrumentation/test_wsgi_middleware.py @@ -0,0 +1,270 @@ +# (C) Copyright IBM Corp. 2026 + +""" +Unit tests for InstanaWSGIMiddleware class +""" + +import pytest +from typing import Any, Callable, Generator +from unittest.mock import Mock, patch + +from instana.instrumentation.wsgi import InstanaWSGIMiddleware +from instana.singletons import get_tracer + + +class TestInstanaWSGIMiddleware: + """Direct unit tests for InstanaWSGIMiddleware""" + + @pytest.fixture(autouse=True) + def _setup(self) -> Generator[None, None, None]: + """Setup test environment""" + self.tracer = get_tracer() + self.recorder = self.tracer._span_processor # type: ignore + self.recorder.clear_spans() # type: ignore + yield + self.recorder.clear_spans() # type: ignore + + def test_middleware_init(self) -> None: + """Test middleware initialization""" + app = Mock() + middleware = InstanaWSGIMiddleware(app) + assert middleware.app is app + + def test_middleware_call_success(self) -> None: + """Test successful middleware call""" + # Create mock app + app = Mock() + app.return_value = [b"response"] + + # Create middleware + middleware = InstanaWSGIMiddleware(app) + + # Create environ + environ = { + "REQUEST_METHOD": "GET", + "PATH_INFO": "/test", + "HTTP_HOST": "localhost:8080", + "wsgi.url_scheme": "http", + } + + # Create start_response + start_response = Mock() + + # Call middleware + result = middleware(environ, start_response) + + # Consume the generator + _ = list(result) # type: ignore + + # Verify app was called + assert app.called + spans = self.recorder.queued_spans() # type: ignore + assert len(spans) == 1 + assert spans[0].n == "wsgi" + + def test_middleware_call_with_exception_in_span_creation(self) -> None: + """Test middleware when span creation fails""" + app = Mock() + app.return_value = [b"response"] + + middleware = InstanaWSGIMiddleware(app) + + environ = {"REQUEST_METHOD": "GET"} + start_response = Mock() + + # Mock create_span_with_context to raise exception + with patch( + "instana.instrumentation.wsgi.create_span_with_context", + side_effect=Exception("Span creation failed"), + ): + result = middleware(environ, start_response) + + # Should return app result directly + assert result == app.return_value + # App should be called with original start_response + app.assert_called_once_with(environ, start_response) + + def test_middleware_call_with_exception_in_app(self) -> None: + """Test middleware when app raises exception""" + app = Mock() + app.side_effect = ValueError("App error") + + middleware = InstanaWSGIMiddleware(app) + + environ = { + "REQUEST_METHOD": "GET", + "PATH_INFO": "/test", + "HTTP_HOST": "localhost:8080", + "wsgi.url_scheme": "http", + } + start_response = Mock() + + # Call middleware and expect exception + with pytest.raises(ValueError, match="App error"): + middleware(environ, start_response) + + # Verify span was recorded with exception + spans = self.recorder.queued_spans() # type: ignore + assert len(spans) == 1 + span = spans[0] + assert span.n == "wsgi" + # Exception should be recorded (ec is error count) + assert span.ec == 1 + + def test_middleware_call_with_exception_in_app_no_span(self) -> None: + """Test middleware when app raises exception and span is None""" + app = Mock() + app.side_effect = RuntimeError("App runtime error") + + middleware = InstanaWSGIMiddleware(app) + + environ = { + "REQUEST_METHOD": "GET", + "PATH_INFO": "/test", + "HTTP_HOST": "localhost:8080", + "wsgi.url_scheme": "http", + } + start_response = Mock() + + # Mock create_span_with_context to return None span + with ( + patch( + "instana.instrumentation.wsgi.create_span_with_context", + return_value=(None, None), + ), + pytest.raises(RuntimeError, match="App runtime error"), + ): + middleware(environ, start_response) + + def test_middleware_call_with_exception_span_not_recording(self) -> None: + """Test middleware when app raises exception and span is not recording""" + app = Mock() + app.side_effect = KeyError("Key not found") + + middleware = InstanaWSGIMiddleware(app) + + environ = { + "REQUEST_METHOD": "GET", + "PATH_INFO": "/test", + "HTTP_HOST": "localhost:8080", + "wsgi.url_scheme": "http", + } + start_response = Mock() + + # Mock span that is not recording + mock_span = Mock() + mock_span.is_recording.return_value = False + mock_token = Mock() + + with patch( + "instana.instrumentation.wsgi.create_span_with_context", + return_value=(mock_span, mock_token), + ): + with pytest.raises(KeyError, match="Key not found"): + middleware(environ, start_response) + + # Verify span.end() was not called since not recording + mock_span.end.assert_not_called() + + def test_middleware_call_with_token_detach(self) -> None: + """Test middleware properly detaches context token on exception""" + app = Mock() + app.side_effect = TypeError("Type error") + + middleware = InstanaWSGIMiddleware(app) + + environ = { + "REQUEST_METHOD": "GET", + "PATH_INFO": "/test", + "HTTP_HOST": "localhost:8080", + "wsgi.url_scheme": "http", + } + start_response = Mock() + + mock_span = Mock() + mock_span.is_recording.return_value = True + mock_token = Mock() + + with ( + patch( + "instana.instrumentation.wsgi.create_span_with_context", + return_value=(mock_span, mock_token), + ), + patch("instana.instrumentation.wsgi.context") as mock_context, + pytest.raises(TypeError, match="Type error"), + ): + middleware(environ, start_response) + + # Verify context.detach was called + mock_context.detach.assert_called_once_with(mock_token) + + def test_middleware_call_with_no_token(self) -> None: + """Test middleware when token is None""" + app = Mock() + app.side_effect = AttributeError("Attribute error") + + middleware = InstanaWSGIMiddleware(app) + + environ = { + "REQUEST_METHOD": "GET", + "PATH_INFO": "/test", + "HTTP_HOST": "localhost:8080", + "wsgi.url_scheme": "http", + } + start_response = Mock() + + mock_span = Mock() + mock_span.is_recording.return_value = True + + with ( + patch( + "instana.instrumentation.wsgi.create_span_with_context", + return_value=(mock_span, None), + ), + patch("instana.instrumentation.wsgi.context") as mock_context, + pytest.raises(AttributeError, match="Attribute error"), + ): + middleware(environ, start_response) + + # Verify context.detach was not called since token is None + mock_context.detach.assert_not_called() + + def test_middleware_integration_with_iterable(self) -> None: + """Test middleware with iterable response""" + + def app(environ: dict[str, Any], start_response: Callable) -> list[bytes]: + start_response("200 OK", [("Content-Type", "text/plain")]) + return [b"Hello", b" ", b"World"] + + middleware = InstanaWSGIMiddleware(app) + + environ = { + "REQUEST_METHOD": "GET", + "PATH_INFO": "/test", + "HTTP_HOST": "localhost:8080", + "wsgi.url_scheme": "http", + } + + start_response_called = [] + + def start_response( + status: str, headers: list[tuple[str, str]], exc_info: Any = None + ) -> None: + start_response_called.append((status, headers)) + + result = middleware(environ, start_response) + + # Consume the generator + response_data = b"".join(result) # type: ignore + + assert response_data == b"Hello World" + assert len(start_response_called) == 1 + assert start_response_called[0][0] == "200 OK" + + # Verify span was created + spans = self.recorder.queued_spans() # type: ignore + assert len(spans) == 1 + assert spans[0].n == "wsgi" + + +# Made with Bob diff --git a/tests/util/__init__.py b/tests/util/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/util/test_wsgi_utils.py b/tests/util/test_wsgi_utils.py new file mode 100644 index 00000000..43b6d7d7 --- /dev/null +++ b/tests/util/test_wsgi_utils.py @@ -0,0 +1,309 @@ +# (C) Copyright IBM Corp. 2026 + +""" +Unit tests for WSGI utility functions +""" + +import pytest +from typing import Generator +from unittest.mock import Mock, patch + +from instana.util.wsgi_utils import ( + build_start_response, + create_span_with_context, + end_span_after_iterating, + normalize_headers, + parse_status_code, + scrub_query_params, + set_request_attributes, +) +from instana.singletons import get_tracer + + +class TestWSGIUtils: + """Tests for WSGI utility functions""" + + @pytest.fixture(autouse=True) + def _setup(self) -> Generator[None, None, None]: + """Setup test environment""" + self.tracer = get_tracer() + self.recorder = self.tracer._span_processor # type: ignore + self.recorder.clear_spans() # type: ignore + yield + self.recorder.clear_spans() # type: ignore + + def test_parse_status_code_valid(self) -> None: + """Test parsing valid status codes""" + assert parse_status_code("200 OK") == 200 + assert parse_status_code("404 Not Found") == 404 + assert parse_status_code("500 Internal Server Error") == 500 + assert parse_status_code("301") == 301 + + def test_parse_status_code_invalid(self) -> None: + """Test parsing invalid status codes""" + # AttributeError - None has no split + assert parse_status_code(None) is None # type: ignore + + # IndexError - empty string + assert parse_status_code("") is None + + # ValueError - non-numeric + assert parse_status_code("OK 200") is None + + # TypeError - wrong type + assert parse_status_code(200) is None # type: ignore + + def test_normalize_headers_all_strings(self) -> None: + """Test normalizing headers when all values are strings""" + headers = [("Content-Type", "text/html"), ("X-Custom", "value")] + result = normalize_headers(headers) + assert result == headers + + def test_normalize_headers_mixed_types(self) -> None: + """Test normalizing headers with non-string values""" + headers = [ + ("Content-Length", 1234), + ("X-Count", 42), + ("Content-Type", "text/html"), + ] + result = normalize_headers(headers) + assert result == [ + ("Content-Length", "1234"), + ("X-Count", "42"), + ("Content-Type", "text/html"), + ] + + def test_build_start_response_with_500_error(self) -> None: + """Test start_response wrapper marks span as errored for 5xx status""" + span = self.tracer.start_span("test") + original_start_response = Mock() + + wrapped = build_start_response(span, original_start_response) + headers = [("Content-Type", "text/html")] + + wrapped("500 Internal Server Error", headers) + + # Verify span was marked as errored + assert span.attributes.get("ec") == 1 + span.end() + + def test_build_start_response_exception_handling(self) -> None: + """Test start_response wrapper handles exceptions gracefully""" + span = Mock() + span.context = Mock() + + # Make tracer.inject raise an exception + original_start_response = Mock() + + with patch("instana.util.wsgi_utils.get_tracer") as mock_tracer: + mock_tracer.return_value.inject.side_effect = RuntimeError("Inject failed") + + wrapped = build_start_response(span, original_start_response) + headers = [("Content-Type", "text/html")] + + # Should not raise, should call original start_response + _ = wrapped("200 OK", headers, None) + + # Original start_response should be called with original headers + original_start_response.assert_called_once_with("200 OK", headers, None) + + def test_end_span_after_iterating_with_close(self) -> None: + """Test end_span_after_iterating calls close on iterable""" + span = self.tracer.start_span("test") + _ = self.tracer._span_processor # type: ignore + token = Mock() + + # Create iterable with close method + class CloseableIterable: + def __init__(self): + self.closed = False + + def __iter__(self): + return self + + def __next__(self): + raise StopIteration + + def close(self): + self.closed = True + + iterable = CloseableIterable() + + # Consume the generator + list(end_span_after_iterating(iterable, span, token)) + + # Verify close was called + assert iterable.closed + + def test_end_span_after_iterating_close_exception(self) -> None: + """Test end_span_after_iterating handles close exceptions""" + span = self.tracer.start_span("test") + token = Mock() + + # Create iterable with close that raises + class BadCloseIterable: + def __iter__(self): + return self + + def __next__(self): + raise StopIteration + + def close(self): + raise RuntimeError("Close failed") + + iterable = BadCloseIterable() + + # Should not raise, should handle exception gracefully + list(end_span_after_iterating(iterable, span, token)) + + def test_scrub_query_params_with_agent(self) -> None: + """Test query param scrubbing when agent is available""" + query = "key=value&secret=password123" + result = scrub_query_params(query) + + # Should scrub secrets + assert ( + "secret=" in result or "secret" not in result or result == query + ) + + def test_scrub_query_params_no_agent(self) -> None: + """Test query param scrubbing when agent is None""" + query = "key=value&secret=password123" + + with patch("instana.util.wsgi_utils.agent", None): + result = scrub_query_params(query) + # Should return original when agent is None + assert result == query + + def test_set_request_attributes_with_query_string(self) -> None: + """Test setting request attributes with query string""" + span = self.tracer.start_span("test") + + environ = { + "REQUEST_METHOD": "POST", + "PATH_INFO": "/api/users", + "QUERY_STRING": "id=123&secret=hidden", + "HTTP_HOST": "example.com:8080", + "wsgi.url_scheme": "https", + "SCRIPT_NAME": "/app", + } + + set_request_attributes(span, environ) + + # Verify attributes were set + assert span.attributes.get("http.method") == "POST" + assert span.attributes.get("http.path") == "/api/users" + assert span.attributes.get("http.host") == "example.com:8080" + assert "http.params" in span.attributes + assert ( + span.attributes.get("http.url") == "https://example.com:8080/app/api/users" + ) + + span.end() + + def test_set_request_attributes_empty_query_string(self) -> None: + """Test setting request attributes with empty query string""" + span = self.tracer.start_span("test") + + environ = { + "REQUEST_METHOD": "GET", + "PATH_INFO": "/", + "QUERY_STRING": "", + "HTTP_HOST": "localhost", + "wsgi.url_scheme": "http", + } + + set_request_attributes(span, environ) + + # Verify query params not set for empty string + assert "http.params" not in span.attributes + + span.end() + + def test_set_request_attributes_whitespace_query(self) -> None: + """Test setting request attributes with whitespace-only query string""" + span = self.tracer.start_span("test") + + environ = { + "REQUEST_METHOD": "GET", + "PATH_INFO": "/", + "QUERY_STRING": " ", + "HTTP_HOST": "localhost", + "wsgi.url_scheme": "http", + } + + set_request_attributes(span, environ) + + # Verify query params not set for whitespace + assert "http.params" not in span.attributes + + span.end() + + def test_set_request_attributes_exception_handling(self) -> None: + """Test set_request_attributes handles exceptions gracefully""" + span = Mock() + span.set_attribute = Mock(side_effect=RuntimeError("Attribute error")) + + environ = { + "REQUEST_METHOD": "GET", + "PATH_INFO": "/test", + } + + # Should not raise exception + set_request_attributes(span, environ) + + def test_create_span_with_context(self) -> None: + """Test creating span with context""" + environ = { + "REQUEST_METHOD": "GET", + "PATH_INFO": "/test", + "HTTP_HOST": "localhost:8080", + "wsgi.url_scheme": "http", + } + + span, token = create_span_with_context(environ) + + assert span is not None + assert span.name == "wsgi" + assert token is not None + + # Clean up + span.end() + from opentelemetry import context + + context.detach(token) + + def test_build_start_response_status_as_string(self) -> None: + """Test build_start_response with status_as_string=True""" + span = self.tracer.start_span("test") + original_start_response = Mock() + + wrapped = build_start_response( + span, original_start_response, status_as_string=True + ) + headers = [("Content-Type", "text/html")] + + wrapped("200 OK", headers) + + # Verify status was set as string + assert span.attributes.get("http.status_code") == "200" + span.end() + + def test_build_start_response_status_as_int(self) -> None: + """Test build_start_response with status_as_string=False""" + span = self.tracer.start_span("test") + original_start_response = Mock() + + wrapped = build_start_response( + span, original_start_response, status_as_string=False + ) + headers = [("Content-Type", "text/html")] + + wrapped("404 Not Found", headers) + + # Verify status was set as int + assert span.attributes.get("http.status_code") == 404 + span.end() + + +# Made with Bob