Security

JOIN two log files from the same source path

shashankk
Communicator
Please refer to the SPL query below, which I am using to get the UserId count against each server Instance.

 

index=test_uat source=*/DB*/scn0*/tool/log/scan*log "realtime update"
| rex field=source "\/HSDB..\/(?<Instance>.*);\s"
| rex ":\d\d, \d\d\d;\s.*\/(?<vuser>.*);\s"
| rex field=vuser "(?<UserId>[^/]+)$"
| chart count over UserId by Instance
| addtotals labelfield=UserId
| fillnull value=0
| sort - Total

 

This gives the output below:

UserId, scn01, scn02, scn03, Total
MARVEL, 2, 0, 6, 8

 

I have 2 source files on the same path:

source=*/DB*/scn0*/tool/log/scan.log
source=*/DB*/scn0*/tool/log/output.log

which I can combine and match with a single wildcard, as below:

 

source=*/DB*/scn0*/tool/log/*log

 

Sample event from "output.log":
2026-01-21 08:26:45.014 user_id="2143" user_name="MARVEL" first_name="avenger" last_name="superhero" cost_center="DC"

The common field between the 2 logs is UserId = user_name (UserId from the scan logs, user_name from output.log).

Based on this join, I want to show the final output as below:

UserId, first_name, last_name, cost_center, scn01, scn02, scn03, Total
MARVEL, avenger, superhero, DC, 2, 0, 6, 8

Please help with this join and suggest the updated query.

@ITWhisperer

livehybrid
SplunkTrust

Hi @shashankk 

I think there are a couple of ways you could do this, one being:

index=test_uat ((source=*/DB*/scn0*/tool/log/scan*log "realtime update") OR source=*/DB*/scn0*/tool/log/output.log)
| rex field=source "\/HSDB..\/(?<Instance>.*);\s" 
| rex field=_raw ":\d\d, \d\d\d;\s.*\/(?<vuser>.*);\s" 
| rex field=vuser "(?<UserId>[^/]+)$"
| eval UserId=COALESCE(UserId,user_name)
| eventstats values(cost_center) as cost_center values(first_name) as first_name, values(last_name) as last_name by UserId
| stats count by Instance, UserId, first_name, last_name, cost_center | eval {Instance}=count 
| stats values(*) AS * by UserId 
| addtotals labelfield=UserId
| fields - count Instance
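
A quick note on the eval {Instance}=count step, since the syntax is easy to miss: wrapping a field name in curly braces makes eval use that field's value as the name of the new field, so each row gains a column named after its own Instance, and the following stats values(*) AS * by UserId collapses those rows into one per user. A minimal sketch with made-up values:

| makeresults format=csv data="UserId,Instance,count
MARVEL,scn01,2
MARVEL,scn03,6"
| eval {Instance}=count
| stats values(*) as * by UserId
| fields - count Instance

This returns a single MARVEL row with scn01=2 and scn03=6, the same pivot that chart would produce.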

Another way would be to append the output.log results later on, although I usually try to avoid append where possible, as it can hit subsearch limits if too many results are returned:

index=test_uat source=*/DB*/scn0*/tool/log/scan*log "realtime update"
| rex field=source "\/HSDB..\/(?<Instance>.*);\s" 
| rex field=_raw ":\d\d, \d\d\d;\s.*\/(?<vuser>.*);\s" 
| rex field=vuser "(?<UserId>[^/]+)$" 
| chart count over UserId by Instance 
| addtotals labelfield=UserId 
| fillnull value=0 
| sort - Total 
| fields - count Instance 
| append 
    [ search index=test_uat source=*/DB*/scn0*/tool/log/output.log 
    | eval UserId=user_name 
    | table UserId first_name last_name cost_center] 
| stats values(*) as * by UserId
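
The closing stats values(*) as * by UserId is what stitches the two result sets together: the chart rows carry the per-Instance counts and Total, the appended rows carry the name fields, and values(*) merges whichever columns each row has onto a single row per UserId. A tiny illustration with invented rows:

| makeresults format=csv data="UserId,scn01,Total
MARVEL,2,8"
| append
    [| makeresults format=csv data="UserId,first_name
MARVEL,avenger"]
| stats values(*) as * by UserId

Also bear in mind that append runs as a subsearch, so it is subject to the subsearch limits on runtime and result count (tunable in limits.conf), which is the limitation I mentioned above.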

 

Here is an example of the second search using makeresults data to see it in action:

| makeresults format=csv data="_time,source,_raw
2026-01-21 12:00:01,\"/HSDB01/SCN01; \",\"123; /HSDB01/SCN01; realtime update:01, 111; some/other/path/MARVEL; x\"
2026-01-21 12:00:02,\"/HSDB01/SCN01; \",\"234; /HSDB01/SCN01; realtime update:02, 222; some/other/path/MARVEL; x\"
2026-01-21 12:00:03,\"/HSDB01/SCN03; \",\"345; /HSDB03/SCN03; realtime update:03, 333; some/other/path/MARVEL; x\"
2026-01-21 12:00:04,\"/HSDB01/SCN03; \",\"456; /HSDB03/SCN03; realtime update:04, 444; some/other/path/MARVEL; x\"
2026-01-21 12:00:05,\"/HSDB01/SCN03; \",\"567; /HSDB03/SCN03; realtime update:05, 555; some/other/path/MARVEL; x\"
2026-01-21 12:00:06,\"/HSDB01/SCN03; \",\"678; /HSDB03/SCN03; realtime update:06, 666; some/other/path/MARVEL; x\"
2026-01-21 12:00:07,\"/HSDB01/SCN03; \",\"789; /HSDB03/SCN03; realtime update:07, 777; some/other/path/MARVEL; x\"
2026-01-21 12:00:08,\"/HSDB01/SCN03; \",\"890; /HSDB03/SCN03; realtime update:08, 888; some/other/path/MARVEL; x\""
    ```index=test_uat source=*/DB*/scn0*/tool/log/scan*log "realtime update"```
| rex field=source "\/HSDB..\/(?<Instance>.*);\s" 
| rex field=_raw ":\d\d, \d\d\d;\s.*\/(?<vuser>.*);\s" 
| rex field=vuser "(?<UserId>[^/]+)$" 
| chart count over UserId by Instance 
| addtotals labelfield=UserId 
| fillnull value=0 
| sort - Total 
| fields - count Instance 
| append 
    [| makeresults 
    | eval _raw="2026-01-21 08:26:45.014 user_id=\"2143\" user_name=\"MARVEL\" first_name=\"avenger\" last_name=\"superhero\" cost_center=\"DC\"" 
    | extract 
    | eval UserId=user_name 
    | table UserId first_name last_name cost_center] 
| stats values(*) as * by UserId


 

🌟 Did this answer help you? If so, please consider:

  • Adding karma to show it was useful
  • Marking it as the solution if it resolved your issue
  • Commenting if you need any clarification

Your feedback encourages the volunteers in this community to continue contributing


bowesmana
SplunkTrust
SplunkTrust

The way to get that final output is to create a composite field from UserId, first_name, last_name and cost_center, and then use xyseries to turn the table around, having aggregated with stats rather than chart.

``` ... both data sets feed into this ```
``` Gets the user info from output.log data ```
| rex "user_name=\"(?<UserId>[^\"]*)\".*first_name=\"(?<first_name>[^\"]*)\".*last_name=\"(?<last_name>[^\"]*)\".*cost_center=\"(?<cost_centre>[^\"]*)\""

``` Only count the scan.log events ```
| eval is_counted=if(match(source, "scan.log$"), 1, 0)
``` Spread the user info from the output.log events onto the scan.log events by UserId ```
| eventstats values(first_name) as first_name values(last_name) as last_name values(cost_center) as cost_center by UserId
``` Now use stats to collect all values of these fields; the output.log events drop out here, as they have no Instance ```
| fields UserId first_name last_name cost_center Instance is_counted
| stats values(*) as * sum(is_counted) as count by UserId Instance

``` Create the composite key ```
| eval identity=UserId."##".first_name."##".last_name."##".cost_center
``` Turn the table around like chart ```
| xyseries identity Instance count 
``` And now break out that composite field again ```
| rex field=identity "(?<UserId>.*)##(?<first_name>.*)##(?<last_name>.*)##(?<cost_center>.*)"
| fields - identity
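
If you want to see the composite-key round trip in isolation, here is a small runnable sketch, with invented values standing in for the rows as they would look after the stats step above:

| makeresults format=csv data="UserId,first_name,last_name,cost_center,Instance,count
MARVEL,avenger,superhero,DC,scn01,2
MARVEL,avenger,superhero,DC,scn03,6"
| eval identity=UserId."##".first_name."##".last_name."##".cost_center
| xyseries identity Instance count
| rex field=identity "(?<UserId>.*)##(?<first_name>.*)##(?<last_name>.*)##(?<cost_center>.*)"
| fields - identity
| fillnull value=0
| addtotals
| table UserId first_name last_name cost_center scn* Total

The "##" separator is arbitrary; it just needs to be a string that can never appear inside the joined field values.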

Hope this helps.
