Splunk Search

How to compare events from two sourcetypes in the same index without using join

t_splunk_d
Path Finder

I am trying to track file transfers from one location to another. 

Flow: Files are copied to File copy location -> Target Location

Both File copy location and Target location logs are in the same index but each has it own sourcetype.

File copy location events has logs for each file but Target location has a logs which has multiple files names.

Log format of filecopy location:

2024-12-18 17:02:50 , file_name="XYZ.csv",  file copy success 

2024-12-18 17:02:58, file_name="ABC.zip", file copy success 

2024-12-18 17:03:38, file_name="123.docx", file copy success

2024-12-18 18:06:19, file_name="143.docx", file copy success

Log format of Target Location:

2024-12-18 17:30:10 <FileTransfer status="success>

                                              <FileName>XYZ.csv</FileName>
                                             <FileName>ABC.zip</FileName>

                                             <FileName>123.docx</FileName> 

                                              </FileTransfer>

Desired result:

      File Name                  FileCopyLocation               Target Location

      XYZ.csv                  2024-12-18 17:02:50          2024-12-18 17:30:10

      ABC.zip                   2024-12-18 17:02:58          2024-12-18 17:30:10

      123.docx                2024-12-18 17:03:38          2024-12-18 17:30:10

       143.docx               2024-12-18 18:06:19            Pending

 

Since events are in the same index and more events I do not  want use join.

Labels (2)
0 Karma

yuanliu
SplunkTrust
SplunkTrust

You are correct to not wanting to use join; in fact, try not use join even if they are in different indices.  Thank you for illustrating data and desired output.  Here is an idea

sourcetype IN (CopLocation, TargetLocation)
| eval target_log = replace(_raw, "^[^<]+", "")
| spath input=target_log
| mvexpand FileTransfer.FileName
| eval FileName = coalesce(file_name, 'FileTransfer.FileName')

| chart values(_time) over FileName by sourcetype
| sort CopyLocation
| foreach *Location
    [eval <<FIELD>> = strftime(<<FIELD>>, "%F %T")]
| fillnull TargetLocation value=Pending

(Obviously I do not know your sourcetype names. So, adjust the above accordingly.)

Here is an emulation to produce the sample data you illustrated

| makeresults
| eval sourcetype = "CopyLocation", data = mvappend("2024-12-18 17:02:50, file_name=\"XYZ.csv\",  file copy success",
"2024-12-18 17:02:58, file_name=\"ABC.zip\", file copy success",
"2024-12-18 17:03:38, file_name=\"123.docx\", file copy success",
"2024-12-18 18:06:19, file_name=\"143.docx\", file copy success")
| mvexpand data
| eval _time = strptime(replace(data, ",.+", ""), "%F %T")
| rename data AS _raw
| extract
| append
    [makeresults
    | eval sourcetype = "TargetLocation", _raw = "2024-12-18 17:30:10 <FileTransfer status=\"success\">

                        <FileName>XYZ.csv</FileName>
                        <FileName>ABC.zip</FileName>

                        <FileName>123.docx</FileName> 

                        </FileTransfer>"
    | eval _time = strptime(replace(_raw, "<.+", ""), "%F %T")]
``` the above emulates
sourcetype IN (CopLocation, TargetLocation)
```

Play with it and compare with real data.

0 Karma

woodcock
Esteemed Legend

Something like this:

| makeresults
| eval host="sender"
| eval raw = mvappend("2024-12-18 17:02:50 , file_name=XYZ.csv, file copy success",
"2024-12-18 17:02:58, file_name=ABC.zip, file copy success",
"2024-12-18 17:03:38, file_name=123.docx, file copy success",
"2024-12-18 18:06:19, file_name=143.docx, file copy success")
| append [
| makeresults
| eval host="receiver"
| eval raw = "2024-12-18 17:30:10 <FileTransfer status=success>
<FileName>XYZ.csv</FileName>
<FileName>ABC.zip</FileName>
<FileName>123.docx</FileName>
</FileTransfer>"
]
| mvexpand raw
| rename raw AS _raw
| rex "^(?<_time>\S+\s+\S+)"
| eval _time = strptime(_time, "%Y-%m-%d %H:%M:%S")
| rex max_match=0 "(?ms)(\<FileName\>|file_name=)(?<FileName>.*?)(\<|,)"
| rex "(<FileTransfer status=\"?|file copy )(?<status>[^\"\>]+)"
| stats list(status) AS status_list list(host) AS host_list dc(host) AS hosts values(status) AS status BY FileName
| where hosts==1 OR status="fail*"
0 Karma

t_splunk_d
Path Finder

TargetLocation always comes up as Pending which is not correct.

Also I tried changing the for each to
foreach sourcetype. will it work?  sourcetype names are xml  and raw_text.

Please help to add TargetLocation date.

0 Karma

yuanliu
SplunkTrust
SplunkTrust
sourcetype names are xml  and raw_text.

That's why I mentioned that you need to adjust as I did not know your sourcetype names.  For this particular purpose, you can simply tweak sourcetype name to represent CopyLocation and TargetLocation - or any other name you want to use in foreach.

| eval sourcetype = if(sourcetype == "raw_text", "CopyLocation", "TargetLocation")

(Without mucking sourcetype value, you can also use "foreach raw_text xml" and get the correct results, then rename the fields.)

Here is a complete emulation

| makeresults
| eval sourcetype = "raw_text", data = mvappend("2024-12-18 17:02:50, file_name=\"XYZ.csv\",  file copy success",
"2024-12-18 17:02:58, file_name=\"ABC.zip\", file copy success",
"2024-12-18 17:03:38, file_name=\"123.docx\", file copy success",
"2024-12-18 18:06:19, file_name=\"143.docx\", file copy success")
| mvexpand data
| eval _time = strptime(replace(data, ",.+", ""), "%F %T")
| rename data AS _raw
| extract
| append
    [makeresults
    | eval sourcetype = xml, _raw = "2024-12-18 17:30:10 <FileTransfer status=\"success\">

                        <FileName>XYZ.csv</FileName>
                        <FileName>ABC.zip</FileName>

                        <FileName>123.docx</FileName> 

                        </FileTransfer>"
    | eval _time = strptime(replace(_raw, "<.+", ""), "%F %T")]
``` the above emulates
sourcetype IN (CopLocation, TargetLocation)
```
| eval sourcetype = if(sourcetype == "raw_text", "CopyLocation", "TargetLocation")
| eval target_log = replace(_raw, "^[^<]+", "")
| spath input=target_log
| mvexpand FileTransfer.FileName
| eval FileName = coalesce(file_name, 'FileTransfer.FileName')

| chart values(_time) over FileName by sourcetype
| sort CopyLocation
| foreach *Location
    [eval <<FIELD>> = strftime(<<FIELD>>, "%F %T")]
| fillnull TargetLocation value=Pending

 

0 Karma
Career Survey
First 500 qualified respondents will receive a $20 gift card! Tell us about your professional Splunk journey.
Get Updates on the Splunk Community!

.conf25 Global Broadcast: Don’t Miss a Moment

Hello Splunkers, .conf25 is only a click away.  Not able to make it to .conf25 in person? No worries, you can ...

Observe and Secure All Apps with Splunk

 Join Us for Our Next Tech Talk: Observe and Secure All Apps with SplunkAs organizations continue to innovate ...

What's New in Splunk Observability - August 2025

What's New We are excited to announce the latest enhancements to Splunk Observability Cloud as well as what is ...