Add a custom GeoIP field to Filebeat and Elasticsearch

As part of my project to create a Kibana dashboard to visualize my external threats, I decided I wanted a map view of IP address origins using GeoIP data. By default, Filebeat installs several dashboards that I used as inspiration and to see what's possible, so I set about imitating them.

However, it wasn't easy out of the box; it took a fair amount of work to get there.

Overall concept

The data needs to flow like this: pfSense logs the blocked traffic, ships it via remote syslog to an internal server, Filebeat on that server parses the firewall log, and the Elasticsearch ingest pipeline adds the GeoIP data before indexing, so Kibana can map it.

However, there is no Filebeat package distributed as part of pfSense. FreeBSD has one, but that would mean adding more things to my router that aren't part of the pfSense ecosystem, which would be a nuisance later. Instead, I send the logs to an internal CentOS server that has Filebeat installed, and Filebeat reads and parses the firewall log there.

First I had to enable the firewall to log the requests it was blocking. By default, pfSense blocks all incoming traffic but does not log it. So I had to turn that on.

Once that was done, the logs flowed from my firewall to my internal server since I had remote logging already set up.

The log files sent have the following format:

Jan  1 15:16:34 gateway filterlog: 5,,,1000000103,bge0.3,match,block,in,4,0x0,,64,0,0,DF,6,tcp,52,192.168.3.100,192.168.1.152,62078,52804,0,SA,2679530145,2021659250,65535,,mss;nop;wscale;sackOK;eol

It's all one line in a specific format that I wanted to pick apart. By default, Filebeat's grok parser handles the syslog envelope but leaves the firewall data lumped together like this:

system.syslog.message: 7,,,1000000105,re0,match,block,in,6,0x50,0x00000,247,ICMPv6,58,8,2607:ae80:2::238,2001:558:6020:10b:44f6:849f:8998:2390,

That doesn't really help me; I want it broken out into structured fields.

To do this, grok applies a parse string that looks like this:

%{SYSLOGTIMESTAMP:system.syslog.timestamp} %{SYSLOGHOST:system.syslog.hostname} %{DATA:system.syslog.program}(?:\\[%{POSINT:system.syslog.pid}\\])?: %{GREEDYMULTILINE:system.syslog.message}

Which can be found on CentOS at:

/usr/share/filebeat/module/system/syslog/ingest/pipeline.json

This parse string applied to the Jan 1 log line above results in a JSON block that looks like this:

{ "system": { "syslog": { "hostname": "gateway", "program": "filterlog", "message": "5,,,1000000103,bge0.3,match,block, in,4, 0x0,,64,0,0,DF,6,tcp,52,192.168.3.100,192.168.1.152,62078,52804,0,SA,2679530145,2021659250,65535,,mss;nop;wscale;sackOK;eol"," timestamp": "1. Januar 15:16:34" } }}

You can paste these two examples into the Grok Debugger under Dev Tools in Kibana. In this case you should also add the line:

GREEDYMULTILINE (.|\n)*

in the Custom Patterns section, because GREEDYMULTILINE is not a predefined pattern.
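Concretely, the three Grok Debugger inputs end up looking like this (the sample data is just the log line from above, and GREEDYMULTILINE goes in the Custom Patterns box):

Sample Data:
Jan  1 15:16:34 gateway filterlog: 5,,,1000000103,bge0.3,match,block,in,4,0x0,,64,0,0,DF,6,tcp,52,192.168.3.100,192.168.1.152,62078,52804,0,SA,2679530145,2021659250,65535,,mss;nop;wscale;sackOK;eol

Grok Pattern:
%{SYSLOGTIMESTAMP:system.syslog.timestamp} %{SYSLOGHOST:system.syslog.hostname} %{DATA:system.syslog.program}(?:\[%{POSINT:system.syslog.pid}\])?: %{GREEDYMULTILINE:system.syslog.message}

Custom Patterns:
GREEDYMULTILINE (.|\n)*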

How it works

Grok, I admit, is great: it matches sets of regular expressions and captures them in a convenient output format.

If we break down the parse string, we see that it consists of several parts:

%{SYSLOGTIMESTAMP:system.syslog.timestamp}

Look for a SYSLOGTIMESTAMP (a predefined format) followed by a space, capture it, and output it as a field called timestamp nested in a syslog block, nested in a system block. In this case "Jan  1 15:16:34".

%{SYSLOGHOST:system.syslog.hostname}

Look for a SYSLOGHOST (basically a word) followed by a space and output that as a hostname field nested in a syslog block nested in a system block. In this case 'gateway'.

%{DATA:system.syslog.program}(?:\\[%{POSINT:system.syslog.pid}\\])?:

Look for a DATA (basically a word), optionally followed by a POSINT in square brackets, then a colon. This captures the program name and, if present, the PID, and outputs them as above.

%{GREEDYMULTILINE:system.syslog.message}

Finally, capture the rest of the line as the message.
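If you would rather test against the actual ingest machinery instead of the Grok Debugger, a rough equivalent (a minimal sketch, not the full module pipeline) can be run from the Kibana Dev Tools console with the simulate API:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "%{SYSLOGTIMESTAMP:system.syslog.timestamp} %{SYSLOGHOST:system.syslog.hostname} %{DATA:system.syslog.program}(?:\\[%{POSINT:system.syslog.pid}\\])?: %{GREEDYMULTILINE:system.syslog.message}"
          ],
          "pattern_definitions": {
            "GREEDYMULTILINE": "(.|\n)*"
          }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "Jan  1 15:16:34 gateway filterlog: 5,,,1000000103,bge0.3,match,block,in,4,0x0,,64,0,0,DF,6,tcp,52,192.168.3.100,192.168.1.152,62078,52804,0,SA,2679530145,2021659250,65535,,mss;nop;wscale;sackOK;eol"
      }
    }
  ]
}

The response shows the parsed document, which is a quick way to see whether your pattern actually matches before touching the real pipeline.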

Field definitions

The fields.yml file in /etc/filebeat/fields.yml contains all the field definitions that are sent to Elasticsearch before the first index is created. Otherwise, Elasticsearch will infer the field types when reading the first document, and you won't be able to easily change them afterwards.

If you look in fields.yml you will see all the output fields used previously. The following command shows what is being sent to Elasticsearch:

filebeat export template
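The output is one large JSON blob; heavily trimmed, it looks something like the following (the exact layout depends on your Filebeat version, so treat this as illustrative only):

{
  "index_patterns": [ "filebeat-6.5.4-*" ],
  "settings": { ... },
  "mappings": {
    "doc": {
      "properties": {
        "system": {
          "properties": {
            "syslog": { ... },
            "auth": { ... }
          }
        }
      }
    }
  }
}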

I'm pretty sure that when Filebeat runs its setup step, it either sends the cached copy of the JSON generated from fields.yml, stored at

/usr/share/filebeat/kibana/5/index-pattern/filebeat.json

or generates a new one from fields.yml. No matter what I did to fields.yml, my changes didn't take effect until I did the following:

  1. Set these two parameters in filebeat.yml:
setup.template.name: "filebeat"
setup.template.fields: "fields.yml"
  2. Remove the existing Filebeat template (and any indexes built from it) from Elasticsearch, via the Kibana Dev Tools console:
DELETE _template/filebeat
  3. Run this on the Filebeat server:
filebeat setup --template

Since Elasticsearch needs to know about the GeoIP field before indexing, you have to do this in a very specific order. Otherwise you won't get a geo_point field, but two plain subfields called lat and lon.
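A quick sanity check that the template made it in before any index was created is to pull it back out of Elasticsearch from the Kibana Dev Tools console and confirm your custom fields (and their types) appear in the mapping:

GET _template/filebeat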

Figuring out your fields

To figure out my fields, I put my firewall's log file entry into the Kibana Grok debugger and played around until I got what I wanted:

Jan  1 15:16:34 gateway filterlog: 5,,,1000000103,bge0.3,match,block,in,4,0x0,,64,0,0,DF,6,tcp,52,192.168.3.100,192.168.1.152,62078,52804,0,SA,2679530145,2021659250,65535,,mss;nop;wscale;sackOK;eol

and ended up with a parse string like:

%{SYSLOGTIMESTAMP:system.syslog.timestamp} gateway filterlog: %{GREEDYDATA:system.firewall.sequence},,,%{NUMBER:system.firewall.sequence2},%{BACULA_DEVICE:system.firewall.interface},match,block,in,%{NUMBER:system.firewall.data_1},%{WORD:system.firewall.data_2},,%{WORD:system.firewall.data_3},%{WORD:system.firewall.data_4},%{WORD:system.firewall.data_5},%{WORD:system.firewall.data_6},%{WORD:system.firewall.data_7},%{WORD:system.firewall.protocol},%{WORD:system.firewall.data_8},%{IP:system.firewall.source_ip},%{IP:system.firewall.dest_ip},%{NUMBER:system.firewall.source_port},%{NUMBER:system.firewall.dest_port},%{GREEDYMULTILINE:system.syslog.message}

I wasn't sure how to skip fields that were variable, and I didn't want to write complex regular expressions, so I dumped them into fields called data_x and lumped a lot of them together, because I really only wanted the IP and port data. This could definitely be done better. A helpful resource was the Grok Constructor, which helps find the odd built-in patterns; that's why you see BACULA_DEVICE as one of my patterns. After several days of working on this, I didn't want to mess with something that was working.

I added this to my /usr/share/filebeat/module/system/syslog/ingest/pipeline.json file as the first pattern in the "patterns" array. Then, to enable GeoIP processing, I added another entry in the "processors" section to do this:

{ "geoip": { "campo": "system.firewall.source_ip", "target_field": "system.firewall.fw-geoip", "ignore_failure": verdadero }}

This reads from the system.firewall.source_ip field and inserts the geocoding data into system.firewall.fw-geoip. This is a full field object with several subfields.
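For context, after both edits the relevant portion of pipeline.json looks roughly like this (abridged; the firewall pattern and the stock syslog patterns shown elsewhere in this article are elided, and the real file has additional processors):

{
  "description": "...",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "<custom firewall pattern from above>",
          "<stock syslog patterns>"
        ],
        "pattern_definitions": {
          "GREEDYMULTILINE": "(.|\n)*"
        },
        "ignore_missing": true
      }
    },
    {
      "geoip": {
        "field": "system.firewall.source_ip",
        "target_field": "system.firewall.fw-geoip",
        "ignore_failure": true
      }
    }
  ]
}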

The important thing to know here is that pipeline.json gets injected into Elasticsearch. You can see it in the Kibana console with:

GET _ingest/pipeline/filebeat-6.5.4-system-syslog-pipeline

Once you've determined your fields, it's time to edit fields.yml.

fields.yml

For this, I strongly recommend a YAML validator such as http://www.yamllint.com/, because I got it wrong a lot.

I added the following to my fields.yml file. The snippet starts at the existing "system" key for reference, and my "firewall" block is inserted just before the existing "auth" block (the surrounding system-level entries are already in the file):

- key: system
  title: "System"
  description: >
    Module for parsing system log files.
  short_config: true
  fields:
    - name: system
      type: group
      description: >
        Fields from system log files.
      fields:
        - name: firewall
          type: group
          fields:
            - name: sequence
              type: keyword
            - name: sequence2
              type: keyword
            - name: interface
              type: keyword
            - name: data_1
              type: keyword
            - name: data_2
              type: keyword
            # ... data_3 through data_7 follow the same pattern ...
            - name: data_8
              type: keyword
            - name: source_ip
              type: ip
            - name: dest_ip
              type: ip
            - name: source_port
              type: long
            - name: dest_port
              type: long
            - name: fw-geoip
              type: group
              description: >
                Contains GeoIP information collected based on the `system.auth.ip` field.
                Only present if the GeoIP Elasticsearch plugin is available and used.
              fields:
                - name: continent_name
                  type: keyword
                  description: >
                    Name of the continent.
                - name: city_name
                  type: keyword
                  description: >
                    Name of the city.
                - name: region_name
                  type: keyword
                  description: >
                    Name of the region.
                - name: iso_country_code
                  type: keyword
                  description: >
                    ISO country code.
                - name: location
                  type: geo_point
                  description: >
                    Longitude and latitude.
                - name: region_iso_code
                  type: keyword
                  description: >
                    Region ISO code.

Note that this was re-indented by hand for this article, so double-check the spacing. YAML is pretty unforgiving: no tabs, only spaces, and it is very sensitive to alignment.

Once you have that saved you can test it with:

filebeat export template | grep firewall

If this returns the line with the "firewall" group, you can install it in Elasticsearch. Note that the nesting in the YAML corresponds to the nesting in your grok parse string.

If that works, remove the existing templates and pipelines from the Kibana Dev Console:

DELETE _template/filebeat

DELETE _ingest/pipeline/filebeat-*

and install the new ones:

filebeat setup --template

A few caveats: if you're using SSL with a self-signed certificate for Kibana, your filebeat.yml will need:

ssl.verification_mode: none

Also make sure that all other Filebeat instances are stopped. Otherwise, one of them may push the template before you do.
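In filebeat.yml terms that looks roughly like the following sketch; the hostnames here are placeholders for your own, and disabling verification is only appropriate if you understand what that means for a self-signed setup:

setup.kibana:
  host: "https://kibana.example.internal:5601"
  ssl.verification_mode: none

output.elasticsearch:
  hosts: ["https://elastic.example.internal:9200"]
  ssl.verification_mode: none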

At this point you should be able to launch Filebeat and watch it start ingesting log files and geocoding them into the new property. You should also see, under Kibana / Management / Index Patterns, that system.firewall.fw-geoip.location is of type geo_point. If instead you see listings for system.firewall.fw-geoip.location.lat and system.firewall.fw-geoip.location.lon, it did not work.

filebeat -e

This should rebuild your pipeline and start streaming data. The error output is reasonable, so you should be able to debug from it. The problems I hit were bad fields.yml mappings, errors in my pipeline.json, and other Filebeat instances on other servers that I had forgotten to stop. You can then view the correctly parsed data in Kibana and build a map visualization on your new fw-geoip.location field. If that field isn't offered when you build the visualization, something didn't work (it's not a geo_point field).
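If you prefer to check from the Dev Tools console rather than the Index Patterns screen, the field mapping API will tell you directly whether the field came through as a geo_point:

GET filebeat-*/_mapping/field/system.firewall.fw-geoip.location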

Finally, when building the visualization, make sure you filter on your outside (WAN) interface. Otherwise you get all blocked traffic, including internal blocks.
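For example, assuming bge0.3 from the sample log line were the outside-facing interface (substitute your own WAN interface name), a query-bar filter like this on the visualization does the trick:

system.firewall.interface:"bge0.3"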

Whew. This whole mechanism is clever and works very well once you get the hang of the paradigm, but it is complicated and, in my opinion, not very clearly documented. I hope this helps.

What I'm listening to: Beastie Boys, Hot Sauce Committee Part Two, specifically "Make Some Noise". One of their last and one of their best.
