metric coordinator

class metrix.coordinator.MCoordinator(*, mstreams: Optional[Sequence[metrix.stream.MStream]] = None, msinks: Optional[Sequence[metrix.sinks.MSink]] = None, rate_limit: Optional[Union[int, float, Sequence[Union[int, float]]]] = None)

Class that coordinates the flow of metric elements through one or more streams into one or more sinks, with an optional rate limit applied just before each sink.

Here are a few simple examples to illustrate key features:

>>> import statistics, time
>>> from metrix import MCoordinator, MStream, MSinkLogger, MSinkPrinter
>>> # one stream, one agg, one sink
>>> mc = MCoordinator(
...     mstreams=[MStream("n", agg=sum, batch_size=2)],
...     msinks=[MSinkPrinter()],
... )
>>> mc.send("n", 1)
>>> mc.send("n", 2)
MElement(name=n.sum, value=3, tags=None)
>>> # one stream, two aggs, two sinks
>>> mc = MCoordinator(
...     mstreams=[MStream("n", agg=[max, statistics.mean], batch_size=3)],
...     msinks=[MSinkPrinter(), MSinkLogger()],
... )
>>> mc.send("n", 1)
>>> mc.send("n", 2)
>>> mc.send("n", 3)
MElement(name=n.max, value=3, tags=None)
MElement(name=n.mean, value=2, tags=None)
INFO:metrix.sinks:MElement(name=n.max, value=3, tags=None)
INFO:metrix.sinks:MElement(name=n.mean, value=2, tags=None)
>>> # two streams, default and element tags, and a timer
>>> mc = MCoordinator(
...     mstreams=[
...         MStream("n", agg=sum, batch_size=3, default_tags={"foo": "bar"}),
...         MStream("time", agg={"avg": statistics.mean}, window_size=1),
...     ],
...     msinks=[MSinkPrinter()],
... )
>>> mc.send("n", 1)
>>> mc.send("n", 1)
>>> mc.send("n", 1, tags={"foo": "BAR!"})
MElement(name=n.sum, value=2, tags={'foo': 'bar'})
MElement(name=n.sum, value=1, tags={'foo': 'BAR!'})
>>> with mc.timer("time", scale=1):
...     time.sleep(0.5)
>>> with mc.timer("time", scale=1):
...     time.sleep(0.75)
>>> with mc.timer("time", scale=1):
...     time.sleep(0.5)
>>> with mc.timer("time", scale=1):
...     time.sleep(0.25)
MElement(name=time.avg, value=0.5028860028833151, tags=None)
MElement(name=time.avg, value=0.7517436337657273, tags=None)
MElement(name=time.avg, value=0.377787658944726, tags=None)
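The examples above name each output element <stream>.<agg>: n.sum, n.max and n.mean come from the aggregator functions' names, while time.avg comes from the key of the agg dict. The following is a minimal standalone sketch of that naming convention applied to one batch of values. It assumes this mirrors what MStream does internally; named_aggs and aggregate_batch are hypothetical helpers, not part of metrix.

```python
import collections.abc
import statistics
from typing import Callable, Dict, List, Sequence, Tuple

# an aggregator reduces a batch of values to a single number
Agg = Callable[[Sequence[float]], float]


def named_aggs(agg) -> Dict[str, Agg]:
    """Normalize an agg spec (function, list of functions, or dict) to {suffix: fn}."""
    if isinstance(agg, collections.abc.Mapping):
        # explicit suffixes, e.g. {"avg": statistics.mean} -> "avg"
        return dict(agg)
    if callable(agg):
        agg = [agg]
    # otherwise use each function's __name__, e.g. max -> "max"
    return {fn.__name__: fn for fn in agg}


def aggregate_batch(name: str, values: Sequence[float], agg) -> List[Tuple[str, float]]:
    """Apply every aggregator to one batch, naming each result '<name>.<suffix>'."""
    return [(f"{name}.{suffix}", fn(values)) for suffix, fn in named_aggs(agg).items()]


print(aggregate_batch("n", [1, 2], sum))                              # [('n.sum', 3)]
print(aggregate_batch("n", [1, 2, 3], [max, statistics.mean]))       # [('n.max', 3), ('n.mean', 2)]
print(aggregate_batch("time", [0.5, 1.0], {"avg": statistics.mean}))  # [('time.avg', 0.75)]
```

Whether a batch is closed by count (batch_size) or by elapsed time (window_size) is a separate concern; this sketch only covers what happens once a batch is ready.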

In typical production usage, you'll be tracking a few metrics and periodically logging aggregated values and/or sending them to a time-series database (TSDB). Here's how that might look:

>>> from metrix import MSinkTSDB
>>> mc = MCoordinator(
...     mstreams=[
...         MStream("n_msgs", agg=sum, window_size=3),
...         MStream("msg_len", agg=[statistics.mean, statistics.stdev], window_size=5)
...     ],
...     msinks=[MSinkLogger(), MSinkTSDB(<TSDB_CLIENT>)],
...     rate_limit=[0, 1.0],
... )
>>> msgs = list(range(10))  # fake data ;)
>>> for msg in msgs:
...     mc.send("n_msgs", 1)
...     mc.send("msg_len", msg)
INFO:metrix.sinks:MElement(name=n_msgs.sum, value=10, tags=None)
INFO:metrix.sinks:MElement(name=msg_len.mean, value=4.5, tags=None)
INFO:metrix.sinks:MElement(name=msg_len.stdev, value=3.0276503540974917, tags=None)
Parameters
  • mstreams – One or more MStream objects through which metric elements are sent. Typically provided on init, but streams may also be added individually via MCoordinator.add_mstream().

  • msinks – One or more MSink objects to which metric elements are sent. Typically provided on init, but sinks may also be added individually via MCoordinator.add_msink(). In a development context, the simple MSinkPrinter gives visibility into the outputs of metric streams, but in production you'll want more persistent metric sinks like MSinkLogger and MSinkTSDB.

  • rate_limit

    Optional rate limit that prevents two metric elements from streaming into a sink less than rate_limit seconds apart. If a single number, it is applied to all msinks; if a sequence of numbers with the same length as msinks, the limits are applied element-wise to the corresponding metric sinks.

    For example: rate_limit=1.5 causes elements to be sent on to each sink in msinks at least 1.5 seconds apart. If rate_limit=[1.5, 0.5] (and two sinks are specified), then the first sink will have a 1.5-second rate limit while the second will have a 0.5-second rate limit applied.
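The broadcasting rule for rate_limit (one number for all sinks, or one number per sink) can be sketched as a small normalization step. per_sink_rate_limits is a hypothetical helper, not metrix's actual code, and raising ValueError on a length mismatch is an assumption about how such an error might be reported.

```python
from typing import List, Optional, Sequence, Union

Number = Union[int, float]


def per_sink_rate_limits(
    rate_limit: Optional[Union[Number, Sequence[Number]]], n_sinks: int
) -> List[Optional[Number]]:
    """Expand a coordinator-level rate_limit into one limit per sink."""
    if rate_limit is None or isinstance(rate_limit, (int, float)):
        # a single value (or no limit at all) applies to every sink
        return [rate_limit] * n_sinks
    limits = list(rate_limit)
    if len(limits) != n_sinks:
        # hypothetical error handling for a mismatched sequence
        raise ValueError(f"rate_limit has {len(limits)} values but there are {n_sinks} sinks")
    # element-wise: limits[i] applies to msinks[i]
    return limits


print(per_sink_rate_limits(1.5, 2))         # [1.5, 1.5]
print(per_sink_rate_limits([1.5, 0.5], 2))  # [1.5, 0.5]
```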

Warning

If MSinkTSDB is added as a sink, be sure to have rate_limit set to at least 1.0 seconds to prevent data loss, since OpenTSDB doesn’t support sub-second data. (Yes, I know – it’s bonkers.)

Given this constraint, you must also be mindful of the total number of unique metric (name, agg, tags) combinations passing through mstreams per second to ensure that the output sinks can keep up with the rate of input metrics. For example, if 2 streams each use a single aggregator with the default window_size=10 and rate_limit=1.0, then you should limit yourself to no more than 5 distinct tag sets per metric (2 streams * 1 agg * 5 tag sets / 10 seconds = 1 element per second). If you have more tags or aggs, increase your window size accordingly! Here's a useful formula:

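num_total_metrics_per_sec = sum(num_aggs * num_unique_tag_sets / window_size for each stream in mstreams)

Because rate_limit is the minimum number of seconds between two elements sent to a sink, each sink can accept at most 1 / rate_limit elements per second. If num_total_metrics_per_sec > 1 / rate_limit, you have a problem.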
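The capacity check from this warning can be done with a few lines of Python before deploying. StreamLoad, metrics_per_sec and sink_can_keep_up are hypothetical names for illustration. The sketch assumes window_size is in seconds and that every (agg, tag set) combination yields one element per window. Because rate_limit is a minimum spacing in seconds, it compares the total against 1 / rate_limit, and it treats a rate_limit of 0 as "no limit".

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass
class StreamLoad:
    """Hypothetical description of one stream's output volume."""
    num_aggs: int
    num_unique_tag_sets: int
    window_size: float  # seconds per aggregation window


def metrics_per_sec(streams: Sequence[StreamLoad]) -> float:
    # each window emits one element per (agg, tag set) combination
    return sum(s.num_aggs * s.num_unique_tag_sets / s.window_size for s in streams)


def sink_can_keep_up(streams: Sequence[StreamLoad], rate_limit: float) -> bool:
    if rate_limit <= 0:
        return True  # no rate limit, so nothing is throttled
    # one element per rate_limit seconds means at most 1 / rate_limit per second
    return metrics_per_sec(streams) <= 1.0 / rate_limit


# the example above: 2 streams, 1 agg each, 5 tag sets, window_size=10
streams = [StreamLoad(1, 5, 10), StreamLoad(1, 5, 10)]
print(metrics_per_sec(streams))        # 1.0
print(sink_can_keep_up(streams, 1.0))  # True
# one extra tag set on one stream is enough to exceed a 1.0-second limit
print(sink_can_keep_up([StreamLoad(1, 6, 10), StreamLoad(1, 5, 10)], 1.0))  # False
```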

stream

Base metric coordinator stream, to which metric streams connect and from which metric sinks extend.

metric_mstreams

Mapping of metric name to metric stream, each of which is upstream from and connected to MCoordinator.stream.

msinks

Sequence of metric sinks, each of which is downstream from and connected to MCoordinator.stream.
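To make the topology of these attributes concrete, here is a toy sketch of how named streams, a shared base stream, and sinks relate. ToyCoordinator is hypothetical: it mirrors only the wiring (streams upstream of one base stream, sinks downstream of it) and omits the aggregation, buffering, and rate limiting that MCoordinator's streams and sinks add.

```python
from typing import Callable, Dict, List, Optional, Tuple

Element = Tuple[str, float, Optional[dict]]  # (name, value, tags)
Sink = Callable[[Element], None]


class ToyCoordinator:
    """Hypothetical stand-in for MCoordinator's wiring only."""

    def __init__(self) -> None:
        # metric name -> upstream stream; here a stream just forwards to the base stream
        self.metric_mstreams: Dict[str, Callable[[float, Optional[dict]], None]] = {}
        # downstream sinks, all fed from the same base stream
        self.msinks: List[Sink] = []

    def stream(self, element: Element) -> None:
        """Base stream: every element is fanned out to every sink."""
        for sink in self.msinks:
            sink(element)

    def add_mstream(self, name: str) -> None:
        def mstream(value: float, tags: Optional[dict]) -> None:
            self.stream((name, value, tags))
        self.metric_mstreams[name] = mstream

    def add_msink(self, sink: Sink) -> None:
        self.msinks.append(sink)

    def send(self, name: str, value: float, tags: Optional[dict] = None) -> None:
        # route by metric name; an unknown name raises KeyError
        self.metric_mstreams[name](value, tags)


printed: List[Element] = []
logged: List[Element] = []
mc = ToyCoordinator()
mc.add_mstream("n")
mc.add_msink(printed.append)
mc.add_msink(logged.append)
mc.send("n", 1)
print(printed, logged)  # [('n', 1, None)] [('n', 1, None)]
```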

add_mstream(mstream: metrix.stream.MStream) -> None

Add a metric stream to this coordinator by connecting it to all sink streams and making it accessible by name via MCoordinator.metric_mstreams.

add_msink(msink: metrix.sinks.MSink, rate_limit: Optional[Union[int, float]] = None) -> None

Add a metric sink to this coordinator by branching off MCoordinator.stream with a buffered, optionally rate-limited stream that ends in msink.
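A buffered branch that forwards to its sink no more often than once per rate_limit seconds can be sketched as a small queue. RateLimitedBuffer is a hypothetical class, not metrix's implementation. It assumes that elements arriving too soon are held in the buffer rather than dropped (inferred from the word "buffered"), and it takes an injectable clock so the behavior can be tested without sleeping.

```python
import collections
import time
from typing import Callable, Deque, Generic, Optional, TypeVar

T = TypeVar("T")


class RateLimitedBuffer(Generic[T]):
    """Hypothetical buffered, rate-limited branch that ends in a sink."""

    def __init__(self, sink: Callable[[T], None], rate_limit: Optional[float] = None,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.sink = sink
        self.rate_limit = rate_limit or 0.0  # None or 0 means "no limit"
        self.clock = clock
        self.buffer: Deque[T] = collections.deque()
        self._last_sent: Optional[float] = None

    def put(self, element: T) -> None:
        self.buffer.append(element)
        self.flush()

    def flush(self) -> None:
        """Forward buffered elements, keeping at least rate_limit seconds between sends."""
        while self.buffer:
            now = self.clock()
            if self._last_sent is not None and now - self._last_sent < self.rate_limit:
                return  # too soon; the rest stays buffered until a later flush()
            self.sink(self.buffer.popleft())
            self._last_sent = now
```

In this sketch the next put() triggers a flush; a long-running implementation would also need something, such as a timer, to call flush() when no new elements arrive.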

send(name: str, value: Union[int, float], *, tags: Optional[Dict] = None) -> None

Send a metric value to a particular metric stream; optionally, pass tags to add new tags to, or override, the default tags associated with the stream.

Parameters
  • name – Metric name.

  • value – Numeric metric value.

  • tags – Optional tags to associate with this specific metric value.

See also

MStream.send()
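The tag behavior of send() matches the earlier example, where tags={"foo": "BAR!"} replaced the stream's default {"foo": "bar"} for that one value. A minimal sketch of that merge is a dict unpacking in which per-call tags win. merge_tags is a hypothetical helper, and returning None when neither side has tags is an assumption based on the tags=None shown in the example output.

```python
from typing import Dict, Optional


def merge_tags(default_tags: Optional[Dict], tags: Optional[Dict]) -> Optional[Dict]:
    """Per-call tags add new keys and override matching default keys."""
    if not default_tags and not tags:
        return None  # hypothetical: report "no tags" as None
    # later unpacking wins, so per-call tags override defaults; inputs are not mutated
    return {**(default_tags or {}), **(tags or {})}


print(merge_tags({"foo": "bar"}, None))              # {'foo': 'bar'}
print(merge_tags({"foo": "bar"}, {"foo": "BAR!"}))   # {'foo': 'BAR!'}
print(merge_tags({"foo": "bar"}, {"env": "prod"}))   # {'foo': 'bar', 'env': 'prod'}
```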

timer(name: str, scale: int = 1, *, tags: Optional[Dict] = None)

Get a context manager for a particular stream that measures the elapsed time spent running statements enclosed by the with statement, and sends that time to the stream.

Parameters
  • name – Metric name.

  • scale – Multiplier applied to the elapsed time value, in seconds by default. For example, to report time in milliseconds, use scale=1000.

  • tags – Optional tags to associate with this specific timer value.

See also

MStream.timer()
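A timer like this is a thin wrapper around a clock read before and after the with-block. Below is a minimal standalone sketch using contextlib.contextmanager and time.perf_counter. Here timer is a hypothetical free function that takes a send callback instead of being a method, and whether metrix still sends a value when the block raises is not stated above, so the try/finally choice is an assumption.

```python
import time
from contextlib import contextmanager
from typing import Callable, Dict, Iterator, Optional


@contextmanager
def timer(send: Callable[..., None], name: str, scale: float = 1,
          *, tags: Optional[Dict] = None) -> Iterator[None]:
    """Hypothetical timer: measure the with-block and send elapsed * scale."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # assumption: runs even if the block raises, so failed calls are timed too
        elapsed = time.perf_counter() - start  # seconds
        send(name, elapsed * scale, tags=tags)


sent = []


def record(name, value, *, tags=None):
    sent.append((name, value, tags))


with timer(record, "time", scale=1000):  # report milliseconds
    time.sleep(0.05)
print(sent)
```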