Enrich Wazuh indices using Elasticsearch ingest Set processor


Elasticsearch offers various processors configurable within the ingest pipelines
allowing you to perform transformations over the data. In this write-up, I will use the Set processor https://www.elastic.co/guide/en/elasticsearch/reference/current/set-
processor.html
to enrich the Wazuh alerts indices.

Use case:

Add a flag/field to distinguish between servers in different data centers, knowing
that each data center has servers with unique Operating System.

Wazuh & Elasticsearch configuration:

Since Wazuh manages the Elasticsearch pipelines via a Filebeat module, the
configuration is performed at the level of Filebeat as below:

Adding a processor to enrich all Centos and Ubuntu data/documents with a field called DC assigning them respectively to data centers 1 and 2, resulting in:

{ "set": 
  {
    "description": "Assign DataCenter based on OS",
    "if": "ctx.network.name.contains('ubuntu')",
    "field": "DC",
    "value": "DataCenter 1"
  }
},
{ "set": 
  {
    "description": "Assign DataCenter based on OS",
    "if": "ctx.network.name.contains('Centos')",
    "field": "DC",
    "value": "DataCenter 2"
  }
},

Open /usr/share/filebeat/module/wazuh/alerts/ingest/pipeline.json then include the created processors :

{
  "description": "Wazuh events pipeline",
  "processors": [
    { "json" : { "field" : "message", "add_to_root": true } },
    {
      "set": {
        "field": "data.aws.region",
        "value": "{{data.aws.awsRegion}}",
        "override": false,
        "ignore_failure": true
      }
    },
    {
      "set": {
        "field": "data.aws.accountId",
        "value": "{{data.aws.aws_account_id}}",
        "override": false,
        "ignore_failure": true
      }
    },
    {
      "geoip": {
        "field": "data.srcip",
        "target_field": "GeoLocation",
        "properties": ["city_name", "country_name", "region_name", "location"],
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "geoip": {
        "field": "data.win.eventdata.ipAddress",
        "target_field": "GeoLocation",
        "properties": ["city_name", "country_name", "region_name", "location"],
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "geoip": {
        "field": "data.aws.sourceIPAddress",
        "target_field": "GeoLocation",
        "properties": ["city_name", "country_name", "region_name", "location"],
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "geoip": {
        "field": "data.aws.client_ip",
        "target_field": "GeoLocation",
        "properties": ["city_name", "country_name", "region_name", "location"],
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "geoip": {
        "field": "data.aws.service.action.networkConnectionAction.remoteIpDetails.ipAddressV4",
        "target_field": "GeoLocation",
        "properties": ["city_name", "country_name", "region_name", "location"],
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "geoip": {
        "field": "data.gcp.jsonPayload.sourceIP",
        "target_field": "GeoLocation",
        "properties": ["city_name", "country_name", "region_name", "location"],
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "geoip": {
        "field": "data.office365.ClientIP",
        "target_field": "GeoLocation",
        "properties": ["city_name", "country_name", "region_name", "location"],
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    { 
       "set": {
        "description": "Assign DataCenter based on OS",
        "if": "ctx.network.name.contains('ubuntu')",
        "field": "DC",
        "value": "DataCenter 1"
      }
    },
    { 
      "set": {
       "description": "Assign DataCenter based on OS",
        "if": "ctx.network.name.contains('Centos')",
        "field": "DC",
        "value": "DataCenter 2"
      }
    },
    {
      "date": {
        "field": "timestamp",
        "target_field": "@timestamp",
        "formats": ["ISO8601"],
        "ignore_failure": false
      }
    },
    {
      "date_index_name": {
        "field": "timestamp",
        "date_rounding": "d",
        "index_name_prefix": "{{fields.index_prefix}}",
        "index_name_format": "yyyy.MM.dd",
        "ignore_failure": false
      }
    },
    { "remove": { "field": "message", "ignore_missing": true, "ignore_failure": true } },
    { "remove": { "field": "ecs", "ignore_missing": true, "ignore_failure": true } },
    { "remove": { "field": "beat", "ignore_missing": true, "ignore_failure": true } },
    { "remove": { "field": "input_type", "ignore_missing": true, "ignore_failure": true } },
    { "remove": { "field": "tags", "ignore_missing": true, "ignore_failure": true } },
    { "remove": { "field": "count", "ignore_missing": true, "ignore_failure": true } },
    { "remove": { "field": "@version", "ignore_missing": true, "ignore_failure": true } },
    { "remove": { "field": "log", "ignore_missing": true, "ignore_failure": true } },
    { "remove": { "field": "offset", "ignore_missing": true, "ignore_failure": true } },
    { "remove": { "field": "type", "ignore_missing": true, "ignore_failure": true } },
    { "remove": { "field": "host", "ignore_missing": true, "ignore_failure": true } },
    { "remove": { "field": "fields", "ignore_missing": true, "ignore_failure": true } },
    { "remove": { "field": "event", "ignore_missing": true, "ignore_failure": true } },
    { "remove": { "field": "fileset", "ignore_missing": true, "ignore_failure": true } },
    { "remove": { "field": "service", "ignore_missing": true, "ignore_failure": true } }
  ],
  "on_failure" : [{
    "drop" : { }
  }]
}

Reload the pipeline then restart Filebeat:

filebeat setup --pipelines
systemctl restart filebeat

Navigate to the Wazuh UI to refresh the index pattern making the new field
searchable and aggregable:

Results:

A simple dashboard (https://github.com/elwali10/Kibana-dashboards) showing the new field DC with the added data


I hope you find it useful 😀

,

Leave a Reply

Your email address will not be published. Required fields are marked *