In this post we will review how to use Python to fetch data from Elasticsearch and create a graph in a new PDF document.
Why do we need this? Well, sometimes Kibana, which is part of the Elastic stack, cannot produce the presentation that you need, and you want to make some multi-pass manipulations on the data to prepare it for display. You might also need to automate PDF creation as part of a daily job, for example as an intermediate step before merging multiple PDF documents into one large PDF report.
Let's start by installing the dependent libraries. We create a requirements file:
requirements.txt
elasticsearch==7.15.1
matplotlib==3.4.3
numpy==1.21.3
pandas==1.3.4
PyPDF2==1.26.0
python_dateutil==2.8.2
And install it using the command:
pip install -r requirements.txt
Now we can create a connection to Elasticsearch. Notice that we add a timeout configuration to ensure that long-running queries will not cause errors.
import datetime
import os

import elasticsearch
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from dateutil import parser
from matplotlib.backends.backend_pdf import PdfPages

es = elasticsearch.Elasticsearch(hosts=['https://my-elastic-search.example.com'],
                                 http_auth=('elastic', 'my-password'),
                                 request_timeout=120,
                                 timeout=120)
What about running queries? How do we replace query parameters?
I've found that the easiest way is to create a JSON file that includes placeholders for the parameters. We then use a function to load the query and replace the placeholders.
def replace_in_string(data, replace_name, replace_value):
    return data.replace(
        '___{}___'.format(replace_name),
        '{}'.format(replace_value)
    )


def get_query(file_name, replaces):
    file_path = 'queries/{}.json'.format(file_name)
    with open(file_path, 'r') as file:
        data = file.read()
    for name, value in replaces.items():
        data = replace_in_string(data, name, value)
    return data
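To illustrate, here is a minimal sketch of the replacement mechanism, using a hypothetical inline template instead of a file under queries/. Note that after all placeholders are filled in, the result must be valid JSON:

```python
import json

# Replace a ___NAME___ placeholder with a concrete value,
# mirroring the replace_in_string helper above.
def replace_in_string(data, replace_name, replace_value):
    return data.replace(
        '___{}___'.format(replace_name),
        '{}'.format(replace_value)
    )

# A hypothetical template fragment with one placeholder.
template = '{"range": {"cost": {"gte": ___FROM_COST___, "lt": 100}}}'

query = template
for name, value in {'FROM_COST': 50}.items():
    query = replace_in_string(query, name, value)

# Once filled in, the template parses as valid JSON.
parsed = json.loads(query)
print(parsed['range']['cost']['gte'])  # -> 50
```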
An example of a query file is the following:
query.json
{
  "sort": [
    {
      "timestamp": {
        "order": "desc",
        "unmapped_type": "boolean"
      }
    }
  ],
  ___SEARCH_AFTER___
  "query": {
    "bool": {
      "must": [],
      "filter": [
        {
          "match_all": {}
        },
        {
          "range": {
            "timestamp": {
              "gte": "___FROM_TIME___",
              "lte": "___TO_TIME___",
              "format": "strict_date_optional_time"
            }
          }
        },
        {
          "range": {
            "cost": {
              "gte": ___FROM_COST___,
              "lt": 100
            }
          }
        },
        {
          "match_phrase": {
            "category": "toys"
          }
        }
      ],
      "should": [],
      "must_not": []
    }
  }
}
This query fetches payment records in a specified time range, with some additional filters. Notice the "search after" placeholder. It is used for pagination. Why do we need pagination? Because Elasticsearch returns at most 10,000 records per search. To fetch more, we must repeatedly send a new query that asks for the next page. The following function handles the fetch, including the pagination.
def es_scroll(index, query_name, replaces, page_size=10000, max_items=0):
    page_number = 1
    search_after = ''
    fetched_items = 0
    while True:
        replaces['SEARCH_AFTER'] = search_after
        query = get_query(query_name, replaces)
        try:
            page = es.search(index=index, body=query, size=page_size)
        except elasticsearch.exceptions.RequestError as e:
            print('query was: {}'.format(query))
            raise e
        hits = page['hits']['hits']
        if len(hits) == 0:
            break
        page_number += 1
        for hit in hits:
            yield hit['_source']
            fetched_items += 1
            if max_items > 0:
                max_items -= 1
                if max_items == 0:
                    return
        search_after = '"search_after": [{}],'.format(hit['sort'][0])
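The pagination flow can be exercised without a live cluster. The following sketch simulates it with an in-memory "index" and a hypothetical fake_search function that mimics a "search_after" query (page size of 2, made-up documents):

```python
# An in-memory, sorted "index" of hypothetical documents.
documents = [{'timestamp': t, 'cost': 10 * t} for t in range(5)]

def fake_search(after, size):
    # Return up to `size` documents whose sort value comes after `after`,
    # mimicking an Elasticsearch "search_after" query.
    hits = [d for d in documents if after is None or d['timestamp'] > after]
    return hits[:size]

def scroll(page_size=2):
    search_after = None
    while True:
        hits = fake_search(search_after, page_size)
        if not hits:
            break
        for hit in hits:
            yield hit
        # Continue from the sort value of the last hit on this page.
        search_after = hits[-1]['timestamp']

fetched = list(scroll())
print(len(fetched))  # -> 5
```

Each iteration fetches one page and remembers where it stopped, so the caller sees one continuous stream of documents.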
Let's run the actual fetch of the data. We use a function to translate a Python datetime to the Elasticsearch time format.
def to_elastic_iso(date):
    return date.isoformat().replace("+00:00", "Z")
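For example, a UTC datetime is rendered with the "Z" suffix that Elasticsearch expects:

```python
import datetime

def to_elastic_iso(date):
    return date.isoformat().replace("+00:00", "Z")

d = datetime.datetime(2021, 11, 14, 8, 0, 0, tzinfo=datetime.timezone.utc)
print(to_elastic_iso(d))  # -> 2021-11-14T08:00:00Z
```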
And we fetch the records into a list.
to_time_string = '2021-11-14T08:00:00.000Z'
to_time = parser.parse(to_time_string)
duration = datetime.timedelta(minutes=0, hours=24)

replaces = {
    'FROM_TIME': to_elastic_iso(to_time - duration),
    'TO_TIME': to_elastic_iso(to_time),
    'FROM_COST': 50
}

headers = ['Time', 'Cost']
rows = [headers]
for item in es_scroll('transactions-*', 'query', replaces):
    timestamp = item["timestamp"]
    cost = item["cost"]
    rows.append([timestamp, cost])
The last step is to create a time series graph in a PDF. We use the following function:
def create_graph(file_name, title, data):
    headers = data[0]
    time_header = headers[0]
    df = pd.DataFrame(data[1:], columns=headers)
    df[time_header] = pd.to_datetime(df[time_header])
    plot = df.plot(x=time_header, y=headers[1:])
    plot.set_title(title)
    figure = plot.get_figure()
    figure.savefig('output/{}.pdf'.format(file_name), format='pdf')
    figure.clear()
    plt.close(figure)
    plt.cla()
    plt.clf()
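The PdfPages import we added earlier points to another option: instead of merging separate one-graph files, matplotlib can write several graphs as pages of a single PDF. A minimal sketch, with hypothetical data frames and a hypothetical report.pdf output name:

```python
import matplotlib
matplotlib.use('Agg')  # render without a display
import matplotlib.pyplot as plt
import pandas as pd
from matplotlib.backends.backend_pdf import PdfPages

# Hypothetical data: one data frame per report page.
times = pd.to_datetime(['2021-11-14 06:00', '2021-11-14 07:00', '2021-11-14 08:00'])
frames = {
    'Costs': pd.DataFrame({'Time': times, 'Cost': [50, 70, 60]}),
    'Counts': pd.DataFrame({'Time': times, 'Count': [5, 7, 6]}),
}

# Write each graph as a separate page of a single PDF.
with PdfPages('report.pdf') as pdf:
    for title, df in frames.items():
        plot = df.plot(x='Time', y=df.columns[1])
        plot.set_title(title)
        pdf.savefig(plot.get_figure())
        plt.close(plot.get_figure())
```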
Final Note
The pieces of code displayed in this post join the forces of Elasticsearch and Python to create great PDF reports. Notice that matplotlib can be configured to display the graphs in various presentation styles.