Telegraf#

Telegraf is an agent for collecting, processing, aggregating, and writing metrics. It supports a wide range of input and output plugins. In the LEAF backend, Telegraf collects metrics from various sources and writes them to the TimescaleDB database.

Quick Start#

In principle, Telegraf is ready to use as soon as the backend has been started. Its configuration file, telegraf.conf, is located in the telegraf directory and defines the input and output plugins.
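
To check that the configuration parses and produces metrics before relying on the full pipeline, you can use Telegraf's built-in test mode (also mentioned in the header of the configuration file below):

  telegraf --config telegraf.conf --test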

Currently, the configuration uses the mqtt_consumer input plugin to read metrics published to the MQTT broker and the postgresql output plugin to write them to the TimescaleDB database. In the complete file below, the consumer subscribes to all topics via the "#" wildcard, with the leaf/publish topic left as a commented-out alternative.
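
The consumer parses each MQTT payload as JSON (see the data_format settings in the complete file below). As an illustration, a message like the following, in which every field except measurement and timestamp is made up, would be stored under the measurement leaf_sensor_data (json_name_key = "measurement" combined with name_prefix = "leaf_"), with its timestamp read from the Unix-epoch timestamp field:

  {
    "measurement": "sensor_data",
    "timestamp": 1718000000,
    "temperature": 21.5,
    "humidity": 48.2
  }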

Network Configuration#

Telegraf operates exclusively on the backend network for security:

  • Backend Network: Communication with TimescaleDB and VerneMQ

  • Database Access: Uses dedicated telegraf_user credentials

  • MQTT Access: Connects to VerneMQ broker via internal network

  • Resource Limits: 256 MB memory, 0.25 CPU cores (see the sketch below)
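
These limits are enforced at the container level rather than by Telegraf itself. Below is a minimal sketch of how they might be declared in the backend's Compose file; the service, image, and network names are illustrative assumptions, not taken from the actual deployment:

  services:
    telegraf:
      image: telegraf          # illustrative; the deployment may pin a specific version
      env_file: .env           # supplies the ${...} variables used in telegraf.conf
      networks:
        - backend              # assumed name of the internal backend network
      deploy:
        resources:
          limits:
            cpus: "0.25"
            memory: 256M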

Configuration#

If you ever need to change the credentials or the topic, you can modify the telegraf.conf file.

For MQTT, the following sections are the relevant ones (shown here as they appear in the complete file below):

  servers = ["tcp://${MQTT_HOST}:${MQTT_PORT}"]

  ## Topics that will be subscribed to.
  topics = [
    # "leaf/publish",
    "#"  # Subscribe to all topics
  ]

  ## Username and password to connect MQTT server.
  username = "${MQTT_USER}"
  password = "${MQTT_PASSWORD}"

For TimescaleDB:

  connection = "postgres://telegraf_user:${TELEGRAF_DB_PASSWORD}@${PGHOST}:${PGPORT}/${PGDATABASE}?sslmode=disable"

As you can see, the configuration file uses environment variables for the connection details. This keeps credentials out of the configuration file and separates configuration from code. The environment variables are defined in the .env file in the root of the cloud directory.
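
For reference, the entries these snippets rely on might look as follows in the .env file. Only the variable names are taken from the configuration; every value below is a placeholder:

  # All values below are placeholders
  MQTT_HOST=vernemq
  MQTT_PORT=1883
  MQTT_USER=telegraf
  MQTT_PASSWORD=changeme
  TELEGRAF_DB_USER=telegraf_user
  TELEGRAF_DB_PASSWORD=changeme
  PGHOST=timescaledb
  PGPORT=5432
  PGDATABASE=leaf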

In short, the current configuration collects metrics from the MQTT broker and writes them to the TimescaleDB database using the telegraf_user credentials.
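
To confirm that data is actually arriving, you can query TimescaleDB directly. The catalog view in the first statement is standard TimescaleDB; the table name in the second is a hypothetical example following the leaf_ prefix set in the configuration:

  -- List the hypertables the postgresql output has created
  SELECT hypertable_name FROM timescaledb_information.hypertables;

  -- Inspect the most recent rows ('leaf_sensor_data' is hypothetical)
  SELECT * FROM leaf_sensor_data ORDER BY time DESC LIMIT 10;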

Below is the complete telegraf.conf file used in the LEAF backend:

# Telegraf Configuration
#
# Telegraf is entirely plugin driven. All metrics are gathered from the
# declared inputs, and sent to the declared outputs.
#
# Plugins must be declared in here to be active.
# To deactivate a plugin, comment out the name and any variables.
#
# Use 'telegraf -config telegraf.conf -test' to see what metrics a config
# file would generate.
#
# Environment variables can be used anywhere in this config file, simply surround
# them with ${}. For strings the variable must be within quotes (ie, "${STR_VAR}"),
# for numbers and booleans they should be plain (ie, ${INT_VAR}, ${BOOL_VAR})


# Global tags can be specified here in key="value" format.
# [global_tags]
  # dc = "us-east-1" # will tag all metrics with dc=us-east-1
  # rack = "1a"
  ## Environment variables can be used as tags, and throughout the config file
  # user = "$USER"

# Configuration for telegraf agent
[agent]
  ## Default data collection interval for all inputs
  interval = "10s"
  ## Rounds collection interval to 'interval'
  ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
  round_interval = true

  ## Telegraf will send metrics to outputs in batches of at most
  ## metric_batch_size metrics.
  ## This controls the size of writes that Telegraf sends to output plugins.
  metric_batch_size = 1000

  ## Maximum number of unwritten metrics per output.  Increasing this value
  ## allows for longer periods of output downtime without dropping metrics at the
  ## cost of higher maximum memory usage.
  metric_buffer_limit = 10000

  ## Collection jitter is used to jitter the collection by a random amount.
  ## Each plugin will sleep for a random time within jitter before collecting.
  ## This can be used to avoid many plugins querying things like sysfs at the
  ## same time, which can have a measurable effect on the system.
  collection_jitter = "0s"

  ## Collection offset is used to shift the collection by the given amount.
  ## This can be used to avoid many plugins querying constrained devices
  ## at the same time by manually scheduling them in time.
  # collection_offset = "0s"

  ## Default flushing interval for all outputs. Maximum flush_interval will be
  ## flush_interval + flush_jitter
  flush_interval = "10s"
  ## Jitter the flush interval by a random amount. This is primarily to avoid
  ## large write spikes for users running a large number of telegraf instances.
  ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
  flush_jitter = "0s"

  ## Collected metrics are rounded to the precision specified. Precision is
  ## specified as an interval with an integer + unit (e.g. 0s, 10ms, 2us, 4s).
  ## Valid time units are "ns", "us" (or "µs"), "ms", "s".
  ##
  ## By default or when set to "0s", precision will be set to the same
  ## timestamp order as the collection interval, with the maximum being 1s:
  ##   ie, when interval = "10s", precision will be "1s"
  ##       when interval = "250ms", precision will be "1ms"
  ##
  ## Precision will NOT be used for service inputs. It is up to each individual
  ## service input to set the timestamp at the appropriate precision.
  precision = "0s"

  ## Log at debug level.
  debug = true
  ## Log only error level messages.
  # quiet = false

  ## Log format controls the way messages are logged and can be one of "text",
  ## "structured" or, on Windows, "eventlog".
  # logformat = "text"

  ## Name of the file to be logged to or stderr if unset or empty. This
  ## setting is ignored for the "eventlog" format.
  # logfile = ""

  ## The logfile will be rotated after the time interval specified.  When set
  ## to 0 no time based rotation is performed.  Logs are rotated only when
  ## written to, if there is no log activity rotation may be delayed.
  # logfile_rotation_interval = "0h"

  ## The logfile will be rotated when it becomes larger than the specified
  ## size.  When set to 0 no size based rotation is performed.
  # logfile_rotation_max_size = "0MB"

  ## Maximum number of rotated archives to keep, any older logs are deleted.
  ## If set to -1, no archives are removed.
  # logfile_rotation_max_archives = 5

  ## Pick a timezone to use when logging or type 'local' for local time.
  ## Example: America/Chicago
  # log_with_timezone = ""

  ## Override default hostname, if empty use os.Hostname()
  # hostname = ""
  ## If set to true, do not set the "host" tag in the telegraf agent.
  omit_hostname = true

  ## Method of translating SNMP objects. Can be "netsnmp" (deprecated) which
  ## translates by calling external programs snmptranslate and snmptable,
  ## or "gosmi" which translates using the built-in gosmi library.
  # snmp_translator = "netsnmp"

  ## Name of the file to load the state of plugins from and store the state to.
  ## If uncommented and not empty, this file will be used to save the state of
  ## stateful plugins on termination of Telegraf. If the file exists on start,
  ## the state in the file will be restored for the plugins.
  # statefile = ""

  ## Flag to skip running processors after aggregators
  ## By default, processors are run a second time after aggregators. Changing
  ## this setting to true will skip the second run of processors.
  # skip_processors_after_aggregators = false

# Publishes metrics to a postgresql database
[[outputs.postgresql]]
  ## Specify connection address via the standard libpq connection string:
  ##   host=... user=... password=... sslmode=... dbname=...
  ## Or a URL:
  ##   postgres://[user[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full]
  ## See https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
  ##
  ## All connection parameters are optional. Environment vars are also supported.
  ## e.g. PGPASSWORD, PGHOST, PGUSER, PGDATABASE
  ## All supported vars can be found here:
  ##  https://www.postgresql.org/docs/current/libpq-envars.html
  ##
  ## Non-standard parameters:
  ##   pool_max_conns (default: 1) - Maximum size of connection pool for parallel (per-batch per-table) inserts.
  ##   pool_min_conns (default: 0) - Minimum size of connection pool.
  ##   pool_max_conn_lifetime (default: 0s) - Maximum age of a connection before closing.
  ##   pool_max_conn_idle_time (default: 0s) - Maximum idle time of a connection before closing.
  ##   pool_health_check_period (default: 0s) - Duration between health checks on idle connections.
  connection = "postgres://${TELEGRAF_DB_USER}:${TELEGRAF_DB_PASSWORD}@${PGHOST}:${PGPORT}/${PGDATABASE}?sslmode=disable"

  ## Postgres schema to use.
  schema = "public"

  ## Store tags as foreign keys in the metrics table. Default is false.
  tags_as_foreign_keys = false

  ## Suffix to append to table name (measurement name) for the foreign tag table.
  tag_table_suffix = ""

  ## Deny inserting metrics if the foreign tag can't be inserted.
  foreign_tag_constraint = false

  ## Store all tags as a JSONB object in a single 'tags' column.
  tags_as_jsonb = false

  ## Store all fields as a JSONB object in a single 'fields' column.
  fields_as_jsonb = false

  ## Name of the timestamp column
  ## NOTE: Some tools (e.g. Grafana) require the default name so be careful!
  timestamp_column_name = "time"

  ## Type of the timestamp column
  ## Currently, "timestamp without time zone" and "timestamp with time zone"
  ## are supported
  timestamp_column_type = "timestamp without time zone"

  ## Templated statements to execute when creating a new table.
  create_templates = [
      '''CREATE TABLE {{ .table }} ({{ .columns }})''',
      '''SELECT create_hypertable({{ .table|quoteLiteral }}, 'time', chunk_time_interval => INTERVAL '7d')''',
      # '''ALTER TABLE {{ .table }} SET (timescaledb.compress, timescaledb.compress_segmentby = 'tag_id')''',
  ]
  ## Templated statements to execute when adding columns to a table.
  ## Set to an empty list to disable. Points containing tags for which there is no column will be skipped. 
  ## Points containing fields for which there is no column will have the field omitted.
  add_column_templates = [
    '''ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }};''',
  ]

  ## Templated statements to execute when creating a new tag table.
  tag_table_create_templates = [
    '''CREATE TABLE IF NOT EXISTS {{ .table }} ({{ .columns }}, PRIMARY KEY (tag_id));''',
  ]

  ## Templated statements to execute when adding columns to a tag table.
  ## Set to an empty list to disable. Points containing tags for which there is no column will be skipped.
  tag_table_add_column_templates = [
    '''ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }};''',
  ]

  ## The postgres data type to use for storing unsigned 64-bit integer values (Postgres does not have a native
  ## unsigned 64-bit integer type).
  ## The value can be one of:
  ##   numeric - Uses the PostgreSQL "numeric" data type.
  ##   uint8 - Requires pguint extension (https://github.com/petere/pguint)
  # uint64_type = "numeric"

  ## When using pool_max_conns>1, and a temporary error occurs, the query is retried with an incremental backoff. This
  ## controls the maximum backoff duration.
  # retry_max_backoff = "15s"

  ## Approximate number of tag IDs to store in in-memory cache (when using tags_as_foreign_keys).
  ## This is an optimization to skip inserting known tag IDs.
  ## Each entry consumes approximately 34 bytes of memory.
  # tag_cache_size = 100000

  ## Enable & set the log level for the Postgres driver.
  log_level = "debug" # trace, debug, info, warn, error, none


# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
  ## Broker URLs for the MQTT server or cluster.  To connect to multiple
  ## clusters or standalone servers, use a separate plugin instance.
  ##   example: servers = ["tcp://localhost:1883"]
  ##            servers = ["ssl://localhost:1883"]
  ##            servers = ["ws://localhost:1883"]
  servers = ["tcp://${MQTT_HOST}:${MQTT_PORT}"]

  ## Topics that will be subscribed to.
  topics = [
    # "telegraf/host01/cpu",
    # "telegraf/+/mem",
    # "leaf/publish",
    "#"  # Subscribe to all topics
  ]

  ## The message topic will be stored in a tag specified by this value.  If set
  ## to the empty string no topic tag will be created.
  topic_tag = "topic"

  ## QoS policy for messages
  ##   0 = at most once
  ##   1 = at least once
  ##   2 = exactly once
  ##
  ## When using a QoS of 1 or 2, you should enable persistent_session to allow
  ## resuming unacknowledged messages.
  qos = 0

  ## Connection timeout for initial connection in seconds
  connection_timeout = "30s"

  ## Interval and ping timeout for keep-alive messages
  ## The sum of those options defines when a connection loss is detected.
  ## Note: The keep-alive interval needs to be greater or equal one second and
  ## fractions of a second are not supported.
  keepalive = "60s"
  ping_timeout = "10s"

  ## Max undelivered messages
  ## This plugin uses tracking metrics, which ensure messages are read to
  ## outputs before acknowledging them to the original broker to ensure data
  ## is not lost. This option sets the maximum messages to read from the
  ## broker that have not been written by an output.
  ##
  ## This value needs to be picked with awareness of the agent's
  ## metric_batch_size value as well. Setting max undelivered messages too high
  ## can result in a constant stream of data batches to the output. While
  ## setting it too low may never flush the broker's messages.
  # max_undelivered_messages = 1000

  ## Persistent session disables clearing of the client session on connection.
  ## In order for this option to work you must also set client_id to identify
  ## the client.  To receive messages that arrived while the client is offline,
  ## also set the qos option to 1 or 2 and don't forget to also set the QoS when
  ## publishing. Finally, using a persistent session will use the initial
  ## connection topics and not subscribe to any new topics even after
  ## reconnecting or restarting without a change in client ID.
  # persistent_session = false

  ## If unset, a random client ID will be generated.
  # client_id = ""

  ## Username and password to connect MQTT server.
  username = "${MQTT_USER}"
  password = "${MQTT_PASSWORD}"

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  insecure_skip_verify = false

  ## Client trace messages
  ## When set to true, and debug mode enabled in the agent settings, the MQTT
  ## client's messages are included in telegraf logs. These messages are very
  ## noisy, but essential for debugging issues.
  client_trace = true

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "json"
  json_strict = true
  json_time_key = "timestamp"
  json_time_format = "unix"
  json_name_key = "measurement"
  tag_keys = [
    "tags*",
  ]
  # This is to set a prefix for a table
  name_prefix = "leaf_"
  ## Enable extracting tag values from MQTT topics
  ## _ denotes an ignored entry in the topic path,
  ## # denotes a variable length path element (can only be used once per setting)
  # [[inputs.mqtt_consumer.topic_parsing]]
  #   topic = ""
  #   measurement = ""
  #   tags = ""
  #   fields = ""
  ## Values supported are int, float, uint
  #   [inputs.mqtt_consumer.topic_parsing.types]
  #      key = type

# Read metrics about cpu usage
[[inputs.cpu]]
  ## Whether to report per-cpu stats or not
  percpu = false
  ## Whether to report total system cpu stats or not
  totalcpu = true
  ## If true, collect raw CPU time metrics
  collect_cpu_time = false
  ## If true, compute and report the sum of all non-idle CPU states
  ## NOTE: The resulting 'time_active' field INCLUDES 'iowait'!
  report_active = false
  ## If true and the info is available then add core_id and physical_id tags
  core_tags = false

# Read metrics about memory usage
[[inputs.mem]]
  # no configuration

# Read metrics about disk usage by mount point
[[inputs.disk]]
  ## By default stats will be gathered for all mount points.
  ## Set mount_points will restrict the stats to only the specified mount points.
  # mount_points = ["/"]

  ## Ignore mount points by filesystem type.
  ignore_fs = ["tmpfs", "devtmpfs", "devfs", "iso9660", "overlay", "aufs", "squashfs"]

  ## Ignore mount points by mount options.
  ## The 'mount' command reports options of all mounts in parentheses.
  ## Bind mounts can be ignored with the special 'bind' option.
  # ignore_mount_opts = []
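
For reference, when the postgresql output encounters a new measurement, the create_templates above expand to statements along the following lines. The table and column names here are illustrative, and the exact column types depend on the incoming fields:

  CREATE TABLE "leaf_sensor_data" ("time" timestamp without time zone, "topic" text, "temperature" double precision);
  SELECT create_hypertable('leaf_sensor_data', 'time', chunk_time_interval => INTERVAL '7d');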