Source code for metrix.sinks
import logging
from metrix import MElement
__all__ = ["MSink", "MSinkPrinter", "MSinkLogger", "MSinkTSDB"]
[docs]class MSink:
"""
Base class for subclasses that are called on a :class:`MElement <metrix.element.MElement>`
and perform some useful action on it.
"""
def __call__(self, me: MElement) -> None:
raise NotImplementedError(
"every MSink must define its own __call__ method that accepts "
"a MElement and performs some useful action on it"
)
[docs]class MSinkPrinter(MSink):
"""
Class that's called on a :class:`MElement <metrix.element.MElement>` and prints it
to stdout. That's it! This class is useful in development when experimenting with
:class:`MCoordinator <metrix.coordinator.MCoordinator>` so users can see stream
contents, but is not suitable for production.
.. code-block:: pycon
>>> from metrix import MElement, MSinkPrinter
>>> msink = MSinkPrinter()
>>> msink(MElement("foo", 1))
MElement(name=foo, value=1, tags=None)
"""
def __call__(self, me: MElement) -> None:
print(me)
def __str__(self):
return "MSinkPrinter"
[docs]class MSinkLogger(MSink):
"""
Class that's called on a :class:`MElement <metrix.element.MElement>` and logs it,
as-is.
.. code-block:: pycon
>>> from metrix import MElement, MSinkLogger
>>> me = MElement("foo", 1)
>>> msink = MSinkLogger()
>>> msink(me)
INFO:metrix.sinks:MElement(name=foo, value=1, tags=None)
>>> msink = MSinkLogger(name="my-logger", level=30, msg_fmt_str="[metric] %s")
WARNING:my-logger:[metric] MElement(name=foo, value=1, tags=None)
Args:
name: Name of the logger to use when logging metric elements.
level: Level at which metric elements are logged.
msg_fmt_str: Message format string into which metric elements are merged using
a string formatting operator. Must contain exactly one "%s" for a given
:class:`MElement <metrix.element.MElement>`; may contain any other
hard-coded text you wish.
Attributes:
logger: :class:`logging.Logger`
level: int
msg_fmt_str: str
"""
__slots__ = ("logger", "level", "msg_fmt_str")
def __init__(
self,
name: str = "metrix.sinks",
level: int = logging.INFO,
msg_fmt_str: str = "%s",
):
self.logger = logging.getLogger(name)
self.level = level
if msg_fmt_str.count("%s") == 1:
self.msg_fmt_str = msg_fmt_str
else:
raise ValueError(
f"msg_fmt_str='{msg_fmt_str}' is invalid; must contain exactly one "
"string formatting placeholder for a metric element"
)
def __call__(self, me: MElement) -> None:
self.logger.log(self.level, self.msg_fmt_str, me)
def __str__(self):
return f"MSinkLogger(logger={self.logger}, level={self.level})"
[docs]class MSinkTSDB(MSink):
"""
Class that's called on a :class:`MElement <metrix.element.MElement>` and sends
its data to OpenTSDB via an instantiated TSDB client.
Args:
tsdb_client: Instantiated TSDB client with a ``send`` method, such as
``potsdb.Client``.
Attributes:
tsdb_client
Note:
It's the user's responsibility to ensure that a suitable TSDB client library
is available in the environment where this class is instantiated.
"""
def __init__(self, tsdb_client):
self.tsdb_client = tsdb_client
def __call__(self, me: MElement) -> None:
tags = me.tags or {}
self.tsdb_client.send(me.name, me.value, **tags)
def __str__(self):
return f"MSinkTSDB(tsdb_client={self.tsdb_client})"