metric coordinator¶
-
class
metrix.coordinator.
MCoordinator
(*, mstreams: Optional[Sequence[metrix.stream.MStream]] = None, msinks: Optional[Sequence[metrix.sinks.MSink]] = None, rate_limit: Optional[Union[int, float, Sequence[Union[int, float]]]] = None)[source]¶ Class that coordinates the flow of metric elements through one or multiple streams into one or multiple sinks, with an optional rate limit imposed before the end.
Here are a few simple examples to illustrate key features:
>>> import statistics, time >>> from metrix import MCoordinator, MStream, MSinkLogger, MSinkPrinter >>> # one stream, one agg, one sink >>> mc = MCoordinator( ... mstreams=[MStream("n", agg=sum, batch_size=2)], ... msinks=[MSinkPrinter()], ... ) >>> mc.send("n", 1) >>> mc.send("n", 2) MElement(name=n.sum, value=3, tags=None) >>> # one stream, two aggs, two sinks >>> mc = MCoordinator( ... mstreams=[MStream("n", agg=[max, statistics.mean], batch_size=3)], ... msinks=[MSinkPrinter(), MSinkLogger()], ... ) >>> mc.send("n", 1) >>> mc.send("n", 2) >>> mc.send("n", 3) MElement(name=n.max, value=3, tags=None) MElement(name=n.mean, value=2, tags=None) INFO:metrix.sinks:MElement(name=n.max, value=3, tags=None) INFO:metrix.sinks:MElement(name=n.mean, value=2, tags=None) >>> # two streams, default and element tags, and a timer >>> mc = MCoordinator( ... mstreams=[ ... MStream("n", agg=sum, batch_size=3, default_tags={"foo": "bar"}), ... MStream("time", agg={"avg": statistics.mean}, window_size=1), ... ], ... msinks=[MSinkPrinter()], ... ) >>> mc.send("n", 1) >>> mc.send("n", 1) >>> mc.send("n", 1, tags={"foo": "BAR!"}) MElement(name=n.sum, value=2, tags={'foo': 'bar'}) MElement(name=n.sum, value=1, tags={'foo': 'BAR!'}) >>> with mc.timer("time", scale=1): ... time.sleep(0.5) >>> with mc.timer("time", scale=1): ... time.sleep(0.75) >>> with mc.timer("time", scale=1): ... time.sleep(0.5) >>> with mc.timer("time", scale=1): ... time.sleep(0.25) MElement(name=time.avg, value=0.5028860028833151, tags=None) MElement(name=time.avg, value=0.7517436337657273, tags=None) MElement(name=time.avg, value=0.377787658944726, tags=None)
In typical production usage, you’ll be tracking a few metrics and periodically logging and/or sending aggregated values to TSDB. Here’s how that might look:
>>> from metrics import MSinkTSDB >>> mc = MCoordinator( ... mstreams=[ ... MStream("n_msgs", agg=sum, window_size=3), ... MStream("msg_len", agg=[statistics.mean, statistics.stdev], window_size=5) ... ], ... msinks=[MSinkLogger(), MSinkTSDB(<TSDB_CLIENT>)], ... rate_limit=[0, 1.0], ... ) >>> msgs = list(range(10)) # fake data ;) >>> for msg in msgs: ... mc.send("n_msgs", 1) ... mc.send("msg_len", msg) INFO:metrix.sinks:MElement(name=n_msgs.sum, value=10, tags=None) INFO:metrix.sinks:MElement(name=msg_len.mean, value=4.5, tags=None) INFO:metrix.sinks:MElement(name=msg_len.stdev, value=3.0276503540974917, tags=None)
- Parameters
mstreams – One or more
MStream
s through which metric elements are sent. Typically provided on init, but may also be passed individually viaMCoordinator.add_mstream()
.msinks – One or more
MSink
s to which metric elements are sent. Typically provided on init, but may also be passed individually viaMCoordinator.add_msink()
. In a development context, the simpleMSinkPrinter
will give visibility into the outputs of metric streams, but in a production, you’ll want to specify more persistent metric sinks likeMSinkLogger
andMSinkTSDB
.rate_limit –
Optional rate limit that prevents two metric elements from streaming into a sink in an interval shorter than
rate_limit
seconds. If a single number, this is applied to allmsinks
; if a sequence of numbers with the same length asmsinks
, limits will be applied element- wise to the corresponding metric sinks.For example:
rate_limit=1.5
causes elements to be sent on to each sink inmsinks
at least 1.5 seconds apart. Ifrate_limit=[1.5, 0.5]
(and two sinks are specified), then the first sink will have a 1.5-second rate limit while the second will have a 0.5-second rate limit applied.
Warning
If
MSinkTSDB
is added as a sink, be sure to haverate_limit
set to at least 1.0 seconds to prevent data loss, since OpenTSDB doesn’t support sub-second data. (Yes, I know – it’s bonkers.)Given this constraint, you must also be mindful of the total number of unique metric (name, agg, tags) pairs passing through
mstreams
per second to ensure that the output sinks can keep up with the rate of input metrics. For example, if 2 streams use a single aggregator with defaultwindow_size=10
andrate_limit=1.0
, then you should limit yourself to no more than 5 distinct tag sets per metric. If you have more tags or aggs, increase your window size accordingly! Here’s a useful formula:sum((num_aggs * num_unique_tag_sets / window_size) for stream in mstreams) = num_total_metrics_per_sec
If
num_total_metrics_per_sec > rate_limit
, you have a problem.-
stream
¶ Base metric coordinator stream, to which metric streams connect and from which metric sinks extend.
-
metric_mstreams
¶ Mapping of metric name to metric stream, each of which is upstream from and connected to
MCoordinator.stream
.
-
msinks
¶ Sequence of metric sinks, each of which is downstream from and connected to
MCoordinator.stream
.
-
add_mstream
(mstream: metrix.stream.MStream) → None[source]¶ Add a metric stream to this coordinator by connecting it to all sink streams and making it accessible by name via
MCoordinator.metric_mstreams
.
-
add_msink
(msink: metrix.sinks.MSink, rate_limit: Optional[Union[int, float]] = None) → None[source]¶ Add a metric sink to this coordinator by branching off
MCoordinator.stream
with a buffered, optionally rate-limited stream that ends inmsink
.
-
send
(name: str, value: Union[int, float], *, tags: Optional[Dict] = None) → None[source]¶ Send a metric value to a particular metric stream; optionally, pass tags to add new and overwrite existing default tags associated with the stream.
- Parameters
name – Metric name.
value – Numeric metric value.
tags – Optional tags to associate with this specific metric
value
.
See also
-
timer
(name: str, scale: int = 1, *, tags: Optional[Dict] = None)[source]¶ Get a context manager for a particular stream that measures the elapsed time spent running statements enclosed by the
with
statement, and sends that time to the stream.- Parameters
scale – Multiplier applied to the elapsed time value, in seconds by default. For example, to report time in milliseconds, use
scale=1000
.tags – Optional tags to associate with this specific timer value.
See also