Hopefully others might find this helpful and I'm certainly open to feedback. Some of the guts of the solution can be traced back to the "outputlookup and State tables" preso Drew Oetzel gave at .conf2012. I'm looking at variations of this methodology in oder to track specific logging paths as needed but as a big believer in solving for the 80% and move on I'm using this for now to make sure hosts are tracked.
I have basically an internal MSSP type Splunk environment where individual units are logging their entire technology stack to indices reserved for them vs indicies being used to collect data from horizontal bands of technologies across the enterprise. We are currently running 700+ forwarders and that will likely triple shortly. Part of the use case relating back to hosts that stop logging is to send alerts to the appropriate folks in each unit vs to me(!).
The methodology consists of two parts. The first is a search that runs every 4 hours to generate a list of hosts logging w/in the context of a specific index and list the time of the most recent log. The second part is a scheduled search that runs every 8 hours (behind the second 4 hour search) which generates the actual email alert. My thought process was to have the search that identified the time of the last log run twice as often as the timeframe of my alert. If I wanted to alert if no logs were sent every hour I'd have the first search run at least every 30 minutes for example (though as you compress the time to alert you probably want the identification search to run more frequently). At any rate that ratio seemed reasonable to me.
The first search to generate the csv looks like this (note that you would need to customize each csv name)
index=blahblahblah | eval host=lower(host) | rex field=host "(?<host>^[^0-9]\S[^.]+)|(^[0-9]\S+)" | stats max(_time) AS last_seen by host | inputlookup append=T blahblahblah_hosts_list.csv | stats max(last_seen) AS last_seen by host | eval right_now = now() | eval time_diff = right_now - last_seen | where time_diff < (86400 * 3) | table host last_seen | outputlookup blahblahblah_hosts_list.csv
By the pipes..
1. limit the scope of the search to the appropriate index
2. for consistency's sake I make the host names lower case
3. I had cases where some data from a single host showed up as fully qualified and other didn't. This solves that issue
4. Grabs the most recent log. Don't know how much more efficient using head or top might be
5. Append this data to what is in the csv. Note that because you are appending this data you have the potential for multiple lines for each host
6. Of the 2 lines per host grab the one with the most recent timestamp (note the key here is if there is only one because a host stopped sending logs that will move forward
7. Figure out the current time
8. Figure out the difference between the current time and what the last timestamp is
9. Cleanup! If a host hasn't sent logs for 3 days it is assumed the host is gone. 3 days also covers weekends. Anything less than that moves forward to the next step which is
10. Prep the data to be written
11. Write the data back to csv
The second query looks like this
| inputlookup blahblahblah_hosts_list.csv | eval right_now = now() | eval time_diff = right_now - last_seen | eval hours = round(time_diff/3600) | where hours >= 8 | eval alert = "Hours since logs last seen - " .hours | table host alert hours | sort -hours | fields host alert
By the pipes..
1. Open the list (note there is nothing to the left of the first pipe)
2. Figure out what time it is
3. Figure out the time dif
4. Convert the time dif to hours because I don't like math
5. For any host w/o logs equal to or greater than 8 hours...
6. Create (ultimately) a string called alert that has the message you want to hand to the system admins
7. In order to sort the list such that the system with the longest time w/o logs sent I needed all three fields in order to..
8. Sort in descending order by the hours field
9. List the fields that will actually be in the email message.
Interesting! I had honestly somewhat forgotten about this command. That said I'm running into some wonkyness while using a simple |metadata type=hosts index=X. In one case I have an index with only two hosts logging but that search shows 250. Those other hosts are in multiple other indicies. Will open a ticket with support I suppose unless there is some other quick explanation.
This can happen when someone uses the wrong port or the forwarders sends to the wrong index. It just takes one packet from a previously unseen host for the metadata to add one more source.
Another common cause is when your reverse dns changes.
To weed out old accidents you could cut down on the time range - one week should do if you consider anything below three days of silence as okay.
I don't know what to say. I've got systems that have never once logged to index X showing up there when I do the metadata search for just index=x. Have spot checked a few using All Time /shrug. DNS isn't an issue.
Part of the use case relating back to
hosts that stop logging is to send
alerts to the appropriate folks in
each unit vs to me(!).
How did you achieve that ?
I'd imagine some combination of first building the mail content in the search, then assigning an
sendemail to=$email$ to send one email per row to a dynamic recipient.
Unfortunately you can't dynamically pass addresses to sendmail. The short version is each unit's logs are sent to its own index, each unit has their own app, and there are 2 private searches that run in the context of each app (built & alert). There are a few other options but another issue I have is host name collision. A student worker in the office came up with an external script that bounces search results against an XML file that contains the search (alert) name and email addresses for that alert for each unit. I haven't had a chance to really test that though
This may be a bit cumbersome, but you can indeed pass email addresses to sendmail:
some search generating an email field | map search="stats count | sendemail to=$email$"
That should send one mail per row, to whatever email address the row defines. Content can be passed similarly.