Splunk Search

How to compare events from two sourcetypes in the same index without using join

t_splunk_d
Path Finder

I am trying to track file transfers from one location to another. 

Flow: Files are copied to File copy location -> Target Location

Both File copy location and Target location logs are in the same index but each has it own sourcetype.

File copy location events has logs for each file but Target location has a logs which has multiple files names.

Log format of filecopy location:

2024-12-18 17:02:50 , file_name="XYZ.csv",  file copy success 

2024-12-18 17:02:58, file_name="ABC.zip", file copy success 

2024-12-18 17:03:38, file_name="123.docx", file copy success

2024-12-18 18:06:19, file_name="143.docx", file copy success

Log format of Target Location:

2024-12-18 17:30:10 <FileTransfer status="success>

                                              <FileName>XYZ.csv</FileName>
                                             <FileName>ABC.zip</FileName>

                                             <FileName>123.docx</FileName> 

                                              </FileTransfer>

Desired result:

      File Name                  FileCopyLocation               Target Location

      XYZ.csv                  2024-12-18 17:02:50          2024-12-18 17:30:10

      ABC.zip                   2024-12-18 17:02:58          2024-12-18 17:30:10

      123.docx                2024-12-18 17:03:38          2024-12-18 17:30:10

       143.docx               2024-12-18 18:06:19            Pending

 

Since events are in the same index and more events I do not  want use join.

Labels (2)
0 Karma

yuanliu
SplunkTrust
SplunkTrust

You are correct to not wanting to use join; in fact, try not use join even if they are in different indices.  Thank you for illustrating data and desired output.  Here is an idea

sourcetype IN (CopLocation, TargetLocation)
| eval target_log = replace(_raw, "^[^<]+", "")
| spath input=target_log
| mvexpand FileTransfer.FileName
| eval FileName = coalesce(file_name, 'FileTransfer.FileName')

| chart values(_time) over FileName by sourcetype
| sort CopyLocation
| foreach *Location
    [eval <<FIELD>> = strftime(<<FIELD>>, "%F %T")]
| fillnull TargetLocation value=Pending

(Obviously I do not know your sourcetype names. So, adjust the above accordingly.)

Here is an emulation to produce the sample data you illustrated

| makeresults
| eval sourcetype = "CopyLocation", data = mvappend("2024-12-18 17:02:50, file_name=\"XYZ.csv\",  file copy success",
"2024-12-18 17:02:58, file_name=\"ABC.zip\", file copy success",
"2024-12-18 17:03:38, file_name=\"123.docx\", file copy success",
"2024-12-18 18:06:19, file_name=\"143.docx\", file copy success")
| mvexpand data
| eval _time = strptime(replace(data, ",.+", ""), "%F %T")
| rename data AS _raw
| extract
| append
    [makeresults
    | eval sourcetype = "TargetLocation", _raw = "2024-12-18 17:30:10 <FileTransfer status=\"success\">

                        <FileName>XYZ.csv</FileName>
                        <FileName>ABC.zip</FileName>

                        <FileName>123.docx</FileName> 

                        </FileTransfer>"
    | eval _time = strptime(replace(_raw, "<.+", ""), "%F %T")]
``` the above emulates
sourcetype IN (CopLocation, TargetLocation)
```

Play with it and compare with real data.

0 Karma

woodcock
Esteemed Legend

Something like this:

| makeresults
| eval host="sender"
| eval raw = mvappend("2024-12-18 17:02:50 , file_name=XYZ.csv, file copy success",
"2024-12-18 17:02:58, file_name=ABC.zip, file copy success",
"2024-12-18 17:03:38, file_name=123.docx, file copy success",
"2024-12-18 18:06:19, file_name=143.docx, file copy success")
| append [
| makeresults
| eval host="receiver"
| eval raw = "2024-12-18 17:30:10 <FileTransfer status=success>
<FileName>XYZ.csv</FileName>
<FileName>ABC.zip</FileName>
<FileName>123.docx</FileName>
</FileTransfer>"
]
| mvexpand raw
| rename raw AS _raw
| rex "^(?<_time>\S+\s+\S+)"
| eval _time = strptime(_time, "%Y-%m-%d %H:%M:%S")
| rex max_match=0 "(?ms)(\<FileName\>|file_name=)(?<FileName>.*?)(\<|,)"
| rex "(<FileTransfer status=\"?|file copy )(?<status>[^\"\>]+)"
| stats list(status) AS status_list list(host) AS host_list dc(host) AS hosts values(status) AS status BY FileName
| where hosts==1 OR status="fail*"
0 Karma

t_splunk_d
Path Finder

TargetLocation always comes up as Pending which is not correct.

Also I tried changing the for each to
foreach sourcetype. will it work?  sourcetype names are xml  and raw_text.

Please help to add TargetLocation date.

0 Karma

yuanliu
SplunkTrust
SplunkTrust
sourcetype names are xml  and raw_text.

That's why I mentioned that you need to adjust as I did not know your sourcetype names.  For this particular purpose, you can simply tweak sourcetype name to represent CopyLocation and TargetLocation - or any other name you want to use in foreach.

| eval sourcetype = if(sourcetype == "raw_text", "CopyLocation", "TargetLocation")

(Without mucking sourcetype value, you can also use "foreach raw_text xml" and get the correct results, then rename the fields.)

Here is a complete emulation

| makeresults
| eval sourcetype = "raw_text", data = mvappend("2024-12-18 17:02:50, file_name=\"XYZ.csv\",  file copy success",
"2024-12-18 17:02:58, file_name=\"ABC.zip\", file copy success",
"2024-12-18 17:03:38, file_name=\"123.docx\", file copy success",
"2024-12-18 18:06:19, file_name=\"143.docx\", file copy success")
| mvexpand data
| eval _time = strptime(replace(data, ",.+", ""), "%F %T")
| rename data AS _raw
| extract
| append
    [makeresults
    | eval sourcetype = xml, _raw = "2024-12-18 17:30:10 <FileTransfer status=\"success\">

                        <FileName>XYZ.csv</FileName>
                        <FileName>ABC.zip</FileName>

                        <FileName>123.docx</FileName> 

                        </FileTransfer>"
    | eval _time = strptime(replace(_raw, "<.+", ""), "%F %T")]
``` the above emulates
sourcetype IN (CopLocation, TargetLocation)
```
| eval sourcetype = if(sourcetype == "raw_text", "CopyLocation", "TargetLocation")
| eval target_log = replace(_raw, "^[^<]+", "")
| spath input=target_log
| mvexpand FileTransfer.FileName
| eval FileName = coalesce(file_name, 'FileTransfer.FileName')

| chart values(_time) over FileName by sourcetype
| sort CopyLocation
| foreach *Location
    [eval <<FIELD>> = strftime(<<FIELD>>, "%F %T")]
| fillnull TargetLocation value=Pending

 

0 Karma
Career Survey
First 500 qualified respondents will receive a $20 gift card! Tell us about your professional Splunk journey.

Can’t make it to .conf25? Join us online!

Get Updates on the Splunk Community!

Calling All Security Pros: Ready to Race Through Boston?

Hey Splunkers, .conf25 is heading to Boston and we’re kicking things off with something bold, competitive, and ...

Beyond Detection: How Splunk and Cisco Integrated Security Platforms Transform ...

Financial services organizations face an impossible equation: maintain 99.9% uptime for mission-critical ...

Customer success is front and center at .conf25

Hi Splunkers, If you are not able to be at .conf25 in person, you can still learn about all the latest news ...