Source code for

from contextlib import contextmanager
from functools import partial
from operator import attrgetter
from timeit import default_timer
from typing import (

import streamz
from toolz import itertoolz

import metrix.element
from metrix import MElement

Number = Union[int, float]
AggFunc = Callable[[Iterable[Number]], Number]
MetricAggFunc = Callable[[Dict[Optional[str], Iterable[MElement]]], Tuple[MElement]]

[docs]class MStream: """ A stream of :class:`MElement <metrix.element.MElement>` s that groups elements into batches of fixed time or number, further groups batches by distinct assigned tags, then aggregates each group's values by one or multiple functions. To do any useful work, metric streams must be connected to a :class:`MSink <metrix.sinks.MSink>`, which operates on elements in a visible / persistent way. In typical usage, you'll want to connect multiple streams to multiple sinks using a centralized coordinator: :class:`MCoordinator <metrix.coordinator.MCoordinator>`. .. code-block:: pycon >>> from metrix import MElement, MStream >>> eles = [{"value": 1}, {"value": 2}, {"value": 1, "tags": {"foo": "bar"}}] >>> mstream = MStream("m", agg=sum, default_tags={"foo": "BAR!"}, window_size=1) >>> # HACK! we'll add a sink directly so we can see what happens >>> >>> for ele in eles: ... mstream.send(**ele) MElement(name=m.sum, value=3, tags={'foo': 'BAR!'}) MElement(name=m.sum, value=1, tags={'foo': 'bar'}) Args: name: Name of the metric whose elements are sent into this stream. agg: One or multiple aggregation functions to be applied to groups of metric elements' values in order to produce new, aggregated metric elements. This may be specified as a single callable or a sequence of callables, in which case the corresponding components of the :attr:`` are named after the functions themselves; this may also be specified as a mapping of component name to callable, in which case the user-specified names are used instead. default_tags: Optional set of tags to apply to all metric elements by default. Tags specified on individual elements override and append to this def window_size: Size of tumbling window in *seconds* with which to group elements. For example: If ``window_size=10``, all elements sent into the stream within a given 10-second window will be grouped together before their values are aggregated, as specified by ``agg``. batch_size: Size of batches in *number of elements* with which to group elements. For example: If ``batch_size=10``, every 10 successive elements sent into the stream will be grouped together before their values are aggregated, as specified by ``agg``. Note that setting ``batch_size=1`` will effectively skip grouping, in which case aggregating values doesn't make sense, either. Note: You *must* set either ``window_size`` or ``batch_size`` when initializing a metric stream. No default is set because it depends entirely on context: the rate with which metric elements are sent into the stream, the desired resolution on aggregated metrics, and any rate limit requirements on connected metric sinks. This is the only stream attribute that demands deliberate thought. Choose wisely! :) Attributes: name agg default_tags window_size batch_size source: Entry point to the metric stream. stream: Data processing stream to which metric elements are sent. """ source: streamz.Source stream: streamz.Stream def __init__( self, name: str, *, agg: Union[AggFunc, Sequence[AggFunc], Mapping[str, AggFunc]], default_tags: Optional[Dict] = None, window_size: Optional[int] = None, batch_size: Optional[int] = None, ): self._validate_sizes(window_size, batch_size) = name self.agg = agg self.default_tags = default_tags self.window_size = window_size self.batch_size = batch_size self._build_stream() def __str__(self): return f"MStream(name='{}', agg={self.agg})" def _validate_sizes(self, window_size: Optional[int], batch_size: Optional[int]): if not bool(window_size) ^ bool(batch_size): raise ValueError( "either window_size or batch_size must be specified in order to group " "metric elements prior to value aggregation -- but not neither, not both" ) if ( (isinstance(window_size, int) and window_size < 0) or (isinstance(batch_size, int) and batch_size < 0) ): raise ValueError( "if specified, window_size and batch_size must be non-negative integers" ) def _build_stream(self) -> None: """ Build a stream based on user-specified attributes, including a source entry point, a fixed-time or -size grouper, an additional grouper by distinct tag set, and one or multiple value aggregators. Does not include any sinks! """ metric_aggs = self._process_agg(self.agg) # build stream source and base stream that groups metric elements together self.source = streamz.Source( if self.window_size is not None: base_stream = ( # entry point for the stream self.source # batch elements in tumbling windows of `window_size` seconds .timed_window(self.window_size, stream_name="group_by_time") ) elif self.batch_size is not None: base_stream = ( # entry point for the stream self.source # batch elements into equal-size batches of `batch_size` elements apiece .partition(self.batch_size, stream_name="group_by_num") ) else: raise ValueError("this shouldn't ever happen -- developer error?") grped_stream = ( base_stream # filter out groups without any elements .filter(group_has_elements) # further group group elements by their `key`, which effectively creates # distinct metrics for each combination of (name, tags) .map( partial(itertoolz.groupby, attrgetter("key")), stream_name="group_by_key", ) ) # if only one agg, we can build a simple stream if len(metric_aggs) == 1: metric_aggname, metric_aggfunc = metric_aggs[0] = ( # start from a mapping of key to window elements grped_stream # aggregate the values of each key group's elements and # output a sequence of aggregated elements (one per group) .map(metric_aggfunc, stream_name=metric_aggname) # flatten sequence of agg elements, so each is its own item in stream .flatten(stream_name="flatten_groups") ) # otherwise, we'll need to branch and double-flatten else: # for each agggretor, create a new branch off the grouped stream # that aggregates each group's elements, as in the simple stream case self._agg_streams = tuple(, stream_name=metric_aggname) for metric_aggname, metric_aggfunc in metric_aggs ) # zip each agg stream's outputs together, to keep aggregated results in sync = (*self._agg_streams, stream_name="zip_aggs") # flatten out the zipped aggs .flatten(stream_name="flatten_aggs") # then flatten out the agg elements for each key group .flatten(stream_name="flatten_groups") ) def _process_agg( self, agg: Union[AggFunc, Sequence[AggFunc], Mapping[str, AggFunc]] ) -> List[Tuple[str, MetricAggFunc]]: """ Process user-provided ``agg`` into a standard form: a sequence of (name, func) pairs that aggregate groups of metric element values. """ if isinstance(agg, Mapping): metric_aggs = [self._make_metric_agg(key, val) for key, val in agg.items()] elif isinstance(agg, Sequence) and not isinstance(agg, str): metric_aggs = [self._make_metric_agg(item.__name__, item) for item in agg] elif isinstance(agg, Callable): metric_aggs = [self._make_metric_agg(agg.__name__, agg)] else: raise TypeError( f"agg={agg} is invalid; " "must be of type Union[AggFunc, Sequence[AggFunc], Dict[str, AggFunc]]" ) return metric_aggs def _make_metric_agg( self, aggname: str, aggfunc: AggFunc ) -> Tuple[str, MetricAggFunc]: """ From a user-provided aggregator, make a "metric aggregator": a (name, func) pair that aggregates groups of metric element values. """ def metric_aggfunc( name: str, aggfunc: AggFunc, grped_mes: Dict[Optional[str], Iterable[MElement]], ) -> Tuple[MElement]: return tuple( MElement( name=name, value=aggfunc(me.value for me in grp_mes), tags=metrix.element.tags_from_key(grp_key), ) for grp_key, grp_mes in grped_mes.items() ) metric_aggname = f"{}.{aggname}" return (metric_aggname, partial(metric_aggfunc, metric_aggname, aggfunc))
[docs] def send(self, value: Number, *, tags: Optional[Dict] = None) -> None: """ Send a given metric value to the stream; optionally, pass metric-specific tags to add new and overwrite existing default tags associated with the stream. Args: value: Numeric metric value. tags: Optional tags to associate with this specific metric ``value``. """ if self.default_tags and tags: tags = {**self.default_tags, **tags} else: tags = tags or self.default_tags me = MElement(, value, tags=tags) self.source.emit(me)
[docs] @contextmanager def timer(self, scale: int = 1, *, tags: Optional[Dict] = None): """ Context manager that measures the elapsed time spent running statements enclosed by the ``with`` statement, and sends that time to the stream. Args: scale: Multiplier applied to the elapsed time value, in seconds by default. For example, to report time in milliseconds, use ``scale=1000``. tags: Optional tags to associate with this specific timer value. See Also: :meth:`MStream.send()` """ start = default_timer() try: yield finally: end = default_timer() self.send((end - start) * scale, tags=tags)
[docs]def group_has_elements(group: Sequence[MElement]) -> bool: """ Return True if ``group`` contains any metric elements, and False otherwise; used to filter out empty group from a metric stream. """ return bool(group)