Elasticsearch offers a range of processors that can be configured in ingest pipelines
to transform data before it is indexed. In this write-up, I will use the Set processor (https://www.elastic.co/guide/en/elasticsearch/reference/current/set-processor.html)
to enrich the Wazuh alerts indices.
Use case:
Add a flag/field to distinguish between servers in different data centers, knowing
that the servers in each data center run a different operating system.
Wazuh & Elasticsearch configuration:
Since Wazuh manages the Elasticsearch pipelines via a Filebeat module, the
configuration is done at the Filebeat level, as shown below.
Add two processors that enrich all Ubuntu and CentOS documents with a field called DC, assigning them to data centers 1 and 2 respectively:
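{
  "set": {
    "description": "Assign DataCenter based on OS",
    "if": "ctx.network?.name != null && ctx.network.name.contains('ubuntu')",
    "field": "DC",
    "value": "DataCenter 1"
  }
},
{
  "set": {
    "description": "Assign DataCenter based on OS",
    "if": "ctx.network?.name != null && ctx.network.name.contains('Centos')",
    "field": "DC",
    "value": "DataCenter 2"
  }
},
Each condition first checks that network.name exists before calling contains(), so a document without that field does not raise a script error (which the pipeline's on_failure handler would answer by dropping the document). Note that contains() is case-sensitive.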
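To make the matching behaviour easier to reason about, here is a minimal Python sketch that emulates the two conditional processors locally. It is not Elasticsearch code: the function name assign_dc and the sample host names are made up for illustration, and documents without network.name are simply left untouched here, whereas the real pipeline's reaction to a missing field depends on how the condition is written.

```python
# Local emulation of the two conditional Set processors (illustration only).
# Each rule is (substring to look for in network.name, value written to DC).
RULES = [
    ("ubuntu", "DataCenter 1"),
    ("Centos", "DataCenter 2"),
]


def assign_dc(doc):
    """Return a copy of doc with DC set when network.name contains a rule's substring."""
    enriched = dict(doc)
    name = (doc.get("network") or {}).get("name")
    if not isinstance(name, str):
        return enriched  # no network.name: nothing to match against
    for needle, data_center in RULES:
        # Case-sensitive substring test, like String.contains in the pipeline condition.
        if needle in name:
            # Rules run in order, so a later match overwrites an earlier one.
            enriched["DC"] = data_center
    return enriched


# Hypothetical sample documents.
samples = [
    {"network": {"name": "ubuntu-web-01"}},
    {"network": {"name": "Centos-db-01"}},
    {"network": {"name": "centos-db-02"}},  # lowercase "c": no rule matches
    {"agent": {"name": "no-network-field"}},
]

for sample in samples:
    print(assign_dc(sample).get("DC", "<no DC field>"))
```

The third sample shows why the exact spelling in the condition matters: a value such as "centos-db-02" would not be tagged by a rule that looks for 'Centos'.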
Open /usr/share/filebeat/module/wazuh/alerts/ingest/pipeline.json,
then add the new processors to the processors array (here, after the geoip processors and before the date processor):
{
"description": "Wazuh events pipeline",
"processors": [
{ "json" : { "field" : "message", "add_to_root": true } },
{
"set": {
"field": "data.aws.region",
"value": "{{data.aws.awsRegion}}",
"override": false,
"ignore_failure": true
}
},
{
"set": {
"field": "data.aws.accountId",
"value": "{{data.aws.aws_account_id}}",
"override": false,
"ignore_failure": true
}
},
{
"geoip": {
"field": "data.srcip",
"target_field": "GeoLocation",
"properties": ["city_name", "country_name", "region_name", "location"],
"ignore_missing": true,
"ignore_failure": true
}
},
{
"geoip": {
"field": "data.win.eventdata.ipAddress",
"target_field": "GeoLocation",
"properties": ["city_name", "country_name", "region_name", "location"],
"ignore_missing": true,
"ignore_failure": true
}
},
{
"geoip": {
"field": "data.aws.sourceIPAddress",
"target_field": "GeoLocation",
"properties": ["city_name", "country_name", "region_name", "location"],
"ignore_missing": true,
"ignore_failure": true
}
},
{
"geoip": {
"field": "data.aws.client_ip",
"target_field": "GeoLocation",
"properties": ["city_name", "country_name", "region_name", "location"],
"ignore_missing": true,
"ignore_failure": true
}
},
{
"geoip": {
"field": "data.aws.service.action.networkConnectionAction.remoteIpDetails.ipAddressV4",
"target_field": "GeoLocation",
"properties": ["city_name", "country_name", "region_name", "location"],
"ignore_missing": true,
"ignore_failure": true
}
},
{
"geoip": {
"field": "data.gcp.jsonPayload.sourceIP",
"target_field": "GeoLocation",
"properties": ["city_name", "country_name", "region_name", "location"],
"ignore_missing": true,
"ignore_failure": true
}
},
{
"geoip": {
"field": "data.office365.ClientIP",
"target_field": "GeoLocation",
"properties": ["city_name", "country_name", "region_name", "location"],
"ignore_missing": true,
"ignore_failure": true
}
},
{
"set": {
"description": "Assign DataCenter based on OS",
"if": "ctx.network?.name != null && ctx.network.name.contains('ubuntu')",
"field": "DC",
"value": "DataCenter 1"
}
},
{
"set": {
"description": "Assign DataCenter based on OS",
"if": "ctx.network?.name != null && ctx.network.name.contains('Centos')",
"field": "DC",
"value": "DataCenter 2"
}
},
{
"date": {
"field": "timestamp",
"target_field": "@timestamp",
"formats": ["ISO8601"],
"ignore_failure": false
}
},
{
"date_index_name": {
"field": "timestamp",
"date_rounding": "d",
"index_name_prefix": "{{fields.index_prefix}}",
"index_name_format": "yyyy.MM.dd",
"ignore_failure": false
}
},
{ "remove": { "field": "message", "ignore_missing": true, "ignore_failure": true } },
{ "remove": { "field": "ecs", "ignore_missing": true, "ignore_failure": true } },
{ "remove": { "field": "beat", "ignore_missing": true, "ignore_failure": true } },
{ "remove": { "field": "input_type", "ignore_missing": true, "ignore_failure": true } },
{ "remove": { "field": "tags", "ignore_missing": true, "ignore_failure": true } },
{ "remove": { "field": "count", "ignore_missing": true, "ignore_failure": true } },
{ "remove": { "field": "@version", "ignore_missing": true, "ignore_failure": true } },
{ "remove": { "field": "log", "ignore_missing": true, "ignore_failure": true } },
{ "remove": { "field": "offset", "ignore_missing": true, "ignore_failure": true } },
{ "remove": { "field": "type", "ignore_missing": true, "ignore_failure": true } },
{ "remove": { "field": "host", "ignore_missing": true, "ignore_failure": true } },
{ "remove": { "field": "fields", "ignore_missing": true, "ignore_failure": true } },
{ "remove": { "field": "event", "ignore_missing": true, "ignore_failure": true } },
{ "remove": { "field": "fileset", "ignore_missing": true, "ignore_failure": true } },
{ "remove": { "field": "service", "ignore_missing": true, "ignore_failure": true } }
],
"on_failure" : [{
"drop" : { }
}]
}
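Hand-editing a long JSON file makes it easy to leave a stray comma or bracket behind, so it may be worth checking the file before reloading it. The sketch below is one possible check in Python: it parses the pipeline text and lists the values written by Set processors targeting DC. The helper name dc_values is hypothetical, and the inline sample is a trimmed-down stand-in for the real file.

```python
import json


def dc_values(pipeline_text):
    """Parse pipeline JSON text and return the values of Set processors that write to DC."""
    # json.loads raises json.JSONDecodeError on malformed input, e.g. a trailing comma.
    pipeline = json.loads(pipeline_text)
    values = []
    for processor in pipeline.get("processors", []):
        settings = processor.get("set")
        if settings and settings.get("field") == "DC":
            values.append(settings.get("value"))
    return values


# Trimmed-down sample (conditions and most processors omitted for brevity).
sample = """
{
  "description": "Wazuh events pipeline",
  "processors": [
    { "set": { "field": "DC", "value": "DataCenter 1" } },
    { "set": { "field": "DC", "value": "DataCenter 2" } },
    { "date": { "field": "timestamp", "target_field": "@timestamp", "formats": ["ISO8601"] } }
  ]
}
"""

print(dc_values(sample))
```

To check the real file, the same function can be given the contents of /usr/share/filebeat/module/wazuh/alerts/ingest/pipeline.json, read with open(...).read().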
Reload the pipeline, then restart Filebeat:
filebeat setup --pipelines
systemctl restart filebeat
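Once the pipeline is loaded, one way to check it without waiting for real alerts is Elasticsearch's simulate pipeline API (POST /_ingest/pipeline/<pipeline id>/_simulate). The Python sketch below only builds such a request; it does not send it. The pipeline ID, the index prefix, and the sample alert are assumptions for illustration, so look up the actual pipeline name on your cluster (for example with GET _ingest/pipeline) before using it.

```python
import json


def build_simulate_request(pipeline_id, alert, index_prefix="wazuh-alerts-test-"):
    """Return (path, body) for a simulate-pipeline request carrying one test alert."""
    path = f"/_ingest/pipeline/{pipeline_id}/_simulate"
    body = {
        "docs": [
            {
                "_source": {
                    # The pipeline's json processor parses this string into root-level fields.
                    "message": json.dumps(alert),
                    # Read by the date_index_name processor via {{fields.index_prefix}}.
                    "fields": {"index_prefix": index_prefix},
                }
            }
        ]
    }
    return path, json.dumps(body)


# Hypothetical pipeline ID and alert, for illustration only.
path, body = build_simulate_request(
    "filebeat-7.10.2-wazuh-alerts-pipeline",
    {"timestamp": "2021-05-01T10:00:00.000+0000", "network": {"name": "ubuntu-web-01"}},
)
print("POST", path)
print(body)
```

The printed path and body can be sent with any HTTP client or pasted into the Dev Tools console; if the processors work as intended, the simulated document in the response should carry the DC field.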
Navigate to the Wazuh UI and refresh the index pattern so that the new field
becomes searchable and aggregatable.
Results:
A simple dashboard (https://github.com/elwali10/Kibana-dashboards) showing the new field DC with the added data.
I hope you find it useful 😀