I was tasked with monitoring our endless number of sourcetypes (and sourcetypes per host) and alerting when one goes missing for an extended period of time. Does anyone have a simple query that does this, or a technique for keeping track of all the combinations of sourcetype and host?
Thanks in advance.
The deeper problem here is "how to define the complete expected list of sourcetypes and hosts".
For example, you could run a search over all time and report "what sourcetype/host combinations have been seen at any time in the past, but not at any time in the last 24 hours." But that isn't an efficient report, and it can also miss a new host that has never reported the sourcetypes you expect from it.
So I see two main solutions:
Brute force: Search over all time -
index=* | eval recent=if(_time>(now()-86400),1,0) | stats count(eval(recent==1)) as CurrentCount count(eval(recent==0)) as HistoricalCount by sourcetype host | eval status=case(CurrentCount > 0 AND HistoricalCount > 0,"okay", CurrentCount < 1 AND HistoricalCount > 0,"MISSING", CurrentCount > 0,"New") | table sourcetype host status CurrentCount HistoricalCount
You could even append
| where HistoricalCount > 0 if you don't want to report on the whole list.
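If searching over literally all time is too heavy, a bounded variant of the same idea works too. This is a sketch; the 30-day baseline window is an assumption you can adjust:

```spl
index=* earliest=-30d
| eval recent=if(_time>(now()-86400),1,0)
| stats count(eval(recent==1)) as CurrentCount count(eval(recent==0)) as HistoricalCount by sourcetype host
| where CurrentCount==0 AND HistoricalCount>0
```

This returns only the combinations seen in the baseline window but not in the last 24 hours, which is usually what you want to alert on.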
Much faster: Use a lookup table -
Set up a lookup table. (You could even create the initial lookup table by running a search and saving the output as a csv file.)
CSV file example (with placeholder values):
sourcetype,host
access_combined,webserver01
syslog,fw-01
Now your search can simply run over the last 24 hours (or two hours or whatever) and find anything that should have reported in, but has not:
| inputlookup yourlookup | eval count = 0 | join type=outer sourcetype host [ search index=* | stats count by sourcetype host ] | where count == 0 | table sourcetype host
This will list only the sourcetype and host combinations that have not appeared in the last timeframe, and it should run very quickly compared to the first search. Leave out the
| where command if you want to see the entire list. Remember to keep your lookup table up to date: whenever you update inputs.conf on a host, check whether you also need to edit your lookup table!
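To seed that lookup from existing data, one option (a sketch; `yourlookup.csv` is a placeholder filename and the 30-day window is an assumption) is to save a stats search with outputlookup:

```spl
index=* earliest=-30d
| stats count by sourcetype host
| fields sourcetype host
| outputlookup yourlookup.csv
```

After that, you can maintain the CSV by hand, or re-run this occasionally and merge in new combinations.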
My personal method (link). It is very similar to the second approach lguinn posted: a two-step process where you build a lookup as step 1 and then run a second query over that lookup.
The gist is: search the data, record the current time, append to the CSV, take the max time for each host/sourcetype pair, discard pairs that haven't logged for X days, and write the result back with outputlookup. Then a second query alerts on any pair whose last_seen is older than however many hours you choose. I have the first query run twice as often as I alert (every 4 hours vs. every 8 hours). One of the main reasons I did this is that if someone decommissions a server, I can remove its host entries from the lookup and the person/group doesn't get spammed every 8 hours for 4 days.
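A sketch of that two-step pipeline as I read it (the lookup name `last_seen_tracker.csv`, the 4-day retention, and the 8-hour alert threshold are all placeholders):

```spl
index=*
| stats max(_time) as last_seen by host sourcetype
| inputlookup append=true last_seen_tracker.csv
| stats max(last_seen) as last_seen by host sourcetype
| where last_seen > relative_time(now(), "-4d")
| outputlookup last_seen_tracker.csv
```

Schedule that first search every 4 hours to refresh the tracker, then alert every 8 hours on anything that has gone quiet:

```spl
| inputlookup last_seen_tracker.csv
| where last_seen < relative_time(now(), "-8h")
```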
You could use metadata or metasearch, but there are issues with both - not least that, after a fairly short period of time, Splunk will stop tracking host and source if there are more than 2,000 combinations of host/source/sourcetype.
Enhancement post from the future 😉
Replace the second search with this tstats search to get lightning-fast search performance:
| inputlookup YourLookupHere | eval count = 0 | join type=outer sourcetype host [ | tstats count(sourcetype) AS count WHERE index=_internal OR index=* by sourcetype host ] | where count == 0 | table host sourcetype | sort host
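If the join-based version ever runs into subsearch result limits, an append-based equivalent (same assumed lookup name; verify the field names match your lookup) avoids join entirely:

```spl
| tstats count WHERE index=_internal OR index=* by sourcetype host
| inputlookup append=true YourLookupHere
| eval count=coalesce(count,0)
| stats sum(count) as count by sourcetype host
| where count==0
| table host sourcetype
| sort host
```

Pairs seen in the data sum to a nonzero count and drop out; pairs present only in the lookup stay at zero and are reported as missing.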
This has been solved many times including:
Meta Woot!: https://splunkbase.splunk.com/app/2949/
Broken Hosts App for Splunk: https://splunkbase.splunk.com/app/3247/
Alerts for Splunk Admins ("ForwarderLevel" alerts): https://splunkbase.splunk.com/app/3796/
Splunk Security Essentials (https://docs.splunksecurityessentials.com/features/sse_data_availability/): https://splunkbase.splunk.com/app/3435/
Monitoring Console: https://docs.splunk.com/Documentation/Splunk/latest/DMC/Configureforwardermonitoring
Deployment Server: https://docs.splunk.com/Documentation/DepMon/latest/DeployDepMon/Troubleshootyourdeployment#Forwarde...
Here is a plug and play dashboard I am experimenting with if someone wants to try something quickly.
Data Investigation Dashboard
<form>
  <input type="time" token="time" searchWhenChanged="true">
    <label>Select Time Range</label>
    <default>
      <earliest>-24h@h</earliest>
      <latest>now</latest>
    </default>
  </input>
  <input type="radio" token="variance" searchWhenChanged="true">
    <label>Time Difference Variance</label>
    <choice value="|search dataTimeMissing=*">Seconds</choice>
    <choice value="|eval dataTimeMissing=dataTimeMissing/60">Minutes</choice>
    <choice value="|eval dataTimeMissing=dataTimeMissing/3600">Hours</choice>
    <choice value="|eval dataTimeMissing=dataTimeMissing/86400">Days</choice>
    <default>|search dataTimeMissing=*</default>
    <initialValue>|search dataTimeMissing=*</initialValue>
  </input>
  <row>
    <panel>
      <html>
        <center>
          <h1>SOC Data Investigation Dashboard</h1>
          <p>This dashboard is used to analyze the flow of data into Splunk and measure the consistency of data ingestion. Please pay attention to the time difference as it can be in seconds, minutes, hours, or days.</p>
        </center>
      </html>
    </panel>
  </row>
  <row>
    <panel>
      <title>Data Investigation Dashboard</title>
      <table>
        <search>
          <query>|metadata type=sourcetypes index=* |eval timeNow=now() |eval dataTimeMissing=timeNow - recentTime |convert ctime(recentTime) ctime(timeNow) ctime(firstTime) |fields sourcetype totalCount firstTime recentTime timeNow dataTimeMissing |sort - dataTimeMissing $variance$</query>
          <earliest>$time.earliest$</earliest>
          <latest>$time.latest$</latest>
          <refresh>2m</refresh>
          <refreshType>delay</refreshType>
        </search>
        <option name="drilldown">none</option>
        <option name="refresh.display">progressbar</option>
        <option name="rowNumbers">true</option>
        <format type="color" field="dataTimeMissing">
          <colorPalette type="minMidMax" maxColor="#DC4E41" minColor="#FFFFFF"></colorPalette>
          <scale type="minMidMax"></scale>
        </format>
      </table>
    </panel>
  </row>
</form>
Nothing is as fast as a simple tstats query, and users who cannot install third-party apps can use the searches below for reference. These will also help you identify the retention period of your indexes, along with source, sourcetype, host, etc.
| tstats earliest(_time) as earliestTime latest(_time) as latestTime count as eventCount where index=* by source sourcetype host index splunk_server | eval "retention period days"=round((latestTime-earliestTime)/86400,2) | convert ctime(*Time)
| dbinspect index=*
| stats count as bucketcount min(startEpoch) as earliestevent by index splunk_server
| eval earliesteventhuman = strftime(earliestevent, "%c")
Also check out the Avotrix app on Splunkbase.