
In the previous post, we set up the ELK stack and ran data analytics on application events and logs. In this post, we will discuss how you can watch real-time application events that are being persisted in the Elasticsearch index and raise alerts if the condition for a watcher is breached, using SentiNL (a Kibana plugin).

A few examples of alerting on application events (see previous posts) are:

  • The same user logged in from different IP addresses.
  • Different users logged in from the same IP address.
  • Permission failures in the last 15 minutes.
  • A particular kind of exception in the last 15 minutes/hour/day.

Watching and alerting on Elasticsearch index in Kibana

There are many plugins available for watching and alerting on an Elasticsearch index in Kibana, e.g. X-Pack and SentiNL.

X-Pack is a paid extension from elastic.co which provides security, alerting, monitoring, reporting and graph capabilities.

SentiNL is a free extension from siren.io which provides alerting and reporting functionality to monitor, notify and report on changes in an Elasticsearch index using standard queries, programmable validators and configurable actions.

We will be using SentiNL for watching and alerting on the Elasticsearch index.

Installing SentiNL

Prerequisite

For Debian, we need the libfontconfig and libfreetype6 libraries, if not already installed.

sudo apt-get install libfontconfig libfreetype6

For CentOS, we need the fontconfig and freetype libraries, if not already installed.

sudo yum install fontconfig freetype

// Installing SentiNL plugin
/opt/kibana/bin/kibana-plugin --install sentinl -u https://github.com/sirensolutions/sentinl/releases/download/tag-4.6.4-4/sentinl.zip

Configuring SentiNL

SentiNL has a wide range of actions that you can configure for watchers. You can send an email, integrate with a Slack channel or PushApps, or send a payload to a custom webhook. Open the kibana.yml file and add the below properties for SentiNL. For our example, we will only enable notification through email.

sentinl:
  es:
    host: 'localhost'
    port: 9200
  settings:
    email:
      active: true
      host: "smtp.gmail.com"
      user: "[EMAIL_ID]"
      password: "[PASSWORD]"
      port: 465
      domain: "gmail.com"
      ssl: true
      tls: false
      authentication: ['PLAIN', 'LOGIN', 'CRAM-MD5', 'XOAUTH2']
      timeout: 20000  # mail server connection timeout
      # cert:
      #   key: '/full/sys/path/to/key/file'
      #   cert: '/full/sys/path/to/cert/file'
      #   ca: '/full/sys/path/to/ca/file'
    slack:
      active: false
      username: 'username'
      hook: 'https://hooks.slack.com/services/'
      channel: '#channel'
    webhook:
      active: false
      host: 'localhost'
      port: 9200
      # use_https: false
      # path: ':/{{payload.watcher_id}}'
      # body: '{{payload.watcher_id}}{payload.hits.total}}'
      # method: POST
    report:
      active: false
      executable_path: '/usr/bin/chromium' # path to Chrome v59+ or Chromium v59+
      timeout: 5000
      # authentication:
      #   enabled: true
      #   mode:
      #     searchguard: false
      #     xpack: false
      #     basic: false
      #     custom: true
      #   custom:
      #     username_input_selector: '#username'
      #     password_input_selector: '#password'
      #     login_btn_selector: '#login-btn'
      # file:
      #   pdf:
      #     format: 'A4'
      #     landscape: true
      #   screenshot:
      #     width: 1280
      #     height: 900
    pushapps:
      active: false
      api_key: ''

That's it!!! Let's start Kibana to configure watchers and alerting in SentiNL.

Creating Watchers and alerting in Kibana

We will configure a watcher for "different users logged in from the same IP address" and send email alerts.

  • Open the Kibana dashboard on your local machine (the URL for Kibana on my local machine is http://localhost:5601).
  • Click on the SentiNL option in the left nav-pane. You will see a dashboard as below. Click on the New option to create a new watcher.
  • Click on the Watcher link highlighted as below.
  • Enter the watcher name and schedule in the General tab.
  • Click on the Input tab and enter the below mentioned query JSON in the body. You can also give a name to the query and save it.
    {
      "search": {
        "request": {
          "index": [
            "app-events*"
          ],
          "body": {
            "query": {
              "bool": {
                "filter": [
                  {
                    "range": {
                      "@timestamp": {
                        "gte": "now-30m"
                      }
                    }
                  },
                  {
                    "query_string": {
                      "default_field": "appEvent.eventType",
                      "query": "LOGIN_SUCCESS OR LOGIN_FAILURE"
                    }
                  }
                ]
              }
            },
            "aggs": {
              "group_by_requestIP": {
                "terms": {
                  "field": "appEvent.requestIP.keyword",
                  "size": 5
                },
                "aggs": {
                  "group_by_identifier": {
                    "terms": {
                      "field": "appEvent.identifier.keyword",
                      "size": 5
                    },
                    "aggs": {
                      "get_latest": {
                        "terms": {
                          "field": "@timestamp",
                          "size": 1,
                          "order": {
                            "_key": "desc"
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
    
  • Click on the Condition tab and enter the below mentioned condition JSON in the body. You can also give a name to this condition and save it. (A commented, expanded version of this script is shown after these steps.)
    {
      "script": {
        "script": "var requestIPbuckets = payload.aggregations.group_by_requestIP.buckets; payload.collector = []; requestIPbuckets.filter(function(requestIP) { return requestIP.key; }).forEach(function(requestIP) { var requestIPKey = requestIP.key; var users = requestIP.group_by_identifier.buckets; if (users.length > 1) { users.filter(function(user) { return user.key; }).forEach(function(user) { payload.collector.push({ 'ip': requestIPKey, 'identifier': user.key, 'count': user.doc_count  }); }); }}); payload.collector.length > 0;"
      }
    }
    
  • Click on the Action tab and select email as the action for alerting. Give the title, to, from and subject, and add the below mentioned content in the body of the email.
    Found {{payload.collector.length}} Events
    {{#payload.collector}}
    {{#.}}
    ip : {{ip}}, identifier: {{identifier}}, count: {{count}}
    {{/.}}
    {{/payload.collector}}
    
  • Save the watcher.
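
For readability, here is the same condition script expanded and commented (identical logic to the one-liner above; only the formatting differs):

// Collect (ip, identifier, count) triples whenever more than one user
// logged in from the same request IP within the query window.
var requestIPbuckets = payload.aggregations.group_by_requestIP.buckets;
payload.collector = [];
requestIPbuckets.filter(function (requestIP) {
    return requestIP.key;
}).forEach(function (requestIP) {
    var requestIPKey = requestIP.key;
    var users = requestIP.group_by_identifier.buckets;
    if (users.length > 1) {
        users.filter(function (user) {
            return user.key;
        }).forEach(function (user) {
            payload.collector.push({ 'ip': requestIPKey, 'identifier': user.key, 'count': user.doc_count });
        });
    }
});
// The watcher fires only when at least one suspicious combination was found.
payload.collector.length > 0;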

This watcher will run periodically based on the schedule that you have set and, if the breach condition is met, it will send an email alert. The configured email looks like the one below.

This is how you can watch real-time changing data in an Elasticsearch index and raise alerts based on the configured conditions.

In this post, we will learn how to use Elasticsearch, Logstash and Kibana for running analytics on application events and logs. First, I will install all these applications on my local machine.

Installations

You can read my previous posts on how to install Elasticsearch, Logstash, Kibana and Filebeat on your local machine.

Basic configuration

I hope by now you have installed Elasticsearch, Logstash, Kibana and Filebeat on your system. Now, let's do a few basic configurations required to run analytics on application events and logs.

Elasticsearch

Open the elasticsearch.yml file in the [ELASTICSEARCH_INSTALLATION_DIR]/config folder and add the below properties to it.

cluster.name: gauravbytes-event-analyzer
node.name: node-1

The cluster name is used by Elasticsearch nodes to form a cluster. The node name needs to be unique within a cluster. We are running only a single instance of Elasticsearch on our local machine, but in a production-grade setup there will be master nodes, data nodes and client nodes that you will configure as per your requirements.
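
For illustration only (the node names below are made up; node.master, node.data and node.ingest are standard Elasticsearch 6.x settings), such roles are assigned per node in that node's elasticsearch.yml:

# elasticsearch.yml on a dedicated master-eligible node
node.name: master-1
node.master: true
node.data: false
node.ingest: false

# elasticsearch.yml on a dedicated data node
node.name: data-1
node.master: false
node.data: true
node.ingest: true

# elasticsearch.yml on a coordinating-only (client) node
node.name: client-1
node.master: false
node.data: false
node.ingest: false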

Logstash

Open the logstash.yml file in the [LOGSTASH_INSTALLATION_DIR]/config folder and add the below properties to it.

node.name: gauravbytes-logstash
path.data: [MOUNTED_HDD_LOCATION]
config.reload.automatic: true
config.reload.interval: 30s

Creating logstash pipeline for parsing application events and logs

There are three parts in a Logstash pipeline, i.e. input, filter and output. Below is the pipeline conf for parsing application events and logs.

input {
    beats {
        port => "5044"
    }
}

filter {
   
    grok {
        match => {"message" => "\[%{TIMESTAMP_ISO8601:loggerTime}\] *%{LOGLEVEL:level} *%{DATA:loggerName} *- (?(.|\r|\n)*)"}
    }
 
    if ([fields][type] == "appevents") {
        json {
            source => "event"
            target => "appEvent"
        }
  
        mutate { 
            remove_field => "event"
        }

        date {
            match => [ "[appEvent][eventTime]" , "ISO8601" ]
            target => "@timestamp"
        }
  
        mutate {
            replace => { "[type]" => "app-events" }
        }
    }
    else if ([fields][type] == "businesslogs") {  
        mutate {
            replace => { "[type]" => "app-logs" }
        }
    }
 
    mutate { 
        remove_field => "message"
    }
}
output {
    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "%{type}-%{+YYYY.MM.dd}"
    }
}

In the input section, we are listening on port 5044 for Beats (Filebeat sends data on this port).

In the output section, we are persisting data in Elasticsearch in an index based on the type and date combination (for example, app-events-2018.04.10).

Let's discuss the filter section in detail.

  • 1) We are using the grok filter plugin to parse plain lines of text into structured data.
    grok {
        match => {"message" => "\[%{TIMESTAMP_ISO8601:loggerTime}\] *%{LOGLEVEL:level} *%{DATA:loggerName} *- (?(.|\r|\n)*)"}
    }
    
  • 2) We are using the json filter plugin to convert the event field to a JSON object and store it in the appEvent field.
    json {
        source => "event"
        target => "appEvent"
    }
    
  • 3) We are using the mutate filter plugin to remove data we don't require.
    mutate { 
        remove_field => "event"
    }
    
    mutate { 
        remove_field => "message"
    }
    
  • 4) We are using the date filter plugin to parse the eventTime from the appEvent field as an ISO8601 date and store the parsed value in the @timestamp field.
    date {
        match => [ "[appEvent][eventTime]" , "ISO8601" ]
        target => "@timestamp"
    }
    

Filebeat

Open the filebeat.yml file in [FILEBEAT_INSTALLATION_DIR] and add the below configurations.

filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - E:\gauravbytes-log-analyzer\logs\AppEvents.log
  fields:
    type: appevents
  
- type: log
  enabled: true
  paths:
    - E:\gauravbytes-log-analyzer\logs\GauravBytesLogs.log
  fields:
    type: businesslogs
  multiline.pattern: ^\[
  multiline.negate: true
  multiline.match: after

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 3

output.logstash:
  hosts: ["localhost:5044"]

In the configuration above, we are defining two different types of Filebeat prospectors: one for application events and the other for application logs. For the log prospector, the multiline settings append every line that does not start with [ to the previous line, so multi-line entries such as stack traces are shipped as a single event. We have also defined that the output should be sent to Logstash. There are many other configurations that you can apply by referencing the filebeat.reference.yml file in the Filebeat installation directory.

Kibana

Open kibana.yml in the [KIBANA_INSTALLATION_DIR]/config folder and add the below configuration to it.

elasticsearch.url: "http://localhost:9200"

We have only configured the Elasticsearch URL, but you can also change the Kibana host, port, name and other SSL-related configurations.

Running ELK stack and Filebeat

//running elasticsearch on windows
\bin\elasticsearch.exe

// running logstash
bin\logstash.bat -f config\gauravbytes-config.conf --config.reload.automatic

//running kibana
bin\kibana.bat

//running filebeat
filebeat.exe -e -c filebeat-test.yml -d "publish"

Creating Application Event and Log structure

I have created two classes, AppEvent.java and AppLog.java, which capture information related to application events and logs. Below is the structure of both classes.

//AppEvent.java
public class AppEvent implements BaseEvent<AppEvent> {
    public enum AppEventType {
        LOGIN_SUCCESS, LOGIN_FAILURE, DATA_READ, DATA_WRITE, ERROR;
    }

    private String identifier;
    private String hostAddress;
    private String requestIP;
    private ZonedDateTime eventTime;
    private AppEventType eventType;
    private String apiName;
    private String message;
    private Throwable throwable;
}

//AppLog.java
public class AppLog implements BaseEvent<AppLog> {
    private String apiName;
    private String message;
    private Throwable throwable;
}

Let's generate events and logs

I have created a sample application to generate dummy events and logs. You can check out the full project on GitHub. There is an AppEventGenerator Java file. Run this class with the system argument -DLOG_PATH=[YOUR_LOG_DIR] to generate dummy events. If your LOG_PATH is not the same as the one defined in filebeat-test.yml, then copy the log files generated by this project to the location defined in filebeat-test.yml. You will soon see the events and logs persisted in Elasticsearch.
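
If you just want to see the expected log-line layout without running the project, below is a minimal, hypothetical sketch (DummyEventWriter is not part of the actual project; the logger name and field values are illustrative). It writes one line in the [timestamp] LEVEL loggerName - {event JSON} shape that the grok and json filters above parse:

//DummyEventWriter.java (illustrative sketch only)
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class DummyEventWriter {
    public static void main(String[] args) throws IOException {
        String eventTime = ZonedDateTime.now().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
        // Hand-built JSON payload; the real project serializes AppEvent objects instead.
        String eventJson = "{\"identifier\":\"user-1\",\"requestIP\":\"127.0.0.1\","
                + "\"eventType\":\"LOGIN_SUCCESS\",\"apiName\":\"/login\","
                + "\"eventTime\":\"" + eventTime + "\"}";
        // Matches the grok pattern: [ISO8601 timestamp] LEVEL loggerName - event JSON
        String line = String.format("[%s] INFO com.gauravbytes.AppEventGenerator - %s%n", eventTime, eventJson);
        Path logFile = Paths.get(System.getProperty("LOG_PATH", "."), "AppEvents.log");
        Files.write(logFile, line.getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }
}

Filebeat then picks the file up through the appevents prospector defined earlier and ships it to Logstash.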

Running analytics on application events and logs in Kibana dashboard

First, we need to define an index pattern in Kibana to view the application events and logs. Follow the step-by-step guide below to create an index pattern.

  • Open the Kibana dashboard by opening the URL (http://localhost:5601/).
  • Go to the Management tab (left pane, last option).
  • Click on the Index Patterns link.
  • You will see already created index patterns, if any. On the left side, you will see the option to Create Index Pattern. Click on it.
  • Now, define the index pattern (e.g. app-events*) and click Next. Choose the time filter field name; I chose the @timestamp field, but you can select any other timestamp field present in this index. Finally, click on the Create index pattern button.

Let's view Kibana dashboard

Once the index pattern is created, click on the Discover tab in the left pane and select the index pattern you created in the previous steps.

You will see a beautiful GUI with a lot of options to mine the data. In the topmost pane, you will see the option to auto-refresh and to choose the time range of data you want to fetch (last 15 minutes, 30 minutes, 1 hour, 1 day and so on); the dashboard will refresh automatically.

The next lane has a search box. You can write queries there to get a more granular view of the data; it uses Apache Lucene's query syntax.
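
For example (the field names come from the appEvent documents indexed earlier; the IP value is illustrative), queries like these narrow the view:

appEvent.eventType: LOGIN_FAILURE
appEvent.eventType: (LOGIN_SUCCESS OR LOGIN_FAILURE) AND appEvent.requestIP: "127.0.0.1"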

You can also define filters to have a more granular view of data.

This is how you can run analytics on your application events and logs using ELK. You can also define complex custom filters and queries and create visualization dashboards. Feel free to explore Kibana's official documentation to use it to its full potential.

Logstash

Logstash is a data processing pipeline which ingests data simultaneously from multiple data sources, transforms it and sends it to different `stashes`, i.e. Elasticsearch, Redis, a database, a REST endpoint, etc. For example: ingesting log files, then cleaning and transforming them into machine- and human-readable formats.

There are three components in Logstash, i.e. Inputs, Filters and Outputs.

Inputs

Inputs ingest data of any kind, shape and size. For example: logs, AWS metrics, instance health metrics, etc.

Filters

Logstash filters parse each event, build a structure, enrich the data in the event and transform it to the desired form. For example: enriching geo-location from an IP address using the geoip filter, anonymizing PII in events, transforming unstructured data to structured data using the grok filter, etc.

Outputs

This is the sink layer. There are many output plugins, e.g. Elasticsearch, email, Slack, Datadog, database persistence, etc. A minimal pipeline tying the three components together is sketched below.
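
As an illustrative sketch only (stdin, grok and stdout are standard Logstash plugins; the file name and pattern are assumptions, not from this post), a minimal pipeline looks like this:

# minimal-pipeline.conf
input {
    stdin { }                        # read events typed on the console
}

filter {
    grok {
        # parse lines like "2018-04-10T10:15:30Z INFO Application started"
        match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:msg}" }
    }
}

output {
    stdout { codec => rubydebug }    # print the structured event
}

You can run it with bin/logstash -f minimal-pipeline.conf and type a log line to see the parsed output.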

Installing Logstash

As of writing, Logstash (6.2.3) requires Java 8 to run. To check the Java version, run the following command:

java -version

The output on my system is

java version "1.8.0_161"
Java(TM) SE Runtime Environment (build 1.8.0_161-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.161-b12, mixed mode)

If Java 8 is not installed, then please download it from the Oracle website and follow the instructions for installation. Also, set the JAVA_HOME variable.

Installing from binaries

You can directly download the binaries from here.

Installing from package repositories

Installation with APT

//ADD PUBLIC SIGNING KEY
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -

//add https-transports
sudo apt-get install apt-transport-https

//save the repository definition
echo "deb https://artifacts.elastic.co/packages/6.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-6.x.list

//installation command
sudo apt-get update && sudo apt-get install logstash

Installation with YUM

// Download and install the public signing key
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

Add the following to a new .repo file (e.g. logstash.repo) in your /etc/yum.repos.d/ directory:

[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

// Installation command
sudo yum install logstash

Docker installation

You can follow the link for Docker installation.