metric stream
class metrix.stream.MStream(name: str, *, agg: Union[Callable[[Iterable[Union[int, float]]], Union[int, float]], Sequence[Callable[[Iterable[Union[int, float]]], Union[int, float]]], Mapping[str, Callable[[Iterable[Union[int, float]]], Union[int, float]]]], default_tags: Optional[Dict] = None, window_size: Optional[int] = None, batch_size: Optional[int] = None)

A stream of MElements that groups elements into batches of fixed duration or fixed element count, further groups each batch by distinct assigned tags, then aggregates each group’s values by one or multiple functions.

To do any useful work, metric streams must be connected to a
MSink
, which operates on elements in a visible / persistent way. In typical usage, you’ll want to connect multiple streams to multiple sinks using a centralized coordinator: MCoordinator.

>>> from metrix import MElement, MStream
>>> eles = [{"value": 1}, {"value": 2}, {"value": 1, "tags": {"foo": "bar"}}]
>>> mstream = MStream("m", agg=sum, default_tags={"foo": "BAR!"}, window_size=1)
>>> # HACK! we'll add a sink directly so we can see what happens
>>> mstream.stream.sink(print)
>>> for ele in eles:
...     mstream.send(**ele)
MElement(name=m.sum, value=3, tags={'foo': 'BAR!'})
MElement(name=m.sum, value=1, tags={'foo': 'bar'})
- Parameters
name – Name of the metric whose elements are sent into this stream.
agg – One or multiple aggregation functions to be applied to groups of metric elements’ values in order to produce new, aggregated metric elements. This may be specified as a single callable or a sequence of callables, in which case the corresponding components of the
MStream.stream
are named after the functions themselves; this may also be specified as a mapping of component name to callable, in which case the user-specified names are used instead.
default_tags – Optional set of tags to apply to all metric elements by default. Tags specified on individual elements override and append to this default set.
window_size – Size of tumbling window in seconds with which to group elements. For example: If
window_size=10
, all elements sent into the stream within a given 10-second window will be grouped together before their values are aggregated, as specified by agg.
batch_size – Size of batches in number of elements with which to group elements. For example: If
batch_size=10
, every 10 successive elements sent into the stream will be grouped together before their values are aggregated, as specified by agg. Note that setting batch_size=1 will effectively skip grouping, in which case aggregating values doesn’t make sense, either.
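The three accepted forms of agg can be pictured as all reducing to one mapping of component name to callable. The following is a minimal sketch of that idea, not metrix's actual implementation: `normalize_agg` is a hypothetical helper, and naming a callable by its `__name__` is an assumption that happens to agree with the `m.sum` name in the example above.

```python
from collections.abc import Mapping as MappingABC
from typing import Callable, Dict, Iterable, Mapping, Sequence, Union

Number = Union[int, float]
AggFunc = Callable[[Iterable[Number]], Number]


def normalize_agg(
    agg: Union[AggFunc, Sequence[AggFunc], Mapping[str, AggFunc]]
) -> Dict[str, AggFunc]:
    """Hypothetical helper: reduce any accepted ``agg`` form to {name: callable}."""
    if isinstance(agg, MappingABC):
        # Mapping form: user-specified component names are used as given.
        return dict(agg)
    if callable(agg):
        # Single callable: assumed to be named after the function itself.
        return {agg.__name__: agg}
    # Sequence of callables: each is assumed to be named after itself.
    return {func.__name__: func for func in agg}


values = [1, 2, 3]
for form in (sum, [min, max], {"total": sum}):
    named = normalize_agg(form)
    print({name: func(values) for name, func in named.items()})
```

The mapping form is the one to reach for when a function has no useful name, such as a lambda or a `functools.partial` object.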
Note
You must set either window_size or batch_size when initializing a metric stream. No default is set because it depends entirely on context: the rate with which metric elements are sent into the stream, the desired resolution on aggregated metrics, and any rate limit requirements on connected metric sinks. This is the only stream attribute that demands deliberate thought. Choose wisely! :)
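To make the batch, group and aggregate sequence concrete, here is a rough pure-Python sketch of count-based batching. It does not use metrix: `aggregate_batches` and the `(value, tags)` tuple layout are hypothetical, and aggregating a trailing partial batch is a simplifying assumption that the real stream may not share.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Tuple, Union

Number = Union[int, float]
Element = Tuple[Number, Dict[str, str]]  # hypothetical (value, tags) pair


def aggregate_batches(
    elements: List[Element],
    batch_size: int,
    agg: Callable[[Iterable[Number]], Number],
) -> List[Element]:
    """Batch by count, group each batch by tags, then aggregate each group."""
    results: List[Element] = []
    for start in range(0, len(elements), batch_size):
        batch = elements[start : start + batch_size]
        # Group values within this batch by their distinct tag sets.
        groups = defaultdict(list)
        for value, tags in batch:
            key = tuple(sorted(tags.items()))  # hashable form of the tags
            groups[key].append(value)
        # Emit one aggregated element per tag group.
        for key, values in groups.items():
            results.append((agg(values), dict(key)))
    return results


elements = [(1, {"foo": "BAR!"}), (2, {"foo": "BAR!"}), (1, {"foo": "bar"})]
print(aggregate_batches(elements, batch_size=3, agg=sum))
```

With `batch_size=1` every element forms its own group, so the aggregation returns each value unchanged, which is the degenerate case the parameter description warns about.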
name
agg
window_size
batch_size
source
    Entry point to the metric stream.
stream
    Data processing stream to which metric elements are sent.
send(value: Union[int, float], *, tags: Optional[Dict] = None) → None
Send a given metric value to the stream; optionally, pass metric-specific tags that add to, and override, the default tags associated with the stream.
- Parameters
value – Numeric metric value.
tags – Optional tags to associate with this specific metric value.
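The override-and-append behavior of per-element tags can be illustrated with a plain dictionary merge. This is only a sketch under the assumption that tags are flat dictionaries; `merge_tags` is a hypothetical name, not part of metrix.

```python
from typing import Dict, Optional


def merge_tags(
    default_tags: Optional[Dict[str, str]],
    tags: Optional[Dict[str, str]],
) -> Dict[str, str]:
    """Hypothetical helper: element tags override and extend the defaults."""
    # Later keys win in a dict merge, so element-specific tags take precedence.
    return {**(default_tags or {}), **(tags or {})}


defaults = {"foo": "BAR!"}
print(merge_tags(defaults, None))            # defaults only
print(merge_tags(defaults, {"foo": "bar"}))  # existing key overridden
print(merge_tags(defaults, {"env": "dev"}))  # new key appended
```

This matches the class example above, where the element sent with `tags={"foo": "bar"}` is reported under `'foo': 'bar'` rather than the default `'BAR!'`.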
timer(scale: int = 1, *, tags: Optional[Dict] = None)
Context manager that measures the elapsed time spent running statements enclosed by the with statement, and sends that time to the stream.
- Parameters
scale – Multiplier applied to the elapsed time value, in seconds by default. For example, to report time in milliseconds, use scale=1000.
tags – Optional tags to associate with this specific timer value.
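A timing context manager of this kind can be sketched with the standard library alone. The version below is an illustration, not the library's code: the free function `timer` and its `send` callback argument are hypothetical stand-ins for the method and the stream, and sending the elapsed time even when the body raises is an assumption.

```python
import time
from contextlib import contextmanager
from typing import Callable, Iterator, List


@contextmanager
def timer(send: Callable[[float], None], scale: int = 1) -> Iterator[None]:
    """Measure the enclosed block and pass the scaled elapsed time to ``send``."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Elapsed time is in seconds; ``scale`` converts it (1000 gives ms).
        send(scale * (time.perf_counter() - start))


recorded: List[float] = []
with timer(recorded.append, scale=1000):
    time.sleep(0.01)  # stand-in for the work being timed
print(recorded)  # one elapsed-time value, in milliseconds
```

Going by the signature above, the equivalent call on a real stream would be `with mstream.timer(scale=1000): ...`.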
metrix.stream.group_has_elements(group: Sequence[metrix.element.MElement]) → bool
Return True if group contains any metric elements, and False otherwise; used to filter out empty groups from a metric stream.
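As a small illustration of the documented behavior, a predicate like this can be passed straight to `filter`. The function below is a stand-in written for this example and is named `has_elements` to avoid suggesting it is the library's own code.

```python
from typing import List, Sequence


def has_elements(group: Sequence) -> bool:
    """Stand-in predicate: True if the group is non-empty, False otherwise."""
    return len(group) > 0


groups: List[List[int]] = [[1, 2], [], [3]]
# Drop empty groups before any aggregation is applied to them.
non_empty = list(filter(has_elements, groups))
print(non_empty)
```

Filtering first matters because some aggregation functions, such as `max` or `statistics.mean`, raise an error on an empty input.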