Source code for nameko_prometheus.dependencies

import logging
import os
import platform
import time
from dataclasses import dataclass
from typing import Any, MutableMapping, Optional, Tuple, Union
from weakref import WeakKeyDictionary

try:
    from functools import singledispatchmethod
except ImportError:
    from singledispatchmethod import singledispatchmethod  # type: ignore

from nameko.containers import WorkerContext
from nameko.events import EventHandler
from nameko.extensions import DependencyProvider, Entrypoint
from nameko.messaging import Consumer
from nameko.rpc import Rpc
from nameko.timer import Timer
from nameko.web.handlers import HttpRequestHandler
from prometheus_client import Counter, Gauge, Histogram
from prometheus_client.exposition import choose_encoder
from prometheus_client.registry import REGISTRY, CollectorRegistry, RestrictedRegistry
from werkzeug.wrappers import Request, Response

logger = logging.getLogger(__name__)


START_TIME = time.time()


class MetricsServer:
    """
    Serves metrics in a format readable by the Prometheus scraper.

    Call :meth:`~expose_metrics()` from a service method decorated with the
    `@http`_ entrypoint to present metrics to Prometheus over HTTP.

    .. _@http: https://nameko.readthedocs.io/en/stable/built_in_extensions.html#http
    """

    def expose_metrics(self, request: Request) -> Response:
        """
        Returns metrics as an HTTP response in the Prometheus text format.
        """
        registry: Union[CollectorRegistry, RestrictedRegistry]
        if "name" not in request.args:
            logger.debug(
                "Registry name(s) not found in query string, using global registry"
            )
            registry = REGISTRY
        else:
            names = request.args.getlist("name")
            registry = REGISTRY.restricted_registry(names)
        encoder, content_type = choose_encoder(request.headers["Accept"])
        try:
            # note: registry may be an instance of RestrictedRegistry, but the
            # only thing the encoder does is call registry.collect(); perhaps
            # a Protocol-based type would be more suitable here?
            output = encoder(registry)  # type: ignore
            return Response(output, status=200, content_type=content_type)
        except Exception:
            message = "Failed to generate metrics"
            logger.exception(message)
            return Response(message, status=500)
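

# Usage sketch (not part of this module): as the docstring above suggests, a
# service would declare the PrometheusMetrics dependency defined below and
# forward an @http-handled request to the injected MetricsServer. The service
# name, URL and method names here are illustrative assumptions.
#
#     from nameko.web.handlers import http
#
#     from nameko_prometheus.dependencies import PrometheusMetrics
#
#     class MyService:
#         name = "my_service"
#         metrics = PrometheusMetrics()
#
#         @http("GET", "/metrics")
#         def serve_metrics(self, request):
#             return self.metrics.expose_metrics(request)
#
# Prometheus can then scrape GET /metrics; passing one or more "name" query
# parameters (e.g. /metrics?name=my_service_rpc_requests_total) restricts the
# output to the named metric families via REGISTRY.restricted_registry().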


@dataclass(frozen=True)
class WorkerSummary:
    """
    Represents the final result (or error) of a worker, including its duration.

    This simplifies method signatures for singledispatch overrides.
    """

    duration: float
    result: Any
    exc_info: Optional[Tuple]


class PrometheusMetrics(DependencyProvider):
    """
    Dependency provider which measures RPC, event handler and HTTP endpoint
    latency.

    On service start, a few default metrics are declared. These are:

    - ``<prefix>_http_requests_total``
    - ``<prefix>_http_request_latency_seconds``
    - ``<prefix>_rpc_requests_total``
    - ``<prefix>_rpc_request_latency_seconds``
    - ``<prefix>_events_total``
    - ``<prefix>_events_latency_seconds``
    - ``<prefix>_timer_requests_total``
    - ``<prefix>_timer_request_latency_seconds``
    - ``<prefix>_consumer_requests_total``
    - ``<prefix>_consumer_request_latency_seconds``

    where ``prefix`` is either derived from the ``name`` attribute of the
    service class, or :ref:`configured manually <configuration>`.
    """

    def __init__(self):
        self.worker_starts: MutableMapping[WorkerContext, float] = WeakKeyDictionary()

    def setup(self) -> None:
        """
        Configures the dependency provider and declares default metrics.
        """
        # read config from container, use service name as default prefix
        service_name = self.container.service_name
        config = self.container.config.get("PROMETHEUS", {})
        service_config = config.get(service_name, {})
        prefix = service_config.get("prefix", service_name)
        # read application version from an environment variable
        app_version_key = config.get("APP_VERSION_KEY", "APP_VERSION")
        self.app_version = os.environ.get(app_version_key, "unknown")
        self.python_version = platform.python_version()
        # initialize default metrics exposed for every service
        self.service_info = Gauge(
            f"{prefix}_service_info",
            "Always 1; see https://www.robustperception.io/exposing-the-software-version-to-prometheus",
            ["service_version", "python_version"],
        )
        self.service_uptime_seconds = Gauge(
            f"{prefix}_service_uptime_seconds",
            "Uptime of service in seconds",
        )
        self.service_max_workers = Gauge(
            f"{prefix}_service_max_workers",
            "Maximum number of available nameko workers",
        )
        self.service_running_workers = Gauge(
            f"{prefix}_service_running_workers",
            "Number of currently running nameko workers",
        )
        self.http_request_total_counter = Counter(
            f"{prefix}_http_requests_total",
            "Total number of HTTP requests",
            ["http_method", "endpoint", "status_code"],
        )
        self.http_request_latency_histogram = Histogram(
            f"{prefix}_http_request_latency_seconds",
            "HTTP request duration in seconds",
            ["http_method", "endpoint", "status_code"],
        )
        self.rpc_request_total_counter = Counter(
            f"{prefix}_rpc_requests_total",
            "Total number of RPC requests",
            ["method_name"],
        )
        self.rpc_request_latency_histogram = Histogram(
            f"{prefix}_rpc_request_latency_seconds",
            "RPC request duration in seconds",
            ["method_name"],
        )
        self.events_total_counter = Counter(
            f"{prefix}_events_total",
            "Total number of handled events",
            ["source_service", "event_type"],
        )
        self.events_latency_histogram = Histogram(
            f"{prefix}_events_latency_seconds",
            "Event handler duration in seconds",
            ["source_service", "event_type"],
        )
        self.timer_request_total_counter = Counter(
            f"{prefix}_timer_requests_total",
            "Total number of timer requests",
            ["method_name"],
        )
        self.timer_request_latency_histogram = Histogram(
            f"{prefix}_timer_request_latency_seconds",
            "Timer request duration in seconds",
            ["method_name"],
        )
        self.consumer_request_total_counter = Counter(
            f"{prefix}_consumer_requests_total",
            "Total number of consumer requests",
            ["method_name"],
        )
        self.consumer_request_latency_histogram = Histogram(
            f"{prefix}_consumer_request_latency_seconds",
            "Consumer request duration in seconds",
            ["method_name"],
        )
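
    # Configuration sketch (the YAML layout is an assumption; the lookups above
    # only require a PROMETHEUS key in the container config). A nameko config
    # file like the one below would set a custom metric prefix for a service
    # named "my_service" and point APP_VERSION_KEY at the environment variable
    # holding the application version:
    #
    #     PROMETHEUS:
    #       APP_VERSION_KEY: APP_VERSION
    #       my_service:
    #         prefix: my_service_prefix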

    def get_dependency(self, worker_ctx: WorkerContext) -> MetricsServer:
        """
        Returns an instance of
        :class:`~nameko_prometheus.dependencies.MetricsServer` to be injected
        into the worker.
        """
        return MetricsServer()

    def worker_setup(self, worker_ctx: WorkerContext) -> None:
        """
        Called before service worker starts.
        """
        self.worker_starts[worker_ctx] = time.perf_counter()

    def worker_result(
        self, worker_ctx: WorkerContext, result=None, exc_info=None
    ) -> None:
        """
        Called after service worker completes.

        At this point the default metrics such as worker latency are observed,
        regardless of whether the worker finished successfully or raised an
        exception.
        """
        try:
            start = self.worker_starts.pop(worker_ctx)
        except KeyError:
            logger.warning("No worker_ctx in request start dictionary")
            return
        worker_summary = WorkerSummary(
            duration=time.perf_counter() - start,
            result=result,
            exc_info=exc_info,
        )
        self.observe_entrypoint(worker_ctx.entrypoint, worker_summary)
        self.observe_state_metrics()

    @singledispatchmethod
    def observe_entrypoint(
        self, entrypoint: Entrypoint, worker_summary: WorkerSummary
    ) -> None:
        logger.warning(
            f"Entrypoint {entrypoint} is not traceable by nameko_prometheus"
        )

    @observe_entrypoint.register(Rpc)
    def _observe_rpc(self, entrypoint: Rpc, worker_summary: WorkerSummary) -> None:
        logger.debug(f"Collect metrics from RPC entrypoint {entrypoint}")
        method_name = entrypoint.method_name
        self.rpc_request_total_counter.labels(method_name=method_name).inc()
        self.rpc_request_latency_histogram.labels(method_name=method_name).observe(
            worker_summary.duration
        )

    @observe_entrypoint.register(HttpRequestHandler)
    def _observe_http(
        self, entrypoint: HttpRequestHandler, worker_summary: WorkerSummary
    ) -> None:
        logger.debug(f"Collect metrics from HTTP entrypoint {entrypoint}")
        http_method = entrypoint.method
        url = entrypoint.url
        if worker_summary.exc_info:
            _, exc, _ = worker_summary.exc_info
            status_code = entrypoint.response_from_exception(exc).status_code
        else:
            status_code = entrypoint.response_from_result(
                worker_summary.result
            ).status_code
        logger.debug(f"Tracing HTTP request: {http_method} {url} {status_code}")
        self.http_request_total_counter.labels(
            http_method=http_method, endpoint=url, status_code=status_code
        ).inc()
        self.http_request_latency_histogram.labels(
            http_method=http_method, endpoint=url, status_code=status_code
        ).observe(worker_summary.duration)

    @observe_entrypoint.register(EventHandler)
    def _observe_event_handler(
        self, entrypoint: EventHandler, worker_summary: WorkerSummary
    ) -> None:
        logger.debug(f"Collect metrics from event handler entrypoint {entrypoint}")
        source_service = entrypoint.source_service
        event_type = entrypoint.event_type
        self.events_total_counter.labels(
            source_service=source_service, event_type=event_type
        ).inc()
        self.events_latency_histogram.labels(
            source_service=source_service, event_type=event_type
        ).observe(worker_summary.duration)

    @observe_entrypoint.register(Timer)
    def _observe_timer(self, entrypoint: Timer, worker_summary: WorkerSummary) -> None:
        logger.debug(f"Collect metrics from Timer entrypoint {entrypoint}")
        method_name = entrypoint.method_name
        self.timer_request_total_counter.labels(method_name=method_name).inc()
        self.timer_request_latency_histogram.labels(method_name=method_name).observe(
            worker_summary.duration
        )

    @observe_entrypoint.register(Consumer)
    def _observe_consumer(
        self, entrypoint: Consumer, worker_summary: WorkerSummary
    ) -> None:
        logger.debug(f"Collect metrics from Consumer entrypoint {entrypoint}")
        method_name = entrypoint.method_name
        self.consumer_request_total_counter.labels(method_name=method_name).inc()
        self.consumer_request_latency_histogram.labels(method_name=method_name).observe(
            worker_summary.duration
        )

    def observe_state_metrics(self) -> None:
        self.service_info.labels(
            service_version=self.app_version, python_version=self.python_version
        ).set(1)
        self.service_uptime_seconds.set(time.time() - START_TIME)
        self.service_max_workers.set(self.container.max_workers)
        self.service_running_workers.set(len(self.container._worker_threads))
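

# Query sketch: the latency metrics above are prometheus_client Histograms, so
# the exported *_bucket series can be aggregated in PromQL, e.g. an estimated
# 95th percentile of RPC latency (the "my_service" prefix is an assumed example):
#
#     histogram_quantile(
#         0.95,
#         sum by (le) (rate(my_service_rpc_request_latency_seconds_bucket[5m]))
#     )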