
At the company where I currently work we use, among others, Graphite, Prometheus and Elasticsearch for monitoring systems and storing metrics. Unfortunately the different sources store hostnames in different formats, and there is currently no nice way to tag or group hosts consistently across all those datasources. So we needed a way to declare groups of hosts, plus other parameters, to easily choose between hosts and groups in a dashboard.

It's also not really possible to do this quickly and easily with the tools Grafana supplies. While searching for a solution I stumbled upon this GitHub issue from 2014, https://github.com/grafana/grafana/issues/1032, which is still very active ;). Someone there mentioned using a SimpleJSON source for this. So here is what I did; maybe it is useful for someone else out there.

The old script was just a quick and dirty script plugged together from different sources 🙂 - I later got the time to completely rewrite it and also added some other features.

For example:

  • lowercase the data supplied by this source - can be handy if the hostnames are uppercase in one datasource but lowercase in another
  • use Elasticsearch term-aggregations - e.g. in our case we added the virtual-hostname to be logged in httpd-logs

BTW - this script is only meant for internal usage and the datasource itself probably shouldn't be publicly accessible. Not sure how safe this app is as I'm unfortunately not a Flask-pro 😉

Prerequisites

We are running Grafana on CentOS

For supplying the JSON I decided to use a simple Flask app running in its own Python virtualenv

Setup

# install prerequisites for the python modules installed via pip
yum install make gcc python3-devel

# install virtualenv
pip3 install virtualenv

# add a simple user
adduser flaskapp

# work as user
su - flaskapp

# create the folder where the grafana-json-source is running and change into it
mkdir app-grafana-json-source
cd app-grafana-json-source

# create the python environment
virtualenv -p python3 env
source env/bin/activate

# check if really the correct pip3 is used
which pip3

# install some needed modules
pip3 install flask uwsgi requests addict ruamel.yaml

The Flask App

Create the following files as user flaskapp

~/app-grafana-json-source/grafana_json_source.py (underscores in the file name, so that wsgi.py can import the module)

# A Flask app for supplying an option to group/tag list of hosts
# For example if the same host has different formats in different Grafana datasources
# Or if the hosts are missing tags for being able to be grouped in Grafana
#
# 2021-12-20 - clemens[at]steinkogler.org
#
# Changelog: 
# 2021-12-20 - complete rewrite of initial version

import datetime
import requests
import os
from requests.auth import HTTPBasicAuth
from flask import Flask
from flask import jsonify
from flask import request
from flask import json
from addict import Dict
from ruamel.yaml import YAML
# for disabling the insecure-HTTPS warning if SSL is not verified
from urllib3.exceptions import InsecureRequestWarning
# Suppress only the single warning from urllib3 needed.
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

app = Flask(__name__)
methods = ('GET', 'POST')

# for debug mode - if uwsgi is used
#from werkzeug.debug import DebuggedApplication
#app.wsgi_app = DebuggedApplication(app.wsgi_app, True)
#app.debug = True


def merge_dicts(dict1, dict2):
    res = {**dict1, **dict2}
    return res
# enddef


def read_yamls(folder):
    yaml_content = YAML()
    dict_output = {}
    for file in os.listdir(folder):
        if file.endswith(".yaml") or file.endswith(".yml"):
            with open(os.path.join(folder, file), 'r') as f:
                file_content = f.read()
                try:
                    current_dict = yaml_content.load(file_content)
                    dict_output = merge_dicts(dict_output, current_dict)
                except Exception as error:
                    # if file is not in correct format, just print/log an error
                    app.logger.error("Error reading file - please, check syntax of %s/%s \n" % (folder, file))
                    app.logger.debug("File-content: \n" + file_content)
                    app.logger.error("Error-message: %s\n" % str(error))
                # endtry
            # endwith
        # endif
    # endfor

    return Dict(dict_output)
# enddef


def get_unique_values_from_es(index_patterns, hosts, field, unique_values_number, es_source, es_user, es_pw, es_proxy, es_ssl_verify, es_timeout):
    """
    :param index_patterns:
    :param hosts:
    :param field:
    :param unique_values_number:
    :param es_source:
    :param es_user:
    :param es_pw:
    :param es_proxy:
    :param es_ssl_verify:
    :param es_timeout:
    :return:

    example aggregation query:
    GET _search
    {
      "size": 0,
      "_source": "filebeat*",
      "query": {
        "bool": {
          "filter": [
            {
              "match_phrase": {
                "event.module": "nginx"
              }
            },
            {
              "bool": {
                "should": [
                  {
                    "match_phrase": {
                      "host.name": "some.full-qualified-name.com"
                    }
                  },
                  {
                    "match_phrase": {
                      "host.name": "www.steinkogler.org"
                    }
                  }
                ],
                "minimum_should_match": 1
              }
            }
          ]
        }
      },
      "aggs": {
        "unique_values": {
          "terms": {
            "field": "url.original",
            "size": 500
          }
        }
      }
    }
    """

    # if we have multiple match_phrase we have to prepare the "should"-list
    match_phrase_list = []
    for host in hosts:
        match_phrase_list.append(
            {
                "match_phrase": {
                    "host.name": host
                }
            }
        )
    # endfor

    # some es aggregation query "template" we fill with the needed stuff
    es_query = {
        "size": 0,
        "_source": index_patterns,
        "query": {
            "bool": {
                "filter": [
                    {
                        "bool": {
                            "should": match_phrase_list,
                            "minimum_should_match": 1
                        }
                    }
                ]
            }
        },
        "aggs": {
            "unique_values": {
                "terms": {
                    "field": field,
                    "size": unique_values_number
                }
            }
        }
    }
    app.logger.debug("es_query: \n %s \n" % es_query)

    # example curl query that is proxied through Kibana
    # curl --user elastic:securepw -H 'Content-Type: application/json' -H "kbn-xsrf: true" \
    # -XPOST 'https://mykibana.home.local/api/console/proxy?path=_search&method=GET' \
    # -d '{"_source": "filebeat*", "aggs": {"unique_values": {"terms": {"field": "url.original", "size": 10}}}, \
    #     "query": {"bool": {"filter": [{"bool": {"should": [{"match_phrase": {"host.name": "www.steinkogler.org"}}]}}]}}, "size": 0}'
    if es_proxy:
        headers = {
            "Content-Type": "application/json",
            "kbn-xsrf": "true"
        }
        req = requests.post(es_source, data=json.dumps(es_query), headers=headers, auth=HTTPBasicAuth(es_user, es_pw), timeout=es_timeout, verify=es_ssl_verify)
    else:
        headers = {
            "Content-Type": "application/json",
        }
        req = requests.get(es_source, data=json.dumps(es_query), headers=headers, auth=HTTPBasicAuth(es_user, es_pw), timeout=es_timeout, verify=es_ssl_verify)
    # endif

    json_output = req.json()
    app.logger.debug("elasticsearch output: \n%s \n" % str(json_output))

    # we initialize an empty list
    unique_values = []

    # we now put the unique term-keys we found into the list and finally return it as result
    for unique_value in json_output['aggregations']['unique_values']['buckets']:
        unique_values.append(unique_value['key'])
    # endfor

    return unique_values
# enddef


@app.route('/')
def root():
    return 'Simple JSON Source for our Grafana'
# enddef


# we can test via curl for example:
# curl -XPOST localhost:5000/search -H 'Content-Type: application/json' -d '{"target":"{\"get_data\": \"groups\"}"}'
@app.route('/search', methods=methods)
def find_metrics():
    # print(request.headers)
    # print(request.get_data())
    req = request.get_json()
    app.logger.debug("request json payload: %s" % str(req)) 

    # the query submitted by grafana is a "string" - so we have to load it as json
    target = json.loads(req.get('target', '*').replace('\\', ''))
    # print(target)

    today = datetime.datetime.now().strftime("%Y.%m.%d")
    yesterday = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime("%Y.%m.%d")

    # if no direct connection to Elasticsearch is available, you can proxy through Kibana
    es_env = read_yamls("yaml-configs/elasticsearch")
    app.logger.debug("es_env: %s \n" % str(es_env))

    # here we save the "grouping" - with the regex-textbox in Grafana you may have to manipulate the values even more
    # but this can be a great help if you want to use different datasources, where for example the hostnames are
    # slightly differently saved in the databases used.
    # first we load the "group"-files into the "groups"-key
    group_data = Dict({
        "groups": read_yamls("yaml-configs/groups")
    })
    # we have some special "date"-key, which will create today's and yesterday's date in some nice format
    date_data = Dict({
        "date": {  # maybe can be useful for elasticsearch queries, etc.
            "today": str(today),
            "yesterday": str(yesterday)
        }
    })
    # then we merge those dicts to one final big dictionary
    data = merge_dicts(group_data, date_data)
    app.logger.debug("data: %s \n" % str(data))

    # just initialize some empty output
    data_output = []

    # a very broad catch - too lazy to think about all cases.
    # just so that Grafana won't get wonky
    try:
        # first we check if we even got some correct request
        if "get_data" in target:
            lowercase_output = False
            elasticsearch_output = False
            get_data_extra_list = None

            get_data_value = target['get_data']

            # if the output should be a list of lowercased items
            # consider following - one datasource needs the hosts in lowercase
            #                      another in uppercase as defined
            if "lowercase" in get_data_value:
                lowercase_output = True
                get_data_value = get_data_value.replace('.lowercase', '')
            # endif

            # if query is a special elasticsearch aggregation query to get variables used in another elasticsearch
            # query in a panel
            if "elasticsearch" in get_data_value:
                elasticsearch_output = True
                get_data_value = get_data_value.replace('.elasticsearch', '')
            # endif

            # some example
            # in Grafana first define a single-value variable "group" - query: {"get_data": "groups"}
            # then we define a multi-value, include-all variable "hosts" - query: {"get_data": "groups.$group.hosts.lowercase"}
            # then we need another hidden multi-value, include-all variable "casesensitive-hosts" - query: {"get_data": "groups.$group.hosts = $hosts"}
            # $hosts is submitted as "(server-a|server-b)" by grafana - first have to create a normal list out of this string and then search
            # through the hosts list later to make sure everything is ok. The values of this hidden variable then can be used for in panels using
            # a different datasource, where the original notation is needed
            if " = " in get_data_value:
                get_data_value, get_data_extra = get_data_value.split(" = ")
                get_data_extra = get_data_extra.replace('(', '')
                get_data_extra = get_data_extra.replace(')', '')
                get_data_extra_list = get_data_extra.split('|')
            # endif

            # as we may have keys with dashes, we cannot take the dotted notation from the query directly,
            # so we split by dot and merge it to a string e.g. ['key-1']['key-2'] and use eval to create
            # the variable to finally get the value
            get_data_value_as_list = get_data_value.split(".")
            get_data_value_string = ""
            for item in get_data_value_as_list:
                get_data_value_string = get_data_value_string + "['" + item + "']"
            # endfor

            # we now can query the data variable
            query_data = eval("data" + get_data_value_string)

            # we need only a list as output so Grafana can use the values
            # if there is nested stuff in the queried part, we only need the keys and put them into a list
            # else we can output single values we find but, also as a list - just with a single value
            if isinstance(query_data, dict):
                data_output = list(query_data.keys())
            elif isinstance(query_data, list):
                if get_data_extra_list is not None:
                    for item in get_data_extra_list:
                        # if we find the item as given, we just can append it
                        # and skip the lowercase search, else it would be appended twice
                        if item in query_data:
                            data_output.append(item)
                            continue
                        # endif

                        # if we receive a lowercased item, we compare it with the lowercased original items,
                        # but we want to append the original unmodified one
                        for query_data_item in query_data:
                            if item == query_data_item.lower():
                                data_output.append(query_data_item)
                            # endif
                        # endfor
                    # endfor
                else:
                    data_output = query_data
                # endif
            else:
                data_output = [query_data]
            # endif

            # This is for example if you want to query Elasticsearch to get a list of certain unique values
            # In Grafana you can create a query like this: {"get_data": "groups.$group.elasticsearch = $hosts"}
            # For example you can use this to get all unique url.original values from HTTP-access logs. This makes
            # it possible to only select certain urls you want to see stats for in a Grafana panel.
            if elasticsearch_output:
                index_patterns = eval("data" + get_data_value_string + "['elasticsearch']['index_patterns']")
                hosts = get_data_extra_list
                field = eval("data" + get_data_value_string + "['elasticsearch']['field']")
                unique_values_number = eval("data" + get_data_value_string + "['elasticsearch']['unique_values_number']")
                es_environment = eval("data" + get_data_value_string + "['elasticsearch']['es_environment']")
                # we now use dotted-notation as it's a little less complicated
                es_url = eval("es_env." + es_environment + ".es_url")
                es_user = eval("es_env." + es_environment + ".es_user")
                es_pw = eval("es_env." + es_environment + ".es_pw")
                es_proxy = eval("es_env." + es_environment + ".es_proxy")
                es_ssl_verify = eval("es_env." + es_environment + ".es_ssl_verify")
                es_timeout = eval("es_env." + es_environment + ".es_timeout")

                # let's get the unique values into a list, which we later can use
                data_output = get_unique_values_from_es(index_patterns, hosts, field, unique_values_number, es_url, es_user, es_pw, es_proxy, es_ssl_verify, es_timeout)
            # endif

            # if we need the output in lowercase
            if lowercase_output:
                for i in range(len(data_output)):
                    data_output[i] = data_output[i].lower()
                # endfor
            # endif

            return jsonify(data_output)
        else:
            app.logger.error("Only 'get_data' supported")
            # error 501 would be better - but that makes grafana unresponsive and you have to refresh the page
            return jsonify(["nothing to get"]), 200
        # endif
    except Exception as error:
        # just output an empty list, if the query was wrong or not handled well
        # if this happens we anyway have to debug what's going wrong
        app.logger.error("There was an error - is your query correct?")
        app.logger.error("Error-message: %s" % str(error))
        # error 500 would be better - but that makes grafana unresponsive and you have to refresh the page
        return jsonify([str(error)]), 200
    # endtry
# enddef
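
The eval-based lookup above is fine as long as only trusted dashboards talk to the app, but it does mean that whatever arrives in get_data is executed as Python. One possible alternative is to walk the dictionary key by key. This is only a sketch: lookup_path is a made-up helper name and not part of the script above, and the sample data is inline. Keys with dashes are no problem here either, as they are only ever used as dictionary keys.

```python
from functools import reduce


def lookup_path(data, dotted_path):
    """Walk a nested dict along a dotted path such as 'groups.frontend-servers.hosts'.

    Hypothetical helper, not part of the original script: it replaces the
    eval()-based lookup with plain key access, so the query string is never
    executed as code. With plain dicts a missing key raises KeyError.
    """
    return reduce(lambda node, key: node[key], dotted_path.split("."), data)
# enddef


# sample data shaped like the merged YAML group files
data = {
    "groups": {
        "frontend-servers": {"hosts": ["SERVER-A", "SERVER-B"]},
        "redis-clusters": {"AT-PROD": "at-prod-redis-haproxy"},
    }
}

hosts = lookup_path(data, "groups.frontend-servers.hosts")
single = lookup_path(data, "groups.redis-clusters.AT-PROD")
```

In find_metrics this could stand in for the eval("data" + get_data_value_string) call; the surrounding try/except would still catch a wrong path.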

~/app-grafana-json-source/wsgi.py

#!/usr/bin/env python
from grafana_json_source import app as application

if __name__ == "__main__":
    application.run()

~/app-grafana-json-source/app-grafana-json-source.ini

[uwsgi]
module = wsgi

master = true
processes = 2

# use socket if reverse proxy in between
#socket = 127.0.0.1:5000
# else for simple serving
http = 127.0.0.1:5000
chmod-socket = 660
vacuum = true

die-on-idle = false
die-on-term = true

Systemd-Unit File

must be created as root of course 😉

/etc/systemd/system/grafana-json-source.service

[Unit]
Description=uWSGI server for grafana-json-source
After=network.target

[Service]
User=flaskapp
Group=flaskapp
Restart=always
WorkingDirectory=/home/flaskapp/app-grafana-json-source
Environment="PATH=/home/flaskapp/app-grafana-json-source/env/bin"
ExecStart=/home/flaskapp/app-grafana-json-source/env/bin/uwsgi --ini app-grafana-json-source.ini

[Install]
WantedBy=multi-user.target

Reload systemd, start the json-source, and after that add it as a datasource in Grafana

# reload systemctl so it can use the unit-file
systemctl daemon-reload
# start your small grafana-json-source for "groups"
systemctl start grafana-json-source
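
Before adding the datasource in Grafana you can check from Python (instead of curl) that the service answers. The sketch below builds the same body as the curl example in the script: the query is sent as a JSON string inside the "target" key. The function names are made up for illustration, the URL http://127.0.0.1:5000 is an assumption taken from the uwsgi config above, and only the standard library is used.

```python
import json
import urllib.request


def build_search_payload(query):
    """Wrap a query dict the way the curl example does: the query itself
    is a JSON string stored under the "target" key."""
    return {"target": json.dumps(query)}
# enddef


def search(query, base_url="http://127.0.0.1:5000"):
    """POST a query to /search and return the decoded JSON list.
    base_url is an assumption taken from the uwsgi 'http' setting."""
    body = json.dumps(build_search_payload(query)).encode("utf-8")
    req = urllib.request.Request(
        base_url + "/search",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))
    # endwith
# enddef


payload = build_search_payload({"get_data": "groups"})
# with the service running, search({"get_data": "groups"}) should return the group names
```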

Supplying Data with this source

As you may have noticed in the main script, you need a folder structure like:

yaml-configs
├── elasticsearch
│ └── datasources.yaml
└── groups
  ├── es-queried-servers.yaml
  ├── redis-clusters.yaml
  └── frontend-servers.yaml

Elasticsearch Datasource Configuration

If you want to query Elasticsearch via this flask-app, you have to configure the datasources.yaml

In this file you must not use dashes in the key-names!

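# do not use dashes in keynames here!
prod:
  es_url: "https://kibana.home.local/api/console/proxy?path=_search&method=GET"
  es_user: "user_with_permission"
  es_pw: "supersecure"
  es_proxy: True
  es_ssl_verify: True  # read by the script - example value
  es_timeout: 10       # seconds, read by the script - example value
test:
  es_url: "https://elasticsearch-host:9200/_search"
  es_user: "user_with_permission"
  es_pw: "supersecure"
  es_proxy: False
  es_ssl_verify: True  # read by the script - example value
  es_timeout: 10       # seconds, read by the script - example value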
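
The no-dashes rule exists because the script reads these values with dotted notation (e.g. es_env.prod.es_url), and a dash in an environment name would be read as a minus sign. A small check like the following could catch that, and missing keys, before Grafana does. It is only a sketch: validate_es_env is a made-up name, and the list of required keys is simply what find_metrics reads per environment.

```python
# keys the script reads per environment (see the es_* lookups in find_metrics)
REQUIRED_KEYS = ("es_url", "es_user", "es_pw", "es_proxy", "es_ssl_verify", "es_timeout")


def validate_es_env(es_env):
    """Return a list of human-readable problems found in the loaded
    datasources config (a dict of environment name -> settings)."""
    problems = []
    for env_name, settings in es_env.items():
        # names with dashes cannot be used in dotted notation
        if not env_name.isidentifier():
            problems.append("environment name '%s' is not usable in dotted notation" % env_name)
        # endif
        for key in REQUIRED_KEYS:
            if key not in settings:
                problems.append("environment '%s' is missing '%s'" % (env_name, key))
            # endif
        # endfor
    # endfor
    return problems
# enddef


sample = {
    "prod": {
        "es_url": "https://kibana.home.local/api/console/proxy?path=_search&method=GET",
        "es_user": "user_with_permission",
        "es_pw": "supersecure",
        "es_proxy": True,
        "es_ssl_verify": True,   # example value
        "es_timeout": 10,        # example value
    },
    # deliberately broken: dash in the name and most keys missing
    "my-test": {"es_url": "https://elasticsearch-host:9200/_search"},
}

problems = validate_es_env(sample)
```

An empty list means the config has everything the script expects; you could log the problems once at startup, for example.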

Groups - Supported YAML-Structures

You can create groups with lists up to the second level (sorry, deeper nesting is not implemented yet); see the first example. "Single-select" groups can have any number of sub-levels, as long as they contain no lists.

In the group files you can also use dashes in the key-names, so you can have pretty names in the drop-downs

Simple "host"-groups

Great for multi-value/include-all Grafana variables

Example frontend-servers.yaml

frontend-servers:
  hosts:
    - "SERVER-A"
    - "SERVER-B"

In this case the group-name is "frontend-servers". If you have another file e.g

Combination with an Elasticsearch aggregation-query

Use this if you need the unique values of a field as a dropdown in Grafana. This can be useful, for example, to get a list of unique URLs from HTTP access logs stored in Elasticsearch

Example

es-queried-servers:
  hosts:
    - "www.steinkogler.org"
    - "some.full-qualified-name.com"
  elasticsearch:
    index_patterns: "filebeat*"  # if multiple patterns/indices should be queried, separate the patterns with a colon
    field: "url.original"        # the field where we want to get unique values from
    unique_values_number: 10     # a reasonable amount of unique values
    es_environment: "prod"       # from which Elasticsearch cluster do we want to get the values

Single select groups

Great if you don't want to support selecting multiple values in a dropdown in Grafana

Example

redis-clusters:
  AT-PROD: "at-prod-redis-haproxy"
  AT-TEST: "at-test-redis-haproxy"
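
To make the difference between these structures concrete, here is roughly how a query is turned into dropdown values: a nested mapping yields its keys, a list is returned as it is, and a single value is wrapped in a list. This is a simplified stand-alone sketch of that rule with made-up names (resolve, inline sample data), not the exact code path of the app.

```python
def resolve(data, dotted_path):
    """Follow a dotted path through nested dicts and return a list
    Grafana can use as variable values."""
    node = data
    for key in dotted_path.split("."):
        node = node[key]
    # endfor

    if isinstance(node, dict):
        return list(node.keys())      # nested structure -> offer the keys
    elif isinstance(node, list):
        return list(node)             # host list -> offer the items
    else:
        return [node]                 # single value -> one-item list
    # endif
# enddef


# sample data shaped like the group files shown in this section
data = {
    "groups": {
        "frontend-servers": {"hosts": ["SERVER-A", "SERVER-B"]},
        "redis-clusters": {
            "AT-PROD": "at-prod-redis-haproxy",
            "AT-TEST": "at-test-redis-haproxy",
        },
    }
}

group_names = resolve(data, "groups")
cluster_names = resolve(data, "groups.redis-clusters")
cluster_target = resolve(data, "groups.redis-clusters.AT-PROD")
```

So a first variable can list the cluster names and a second one can resolve the selected name to the single value behind it.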

How to use this in Grafana

Prerequisite: the JSON datasource plugin (SimpleJson) must be installed and a JSON datasource configured 😉 - see: https://grafana.com/grafana/plugins/grafana-simple-json-datasource

The flask-app now can be queried with pseudo-JSON queries. Here are some examples for variables configured in Grafana

Name             Query                                                   Var-Options
group            {"get_data": "groups"}                                  "multi-value" and "include all option" disabled
hosts            {"get_data": "groups.$group.hosts"}                     "multi-value" and "include all option" enabled
hosts_lowercased {"get_data": "groups.$group.hosts.lowercase = $hosts"}  "multi-value" and "include all option" enabled
virtual_hosts    {"get_data": "groups.$group.elasticsearch = $hosts"}    "multi-value" and "include all option" enabled

So with this config you should see four dropdowns in your dashboard.

  • First you select the "group"
  • Then either "all" or only selected "hosts"
  • The "hosts_lowercased" should also only list the same as the normal named "hosts" variable - in normal use-cases I hide this variable and only use it in the panel-queries.
  • Elasticsearch is queried in the meantime and updates the "virtual_hosts" variable. This can take quite some time depending on the size of your indices etc.
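
For multi-value variables Grafana submits $hosts as a string like "(server-a|server-b)". Mapping that back to the spelling used in the group file can be sketched as below. The function name match_original_hosts is made up for illustration, and the exact case-insensitive comparison is an assumption about the intended behaviour.

```python
def match_original_hosts(selected, configured_hosts):
    """Map a Grafana multi-value string such as '(server-a|server-b)'
    back to the hosts as they are written in the group file."""
    # strip the parentheses Grafana adds and split on the pipe
    wanted = selected.replace("(", "").replace(")", "").split("|")

    # index the configured hosts by their lowercased form
    by_lowercase = {host.lower(): host for host in configured_hosts}

    matched = []
    for item in wanted:
        original = by_lowercase.get(item.lower())
        # skip unknown hosts and avoid listing a host twice
        if original is not None and original not in matched:
            matched.append(original)
        # endif
    # endfor
    return matched
# enddef


configured = ["SERVER-A", "SERVER-B", "SERVER-C"]
hosts = match_original_hosts("(server-a|server-b)", configured)
single = match_original_hosts("SERVER-C", configured)
```

A single selected value arrives without parentheses, which is why the second call works the same way.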

With the "Regex"-field in Grafana's variable config you can of course restrict which groups, for example, should be usable.

As the Flask app re-reads the YAML files with each search query, you can add new ones without reloading the app - not sure how the performance will suffer if you have a lot of config files ;). If you get unexpected errors and the message in the drop-down does not describe the error clearly, you can enable debug logging in the app (a restart is then needed). The debugging option can be enabled near the beginning of the script - find the line "# for debug mode" - but do not forget to disable it again later.

I hope this script can be useful for someone and do not hesitate to write a comment if you have questions, suggestions, etc. - I'll try to answer asap.

Last edited: January 5, 2022
