Splunk Search

How to compare events from two sourcetypes in the same index without using join

t_splunk_d
Path Finder

I am trying to track file transfers from one location to another. 

Flow: Files are copied to File copy location -> Target Location

Both File copy location and Target location logs are in the same index but each has it own sourcetype.

File copy location events has logs for each file but Target location has a logs which has multiple files names.

Log format of filecopy location:

2024-12-18 17:02:50 , file_name="XYZ.csv",  file copy success 

2024-12-18 17:02:58, file_name="ABC.zip", file copy success 

2024-12-18 17:03:38, file_name="123.docx", file copy success

2024-12-18 18:06:19, file_name="143.docx", file copy success

Log format of Target Location:

2024-12-18 17:30:10 <FileTransfer status="success>

                                              <FileName>XYZ.csv</FileName>
                                             <FileName>ABC.zip</FileName>

                                             <FileName>123.docx</FileName> 

                                              </FileTransfer>

Desired result:

      File Name                  FileCopyLocation               Target Location

      XYZ.csv                  2024-12-18 17:02:50          2024-12-18 17:30:10

      ABC.zip                   2024-12-18 17:02:58          2024-12-18 17:30:10

      123.docx                2024-12-18 17:03:38          2024-12-18 17:30:10

       143.docx               2024-12-18 18:06:19            Pending

 

Since events are in the same index and more events I do not  want use join.

Labels (2)
0 Karma

yuanliu
SplunkTrust
SplunkTrust

You are correct to not wanting to use join; in fact, try not use join even if they are in different indices.  Thank you for illustrating data and desired output.  Here is an idea

sourcetype IN (CopLocation, TargetLocation)
| eval target_log = replace(_raw, "^[^<]+", "")
| spath input=target_log
| mvexpand FileTransfer.FileName
| eval FileName = coalesce(file_name, 'FileTransfer.FileName')

| chart values(_time) over FileName by sourcetype
| sort CopyLocation
| foreach *Location
    [eval <<FIELD>> = strftime(<<FIELD>>, "%F %T")]
| fillnull TargetLocation value=Pending

(Obviously I do not know your sourcetype names. So, adjust the above accordingly.)

Here is an emulation to produce the sample data you illustrated

| makeresults
| eval sourcetype = "CopyLocation", data = mvappend("2024-12-18 17:02:50, file_name=\"XYZ.csv\",  file copy success",
"2024-12-18 17:02:58, file_name=\"ABC.zip\", file copy success",
"2024-12-18 17:03:38, file_name=\"123.docx\", file copy success",
"2024-12-18 18:06:19, file_name=\"143.docx\", file copy success")
| mvexpand data
| eval _time = strptime(replace(data, ",.+", ""), "%F %T")
| rename data AS _raw
| extract
| append
    [makeresults
    | eval sourcetype = "TargetLocation", _raw = "2024-12-18 17:30:10 <FileTransfer status=\"success\">

                        <FileName>XYZ.csv</FileName>
                        <FileName>ABC.zip</FileName>

                        <FileName>123.docx</FileName> 

                        </FileTransfer>"
    | eval _time = strptime(replace(_raw, "<.+", ""), "%F %T")]
``` the above emulates
sourcetype IN (CopLocation, TargetLocation)
```

Play with it and compare with real data.

0 Karma

woodcock
Esteemed Legend

Something like this:

| makeresults
| eval host="sender"
| eval raw = mvappend("2024-12-18 17:02:50 , file_name=XYZ.csv, file copy success",
"2024-12-18 17:02:58, file_name=ABC.zip, file copy success",
"2024-12-18 17:03:38, file_name=123.docx, file copy success",
"2024-12-18 18:06:19, file_name=143.docx, file copy success")
| append [
| makeresults
| eval host="receiver"
| eval raw = "2024-12-18 17:30:10 <FileTransfer status=success>
<FileName>XYZ.csv</FileName>
<FileName>ABC.zip</FileName>
<FileName>123.docx</FileName>
</FileTransfer>"
]
| mvexpand raw
| rename raw AS _raw
| rex "^(?<_time>\S+\s+\S+)"
| eval _time = strptime(_time, "%Y-%m-%d %H:%M:%S")
| rex max_match=0 "(?ms)(\<FileName\>|file_name=)(?<FileName>.*?)(\<|,)"
| rex "(<FileTransfer status=\"?|file copy )(?<status>[^\"\>]+)"
| stats list(status) AS status_list list(host) AS host_list dc(host) AS hosts values(status) AS status BY FileName
| where hosts==1 OR status="fail*"
0 Karma

t_splunk_d
Path Finder

TargetLocation always comes up as Pending which is not correct.

Also I tried changing the for each to
foreach sourcetype. will it work?  sourcetype names are xml  and raw_text.

Please help to add TargetLocation date.

0 Karma

yuanliu
SplunkTrust
SplunkTrust
sourcetype names are xml  and raw_text.

That's why I mentioned that you need to adjust as I did not know your sourcetype names.  For this particular purpose, you can simply tweak sourcetype name to represent CopyLocation and TargetLocation - or any other name you want to use in foreach.

| eval sourcetype = if(sourcetype == "raw_text", "CopyLocation", "TargetLocation")

(Without mucking sourcetype value, you can also use "foreach raw_text xml" and get the correct results, then rename the fields.)

Here is a complete emulation

| makeresults
| eval sourcetype = "raw_text", data = mvappend("2024-12-18 17:02:50, file_name=\"XYZ.csv\",  file copy success",
"2024-12-18 17:02:58, file_name=\"ABC.zip\", file copy success",
"2024-12-18 17:03:38, file_name=\"123.docx\", file copy success",
"2024-12-18 18:06:19, file_name=\"143.docx\", file copy success")
| mvexpand data
| eval _time = strptime(replace(data, ",.+", ""), "%F %T")
| rename data AS _raw
| extract
| append
    [makeresults
    | eval sourcetype = xml, _raw = "2024-12-18 17:30:10 <FileTransfer status=\"success\">

                        <FileName>XYZ.csv</FileName>
                        <FileName>ABC.zip</FileName>

                        <FileName>123.docx</FileName> 

                        </FileTransfer>"
    | eval _time = strptime(replace(_raw, "<.+", ""), "%F %T")]
``` the above emulates
sourcetype IN (CopLocation, TargetLocation)
```
| eval sourcetype = if(sourcetype == "raw_text", "CopyLocation", "TargetLocation")
| eval target_log = replace(_raw, "^[^<]+", "")
| spath input=target_log
| mvexpand FileTransfer.FileName
| eval FileName = coalesce(file_name, 'FileTransfer.FileName')

| chart values(_time) over FileName by sourcetype
| sort CopyLocation
| foreach *Location
    [eval <<FIELD>> = strftime(<<FIELD>>, "%F %T")]
| fillnull TargetLocation value=Pending

 

0 Karma
Career Survey
First 500 qualified respondents will receive a $20 gift card! Tell us about your professional Splunk journey.

Can’t make it to .conf25? Join us online!

Get Updates on the Splunk Community!

Can’t Make It to Boston? Stream .conf25 and Learn with Haya Husain

Boston may be buzzing this September with Splunk University and .conf25, but you don’t have to pack a bag to ...

Splunk Lantern’s Guide to The Most Popular .conf25 Sessions

Splunk Lantern is a Splunk customer success center that provides advice from Splunk experts on valuable data ...

Unlock What’s Next: The Splunk Cloud Platform at .conf25

In just a few days, Boston will be buzzing as the Splunk team and thousands of community members come together ...