metric stream

class str, *, agg: Union[Callable[[Iterable[Union[int, float]]], Union[int, float]], Sequence[Callable[[Iterable[Union[int, float]]], Union[int, float]]], Mapping[str, Callable[[Iterable[Union[int, float]]], Union[int, float]]]], default_tags: Optional[Dict] = None, window_size: Optional[int] = None, batch_size: Optional[int] = None)[source]

A stream of MElement s that groups elements into batches of fixed time or number, further groups batches by distinct assigned tags, then aggregates each group’s values by one or multiple functions.

To do any useful work, metric streams must be connected to a MSink, which operates on elements in a visible / persistent way. In typical usage, you’ll want to connect multiple streams to multiple sinks using a centralized coordinator: MCoordinator.

>>> from metrix import MElement, MStream
>>> eles = [{"value": 1}, {"value": 2}, {"value": 1, "tags": {"foo": "bar"}}]
>>> mstream = MStream("m", agg=sum, default_tags={"foo": "BAR!"}, window_size=1)
>>> # HACK! we'll add a sink directly so we can see what happens
>>> for ele in eles:
...     mstream.send(**ele)
MElement(name=m.sum, value=3, tags={'foo': 'BAR!'})
MElement(name=m.sum, value=1, tags={'foo': 'bar'})
  • name – Name of the metric whose elements are sent into this stream.

  • agg – One or multiple aggregation functions to be applied to groups of metric elements’ values in order to produce new, aggregated metric elements. This may be specified as a single callable or a sequence of callables, in which case the corresponding components of the are named after the functions themselves; this may also be specified as a mapping of component name to callable, in which case the user-specified names are used instead.

  • default_tags – Optional set of tags to apply to all metric elements by default. Tags specified on individual elements override and append to this def

  • window_size – Size of tumbling window in seconds with which to group elements. For example: If window_size=10, all elements sent into the stream within a given 10-second window will be grouped together before their values are aggregated, as specified by agg.

  • batch_size – Size of batches in number of elements with which to group elements. For example: If batch_size=10, every 10 successive elements sent into the stream will be grouped together before their values are aggregated, as specified by agg. Note that setting batch_size=1 will effectively skip grouping, in which case aggregating values doesn’t make sense, either.


You must set either window_size or batch_size when initializing a metric stream. No default is set because it depends entirely on context: the rate with which metric elements are sent into the stream, the desired resolution on aggregated metrics, and any rate limit requirements on connected metric sinks. This is the only stream attribute that demands deliberate thought. Choose wisely! :)


Entry point to the metric stream.


Data processing stream to which metric elements are sent.

send(value: Union[int, float], *, tags: Optional[Dict] = None)None[source]

Send a given metric value to the stream; optionally, pass metric-specific tags to add new and overwrite existing default tags associated with the stream.

  • value – Numeric metric value.

  • tags – Optional tags to associate with this specific metric value.

timer(scale: int = 1, *, tags: Optional[Dict] = None)[source]

Context manager that measures the elapsed time spent running statements enclosed by the with statement, and sends that time to the stream.

  • scale – Multiplier applied to the elapsed time value, in seconds by default. For example, to report time in milliseconds, use scale=1000.

  • tags – Optional tags to associate with this specific timer value.

See also

MStream.send() Sequence[metrix.element.MElement])bool[source]

Return True if group contains any metric elements, and False otherwise; used to filter out empty group from a metric stream.