Splunk Search

How to compare events from two sourcetypes in the same index without using join

t_splunk_d
Path Finder

I am trying to track file transfers from one location to another. 

Flow: Files are copied to File copy location -> Target Location

The file copy location and target location logs are in the same index, but each has its own sourcetype.

The file copy location logs one event per file, whereas the target location logs a single event that lists multiple file names.

Log format of filecopy location:

2024-12-18 17:02:50 , file_name="XYZ.csv",  file copy success 

2024-12-18 17:02:58, file_name="ABC.zip", file copy success 

2024-12-18 17:03:38, file_name="123.docx", file copy success

2024-12-18 18:06:19, file_name="143.docx", file copy success

Log format of Target Location:

2024-12-18 17:30:10 <FileTransfer status="success">

                                              <FileName>XYZ.csv</FileName>
                                             <FileName>ABC.zip</FileName>

                                             <FileName>123.docx</FileName> 

                                              </FileTransfer>

Desired result:

File Name    FileCopyLocation       Target Location
XYZ.csv      2024-12-18 17:02:50    2024-12-18 17:30:10
ABC.zip      2024-12-18 17:02:58    2024-12-18 17:30:10
123.docx     2024-12-18 17:03:38    2024-12-18 17:30:10
143.docx     2024-12-18 18:06:19    Pending

 

Since the events are in the same index and there are many of them, I do not want to use join.


yuanliu
SplunkTrust

You are right not to want join; in fact, try to avoid join even when the events are in different indices. Thank you for illustrating the data and the desired output. Here is an idea:

sourcetype IN (CopyLocation, TargetLocation)
| eval target_log = replace(_raw, "^[^<]+", "")
| spath input=target_log
| mvexpand FileTransfer.FileName
| eval FileName = coalesce(file_name, 'FileTransfer.FileName')

| chart values(_time) over FileName by sourcetype
| sort CopyLocation
| foreach *Location
    [eval <<FIELD>> = strftime(<<FIELD>>, "%F %T")]
| fillnull TargetLocation value=Pending

(Obviously I do not know your sourcetype names. So, adjust the above accordingly.)

Here is an emulation to produce the sample data you illustrated

| makeresults
| eval sourcetype = "CopyLocation", data = mvappend("2024-12-18 17:02:50, file_name=\"XYZ.csv\",  file copy success",
"2024-12-18 17:02:58, file_name=\"ABC.zip\", file copy success",
"2024-12-18 17:03:38, file_name=\"123.docx\", file copy success",
"2024-12-18 18:06:19, file_name=\"143.docx\", file copy success")
| mvexpand data
| eval _time = strptime(replace(data, ",.+", ""), "%F %T")
| rename data AS _raw
| extract
| append
    [makeresults
    | eval sourcetype = "TargetLocation", _raw = "2024-12-18 17:30:10 <FileTransfer status=\"success\">

                        <FileName>XYZ.csv</FileName>
                        <FileName>ABC.zip</FileName>

                        <FileName>123.docx</FileName> 

                        </FileTransfer>"
    | eval _time = strptime(replace(_raw, "<.+", ""), "%F %T")]
``` the above emulates
sourcetype IN (CopyLocation, TargetLocation)
```

Play with it and compare with real data.


woodcock
Esteemed Legend

Something like this:

| makeresults
| eval host="sender"
| eval raw = mvappend("2024-12-18 17:02:50 , file_name=XYZ.csv, file copy success",
"2024-12-18 17:02:58, file_name=ABC.zip, file copy success",
"2024-12-18 17:03:38, file_name=123.docx, file copy success",
"2024-12-18 18:06:19, file_name=143.docx, file copy success")
| append [
| makeresults
| eval host="receiver"
| eval raw = "2024-12-18 17:30:10 <FileTransfer status=success>
<FileName>XYZ.csv</FileName>
<FileName>ABC.zip</FileName>
<FileName>123.docx</FileName>
</FileTransfer>"
]
| mvexpand raw
| rename raw AS _raw
| rex "^(?<_time>\S+\s+\S+)"
| eval _time = strptime(_time, "%Y-%m-%d %H:%M:%S")
| rex max_match=0 "(?ms)(\<FileName\>|file_name=)(?<FileName>.*?)(\<|,)"
| rex "(<FileTransfer status=\"?|file copy )(?<status>[^\"\>]+)"
| stats list(status) AS status_list list(host) AS host_list dc(host) AS hosts values(status) AS status BY FileName
| where hosts==1 OR like(status, "fail%")

t_splunk_d
Path Finder

TargetLocation always comes up as Pending, which is not correct.

I also tried changing the foreach to "foreach sourcetype". Will that work? The sourcetype names are xml and raw_text.

Please help me get the TargetLocation date populated.


yuanliu
SplunkTrust
"sourcetype names are xml and raw_text."

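That's why I mentioned that you need to adjust the search, as I did not know your sourcetype names. The chart command names its output columns after the sourcetype values, so with raw_text and xml there is no column matching *Location, and fillnull then creates a TargetLocation field filled entirely with Pending, which is consistent with what you saw. For this particular purpose, you can simply remap the sourcetype values to CopyLocation and TargetLocation (or any other names you want to use in foreach):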

| eval sourcetype = if(sourcetype == "raw_text", "CopyLocation", "TargetLocation")

(Without altering the sourcetype value, you can also use "foreach raw_text xml" to get the correct results, then rename the fields.)
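
A minimal sketch of that variant, assuming the sourcetype values are exactly raw_text and xml as stated in the thread. The fillnull target has to change to xml as well, and the rename runs last so that foreach and fillnull still see the original column names:

```spl
sourcetype IN (raw_text, xml)
| eval target_log = replace(_raw, "^[^<]+", "")
| spath input=target_log
| mvexpand FileTransfer.FileName
| eval FileName = coalesce(file_name, 'FileTransfer.FileName')
| chart values(_time) over FileName by sourcetype
| sort raw_text
| foreach raw_text xml
    [eval <<FIELD>> = strftime(<<FIELD>>, "%F %T")]
| fillnull value=Pending xml
| rename raw_text AS CopyLocation xml AS TargetLocation
```

This has not been run against the real data, so compare its output with the remapped-sourcetype version before relying on it.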

Here is a complete emulation

| makeresults
| eval sourcetype = "raw_text", data = mvappend("2024-12-18 17:02:50, file_name=\"XYZ.csv\",  file copy success",
"2024-12-18 17:02:58, file_name=\"ABC.zip\", file copy success",
"2024-12-18 17:03:38, file_name=\"123.docx\", file copy success",
"2024-12-18 18:06:19, file_name=\"143.docx\", file copy success")
| mvexpand data
| eval _time = strptime(replace(data, ",.+", ""), "%F %T")
| rename data AS _raw
| extract
| append
    [makeresults
    | eval sourcetype = "xml", _raw = "2024-12-18 17:30:10 <FileTransfer status=\"success\">

                        <FileName>XYZ.csv</FileName>
                        <FileName>ABC.zip</FileName>

                        <FileName>123.docx</FileName> 

                        </FileTransfer>"
    | eval _time = strptime(replace(_raw, "<.+", ""), "%F %T")]
``` the above emulates
sourcetype IN (raw_text, xml)
```
| eval sourcetype = if(sourcetype == "raw_text", "CopyLocation", "TargetLocation")
| eval target_log = replace(_raw, "^[^<]+", "")
| spath input=target_log
| mvexpand FileTransfer.FileName
| eval FileName = coalesce(file_name, 'FileTransfer.FileName')

| chart values(_time) over FileName by sourcetype
| sort CopyLocation
| foreach *Location
    [eval <<FIELD>> = strftime(<<FIELD>>, "%F %T")]
| fillnull TargetLocation value=Pending
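
If you would rather not have the output column names depend on the sourcetype values at all, one possible alternative is to replace chart with stats and conditional aggregation. This is only a sketch, not the method used above: it assumes the sourcetype values raw_text and xml, and it keeps the earliest timestamp per file (min) instead of all values, so a file that appears more than once would show only its first time.

```spl
sourcetype IN (raw_text, xml)
| eval target_log = replace(_raw, "^[^<]+", "")
| spath input=target_log
| mvexpand FileTransfer.FileName
| eval FileName = coalesce(file_name, 'FileTransfer.FileName')
| stats min(eval(if(sourcetype == "raw_text", _time, null()))) AS CopyLocation
        min(eval(if(sourcetype == "xml", _time, null()))) AS TargetLocation
        BY FileName
| sort CopyLocation
| eval CopyLocation = strftime(CopyLocation, "%F %T")
| eval TargetLocation = coalesce(strftime(TargetLocation, "%F %T"), "Pending")
```

Because the column names are set explicitly with AS, no foreach or rename is needed, and the Pending default is applied only where no target event was found for the file.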

 
