JOIN two log files from the same source path

shashankk
Communicator
Below is the SPL query I am using to count events per UserId for each server Instance.

 

index=test_uat source=*/DB*/scn0*/tool/log/scan*log "realtime update"
| rex field=source "\/HSDB..\/(?<Instance>.*);\s"
| rex ":\d\d, \d\d\d;\s.*\/(?<vuser>.*);\s"
| rex field=vuser "(?<UserId>[^/]+)$"
| chart count over UserId by Instance
| addtotals labelfield=UserId
| fillnull value=0
| sort - Total

 

This gives output as below:

UserId, scn01, scn02, scn03, Total
MARVEL, 2, 0, 6, 8

 

I have two source files on the same path:

source=*/DB*/scn0*/tool/log/scan.log
source=*/DB*/scn0*/tool/log/output.log

which I can combine and use as below:

 

source=*/DB*/scn0*/tool/log/*log

 

Sample event from "output.log":
2026-01-21 08:26:45.014 user_id="2143" user_name="MARVEL" first_name="avenger" last_name="superhero" cost_center="DC"

The common field between the two logs is the user name: UserId (from scan.log) equals user_name (from output.log).

Based on this join, I want to show the final output as below:

UserId, first_name, last_name, cost_center, scn01, scn02, scn03, Total
MARVEL, avenger, superhero, DC, 2, 0, 6, 8

Please help with this join and suggest the updated query.

@ITWhisperer

livehybrid
SplunkTrust

Hi @shashankk 

I think there are a couple of ways you could do this, one being:

index=test_uat (source=*/DB*/scn0*/tool/log/scan*log "realtime update") OR source=*/DB*/scn0*/tool/log/output.log
| rex field=source "\/HSDB..\/(?<Instance>.*);\s" 
| rex field=_raw ":\d\d, \d\d\d;\s.*\/(?<vuser>.*);\s" 
| rex field=vuser "(?<UserId>[^/]+)$"
| eval UserId=COALESCE(UserId,user_name)
| eventstats values(cost_center) as cost_center values(first_name) as first_name, values(last_name) as last_name by UserId
| stats count by Instance, UserId, first_name, last_name, cost_center
| eval {Instance}=count
| stats values(*) AS * by UserId
| fields - count Instance
| addtotals labelfield=UserId
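
For anyone who wants to try the first search without the real logs, here is a minimal makeresults sketch of the same eventstats and {Instance} pivot. The row counts and the streamstats counter n are made up for illustration (two scan events on scn01, six on scn03, and one output.log-style row). The helper fields are dropped before addtotals so that count is not added into Total:

```spl
| makeresults count=9
| streamstats count as n
| eval Instance=case(n<=2, "scn01", n<=8, "scn03")
| eval UserId=if(n<=8, "MARVEL", null())
| eval user_name=if(n=9, "MARVEL", null())
| eval first_name=if(n=9, "avenger", null())
| eval last_name=if(n=9, "superhero", null())
| eval cost_center=if(n=9, "DC", null())
| eval UserId=coalesce(UserId, user_name)
| eventstats values(cost_center) as cost_center values(first_name) as first_name values(last_name) as last_name by UserId
| stats count by Instance, UserId, first_name, last_name, cost_center
| eval {Instance}=count
| stats values(*) as * by UserId
| fields - count Instance
| addtotals
```

This should return a single MARVEL row with scn01=2, scn03=6 and Total=8. There is no scn02 column because the sample has no scn02 events; on real data a fillnull value=0 fills the gaps for users that are missing from an instance.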

Another way is to append the output.log data later on. I usually try to avoid append where possible, though, because the subsearch can run into limits if too many results are returned:

index=test_uat source=*/DB*/scn0*/tool/log/scan*log "realtime update"
| rex field=source "\/HSDB..\/(?<Instance>.*);\s" 
| rex field=_raw ":\d\d, \d\d\d;\s.*\/(?<vuser>.*);\s" 
| rex field=vuser "(?<UserId>[^/]+)$" 
| chart count over UserId by Instance 
| addtotals labelfield=UserId 
| fillnull value=0 
| append 
    [ search index=test_uat source=*/DB*/scn0*/tool/log/output.log 
    | eval UserId=user_name 
    | table UserId first_name last_name cost_center] 
| stats values(*) as * by UserId
| sort - Total

 

Here is an example of the second search using makeresults data to see it in action:

| makeresults format=csv data="_time,source, _raw
2026-01-21 12:00:01,\"/HSDB01/SCN01; \", \"123; /HSDB01/SCN01; realtime update:01, 111; some/other/path/MARVEL; x\"
        2026-01-21 12:00:02,\"/HSDB01/SCN01; \", \"234; /HSDB01/SCN01; realtime update:02, 222; some/other/path/MARVEL; x\"
        2026-01-21 12:00:03,\"/HSDB01/SCN03; \", \"345; /HSDB03/SCN03; realtime update:03, 333; some/other/path/MARVEL; x\"
        2026-01-21 12:00:04,\"/HSDB01/SCN03; \", \"456; /HSDB03/SCN03; realtime update:04, 444; some/other/path/MARVEL; x\"
        2026-01-21 12:00:05,\"/HSDB01/SCN03; \", \"567; /HSDB03/SCN03; realtime update:05, 555; some/other/path/MARVEL; x\"
        2026-01-21 12:00:06,\"/HSDB01/SCN03; \", \"678; /HSDB03/SCN03; realtime update:06, 666; some/other/path/MARVEL; x\"
        2026-01-21 12:00:07,\"/HSDB01/SCN03; \", \"789; /HSDB03/SCN03; realtime update:07, 777; some/other/path/MARVEL; x\"
        2026-01-21 12:00:08,\"/HSDB01/SCN03; \", \"890; /HSDB03/SCN03; realtime update:08, 888; some/other/path/MARVEL; x\""
    ```index=test_uat source=*/DB*/scn0*/tool/log/scan*log "realtime update"```
| rex field=source "\/HSDB..\/(?<Instance>.*);\s" 
| rex field=_raw ":\d\d, \d\d\d;\s.*\/(?<vuser>.*);\s" 
| rex field=vuser "(?<UserId>[^/]+)$" 
| chart count over UserId by Instance 
| addtotals labelfield=UserId 
| fillnull value=0 
| append 
    [| makeresults 
    | eval _raw="2026-01-21 08:26:45.014 user_id=\"2143\" user_name=\"MARVEL\" first_name=\"avenger\" last_name=\"superhero\" cost_center=\"DC\"" 
    | extract 
    | eval UserId=user_name 
    | table UserId first_name last_name cost_center] 
| stats values(*) as * by UserId
| sort - Total


bowesmana
SplunkTrust

One way to get that final output is to aggregate with stats rather than chart, build a composite field from UserId, first_name, last_name and cost_center, and then use xyseries to pivot the table on that composite field.

``` ... both data sets feed into this ```
``` Gets the user info from output.log data ```
| rex "user_name=\"(?<UserId>[^\"]*)\".*first_name=\"(?<first_name>[^\"]*)\".*last_name=\"(?<last_name>[^\"]*)\".*cost_center=\"(?<cost_center>[^\"]*)\""

``` Only count the scan.log events ```
| eval is_counted=if(match(source, "scan.log$"), 1, 0)
``` Now use stats to collect all values of these fields ```
| fields UserId first_name last_name cost_center Instance is_counted
| stats values(*) as * sum(is_counted) as count by UserId Instance

``` Create the composite key ```
| eval identity=UserId."##".first_name."##".last_name."##".cost_center
``` Turn the table around like chart ```
| xyseries identity Instance count 
``` And now break out that composite field again ```
| rex field=identity "(?<UserId>.*)##(?<first_name>.*)##(?<last_name>.*)##(?<cost_center>.*)"
| fields - identity

Hope this helps.
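
To see the composite key and xyseries steps in isolation, here is a small makeresults sketch. It assumes the user details are already present on every counted row; the sample rows and the counter n are invented for illustration. The trailing fillnull, addtotals and table are additions that are not in the search above; they are only there to produce the Total column and the column order from the question:

```spl
| makeresults count=8
| streamstats count as n
| eval Instance=if(n<=2, "scn01", "scn03")
| eval UserId="MARVEL", first_name="avenger", last_name="superhero", cost_center="DC"
| stats count by UserId first_name last_name cost_center Instance
| eval identity=UserId."##".first_name."##".last_name."##".cost_center
| xyseries identity Instance count
| rex field=identity "(?<UserId>.*)##(?<first_name>.*)##(?<last_name>.*)##(?<cost_center>.*)"
| fields - identity
| fillnull value=0
| addtotals
| table UserId first_name last_name cost_center scn* Total
```

The composite key is needed because xyseries takes only one row field. Note that any field in the key that is null makes the whole identity null, so on real data it may be worth a fillnull on the name fields before the eval.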
