Hi,
I want to create alert based on file received. Everyday at randomly we used to receive files.
ex. file name: file_20240613_1222_100.xml
Here I can extract Date:20240613 and CompanyId: 1222
I need create alert which should run for every 30 mins to check if any file arrived. If any file detected it should check both 'Date' and 'CompanyId' with last 30 days files received. If suppose there is any filename in last 30 days with same 'Date' and 'CompanyId' in the filename then it should trigger any email alert.
Base search:
index=wealth
| search transform-file
| search ace_message
| rex field=_raw "inputFileName: (?<inputFileName>.*?),"
| rex field=_raw "outputFileName: (?<outputFileName>.*?),"
| rex field=inputFileName "file\_\d+\_(?<CompanyId>\d+)\_"
| rex field=inputFileName "file\_(?<Date>\d+)\_"
| table inputFileName,outputFileName, CompanyId, Date
This will search for last 30 mins and see if any new file arrived , but I am not sure how to check the same fields for last 30 days filename.
Can someone help !
You could use a KVStore with fields "received_date", "file_date", and "company_id".
See https://docs.splunk.com/Documentation/SplunkCloud/latest/Knowledge/ConfigureKVstorelookups
Once your KVStore lookup is defined, you could use it like this:
index=wealth
| search transform-file
| search ace_message
| rex field=_raw "inputFileName: (?<inputFileName>.*?),"
| rex field=_raw "outputFileName: (?<outputFileName>.*?),"
| rex field=inputFileName "file\_\d+\_(?<CompanyId>\d+)\_"
| rex field=inputFileName "file\_(?<Date>\d+)\_"
| table inputFileName,outputFileName, CompanyId, Date
| lookup received_files_lookup file_date as Date, company_id as CompanyId
| where received_date>(now()-(60*60*24*30))
Your alert can trigger if this search returns any rows of data.
You will also need a corresponding mechanism to store any new files in the KVStore:
index=wealth
| search transform-file
| search ace_message
| rex field=_raw "inputFileName: (?<inputFileName>.*?),"
| rex field=inputFileName "file\_\d+\_(?<company_id>\d+)\_"
| rex field=inputFileName "file\_(?<file_date>\d+)\_"
| table company_id, file_date
| eval received_date=now()
| outputlookup received_files_lookup append=true
Hi,
Thank you so much for the suggestion.
Is it possible to achieve this by splunk search? since it is expected to be a simple alert configuration due to access limitation.
Please share if you have any suggestions with splunk query which will greatly help !
Alternatively, you could use streamstats to build a list of files to match against:
index=wealth OR index=transform-file OR index=ace_message earliest=-30m
| rex field=_raw "inputFileName: (?<inputFileName>.*?),"
| rex field=inputFileName "file\_(?<ID>\d+_\d+)\_"
| streamstats values(eval(if(now()-_time<1800,ID,NULL))) as IDS
| eval alertable=if((now()-_time>1800) AND (ID IN (IDS)),"True","False")
| table _time, ID, IDS, alertable
Oh, I see.
You could use a subsearch or a join:
index=wealth OR index=transform-file OR index=ace_message earliest=-30m
| rex field=_raw "inputFileName: (?<inputFileName>.*?),"
| rex field=inputFileName "file\_(?<ID>\d+_\d+)\_" | table ID
| join type=inner left=L right=R where L.ID=R.ID [search index=wealth OR index=transform-file OR index=ace_message earliest=-30d latest=-30m
| rex field=inputFileName "file\_(?<ID>\d+_\d+)\_" | table ID]
Thank you so much
Join worked as expected !
I would avoid the join command if possible (it has its quirks and limitations).
You might want to simply extract your date/filename extraction from the whole 30 days span. Then just classify it by time (see if it's last 30 minutes or not)
| eval period=if(now()-_time>1800,"before","now")
Aggregate over your filenames
| stats values(InputFilename) as InputFilenames values(OutputFilename) as OutputFilenames values(period) as periods by CompanyId Date
And now you can only list those that were both "now" and "before"
| where mvcount(periods)>1
(you might not need all those fields; I don't fully understand your business case but it's about comparing "now" and "then" - adjust accordingly)