Get notified when Elasticsearch/Wazuh Indexer/OpenSearch stops indexing data


Prerequisites:

Use case:

Monitor indexing on the Wazuh indexer, Elasticsearch, or OpenSearch, and send a Discord notification when no events have been indexed in the last 5 minutes.

Monitoring Script:

#!/var/ossec/framework/python/bin/python3
import json
import requests
from requests.auth import HTTPBasicAuth
from socket import socket, AF_UNIX, SOCK_DGRAM


SOCKET_ADDR = '/var/ossec/queue/sockets/queue'
AUTH = HTTPBasicAuth('Username', 'YourWazuhIndexer/ElasticsearchPassword')
WazuhIndexer_URL = 'https://WazuhIndexer.ElasticsearchIP:9200'
# Match documents with a timestamp in the last 5 minutes (start rounded down to the minute)
QUERY = {
    "query": {
        "bool": {
            "must": [],
            "filter": [
                {"match_all": {}},
                {"range": {"timestamp": {"gte": "now-5m/m"}}}
            ]
        }
    }
}

def send_event(sock, msg):
    try:
        string = f'1:WazuhIndexer_query:{msg}'
        sock.send(string.encode())
    except Exception as e:
        print(f"Error sending event: {e}")

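def fetch_hits(url, query):
    """Return the number of documents matching the query, or None on failure."""
    try:
        # verify=False skips TLS certificate checks (common with self-signed
        # indexer certificates); pass a CA bundle path instead where possible
        response = requests.get(url, auth=AUTH, verify=False, json=query, timeout=30)
        response.raise_for_status()  # Raise an error for bad status codes
        return response.json()['hits']['total']['value']
    except (requests.exceptions.RequestException, KeyError, ValueError) as e:
        print(f"Error fetching data from the indexer: {e}")
        return None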

def main():
    # Create the socket before the try block so that sock always exists
    # when the finally clause closes it
    sock = socket(AF_UNIX, SOCK_DGRAM)
    try:
        sock.connect(SOCKET_ADDR)

        # Count Wazuh alert events; the message text must match the rule's <match> string exactly
        wazuh_hits = fetch_hits(f'{WazuhIndexer_URL}/wazuh-alerts*/_search', QUERY)
        if wazuh_hits is not None:
            send_event(sock, f'All Wazuh Alerts Events indexed retruns:  {wazuh_hits} hits')

        # Count events across all indices
        all_hits = fetch_hits(f'{WazuhIndexer_URL}/*/_search', QUERY)
        if all_hits is not None:
            send_event(sock, f'All Indexed Events returns: {all_hits} hits')

    except Exception as e:
        print(f"Error with socket operations: {e}")
    finally:
        sock.close()

if __name__ == "__main__":
    main()
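
The script hands its results to the manager by writing a datagram of the form `1:WazuhIndexer_query:<message>` to the queue socket. As a minimal sketch of that framing, assuming the three colon-separated fields are a queue ID, a location, and the message text (the helpers `build_event` and `parse_event` are hypothetical names, not part of Wazuh), the following runs without a Wazuh installation and lets you preview the exact bytes the script would send:

```python
from typing import Tuple


def build_event(location: str, message: str, queue_id: str = "1") -> bytes:
    """Frame a message the same way send_event() does: '<queue>:<location>:<message>'."""
    return f"{queue_id}:{location}:{message}".encode()


def parse_event(raw: bytes) -> Tuple[str, str, str]:
    """Split a framed event back into (queue_id, location, message).

    Only the first two colons are treated as separators, so colons
    inside the message text are preserved.
    """
    queue_id, location, message = raw.decode().split(":", 2)
    return queue_id, location, message


if __name__ == "__main__":
    event = build_event("WazuhIndexer_query", "All Indexed Events returns: 0 hits")
    print(event)
    print(parse_event(event))
```

The location field is what the `<location>` element in the rules is compared against, so it should stay identical in the script and in the rules.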

Wazuh Manager Configuration:

Save the script on the Wazuh manager as /var/ossec/integrations/monitorIndexing.py, then set the following ownership and permissions:

chown :wazuh /var/ossec/integrations/monitorIndexing.py
chmod 750 /var/ossec/integrations/monitorIndexing.py
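
If the command never seems to run, it can help to confirm that the two commands above took effect. The following is a minimal sketch, assuming a Linux host and using only the Python standard library; the helper names `describe_permissions` and `is_ready` are hypothetical and exist only for illustration:

```python
import grp
import os
import stat


def describe_permissions(path):
    """Return (permission bits, group name) for the file at path."""
    info = os.stat(path)
    mode = stat.S_IMODE(info.st_mode)  # keep only the permission bits, e.g. 0o750
    try:
        group = grp.getgrgid(info.st_gid).gr_name
    except KeyError:
        # The numeric GID has no entry in the group database
        group = str(info.st_gid)
    return mode, group


def is_ready(path, expected_mode=0o750, expected_group="wazuh"):
    """Check that the file has the mode and group set by chmod/chown above."""
    mode, group = describe_permissions(path)
    return mode == expected_mode and group == expected_group


if __name__ == "__main__":
    script = "/var/ossec/integrations/monitorIndexing.py"
    mode, group = describe_permissions(script)
    print(f"mode={oct(mode)} group={group} ready={is_ready(script)}")
```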

Add the following wodle and integration blocks to /var/ossec/etc/ossec.conf on the Wazuh manager, and the rule group to /var/ossec/etc/rules/local_rules.xml:

<wodle name="command">
  <disabled>no</disabled>
  <command>/var/ossec/integrations/monitorIndexing.py</command>
  <interval>1m</interval>
  <ignore_output>yes</ignore_output>
  <run_on_start>yes</run_on_start>
  <timeout>0</timeout>
</wodle>

<integration>
  <name>custom-discord</name>
  <group>monitoring</group>
  <hook_url>https://discord.com/api/webhooks/125309xxxx</hook_url>
  <api_key>https://192.168.121.97</api_key>
  <alert_format>json</alert_format>
</integration>

<group name="monitoring,">
  <rule id="100302" level="8">
    <location>WazuhIndexer_query</location>
    <match>All Wazuh Alerts Events indexed retruns:  0 hits</match>
    <description>No Wazuh alerts indexed for the last 5 minutes</description>
  </rule>
  <rule id="100303" level="10">
    <location>WazuhIndexer_query</location>
    <match>All Indexed Events returns: 0 hits</match>
    <description>No events indexed for the last 5 minutes</description>
  </rule>
</group>
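
Because the rules fire only on the literal text ending in `0 hits`, it is worth checking the match strings against sample messages before relying on them. The sketch below approximates `<match>` as a plain substring test, which is an assumption that is only reasonable for literal strings like these; `RULES` and `matching_rules` are hypothetical names for illustration. The match strings are copied verbatim from the rules above, including the spelling and double space in rule 100302, since the script has to emit exactly the same text.

```python
# Match strings copied verbatim from rules 100302 and 100303
RULES = {
    100302: "All Wazuh Alerts Events indexed retruns:  0 hits",
    100303: "All Indexed Events returns: 0 hits",
}


def matching_rules(message):
    """Return the IDs of the rules whose match string occurs in the message."""
    return [rule_id for rule_id, needle in RULES.items() if needle in message]


if __name__ == "__main__":
    samples = [
        "All Indexed Events returns: 0 hits",
        "All Indexed Events returns: 10 hits",
        "All Wazuh Alerts Events indexed retruns:  0 hits",
        "All Wazuh Alerts Events indexed returns: 0 hits",
    ]
    for sample in samples:
        print(f"{sample!r} -> {matching_rules(sample)}")
```

Under this approximation a count such as `10 hits` does not trigger a rule, because the digit that follows the colon and spaces differs, while a message with corrected spelling or single spacing would silently stop matching rule 100302.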

Make sure to restart the Wazuh manager to apply the changes: systemctl restart wazuh-manager

Result:

Hope this helps 😀

