Splunk Search

Another method to connect two search results other than using the join command?

xnx_1012

Hello, is there a way to connect these two searches other than join? I have read that stats is faster than join. Can stats combine all of these together?

SPL 1

(index=* source=/var/log/secure* AND TERM(sudo) AND " root"  AND (TERM(adduser) OR TERM(chown) OR TERM(userdel) OR TERM(chmod) OR TERM(usermod) OR TERM(useradd))  AND COMMAND!="*egrep*") OR
(index="*" source=/var/log/secure* AND TERM(sshd) AND "Accepted password" AND TERM(from) AND TERM(port) 
    [ 
    search index=* source=/var/log/secure* AND TERM(sudo) AND (TERM(adduser) OR TERM(chown) OR TERM(userdel) OR TERM(chmod) OR TERM(usermod) OR TERM(useradd)) AND COMMAND!="*egrep*" 
    | regex _raw != ".*bin\/grep|.*bin\/man|.*bin\/which" 
    | regex _raw!= ".*user NOT in sudoers.*" 
    | stats latest(_time) as latest earliest(_time) as mod_time 
    | eval earliest= relative_time(mod_time, "-8h@s") 
    | fields earliest latest
    ])
| regex _raw != ".*bin\/grep|.*bin\/man|.*bin\/which|.*bin\/less|.*bin\/more"

| rex field=_raw "(?<=sudo:)\s*(?P<Users>[[:alnum:]]\S*[[:alnum:]])\s*(?=\:).*(?<=COMMAND\=)(?P<command>.*)"
| rex field=_raw "(?<=for)\s*(?P<Users>[[:alnum:]]\S*[[:alnum:]])\s*(?=from).*(?<=from)\s*(?P<ip>[[:digit:]]+\.[[:digit:]]+\.[[:digit:]]+\.[[:digit:]]+)"
| eval "Command/Events" = replace(command,"^(\/bin\/|\/sbin\/)","")
| eval Time = if(match(_raw,"(?<=sudo:)\s*[[:alnum:]]\S*[[:alnum:]]\s*(?=\:).*(?<=COMMAND\=)*") ,strftime(_time, "%Y-%d-%m %H:%M:%S"),null())
| eval Date = strftime(_time, "%Y-%d-%m")
| eval "Report ID" = "ABLR-007"
| eval "Agency HF" = if(isnull(agencyhf),"",agencyhf)
| stats list(Time) as Time list("Command/Events") as "Command/Events" latest(ip) as "IP Address" by Users Date host index "Report ID" "Agency HF"
| eval counter=mvrange(0,mvcount(Time))
| streamstats count as sessions
| stats list(*) as * by sessions counter
| foreach Time "Command/Events"  [ eval <<FIELD>> = mvindex('<<FIELD>>', counter)]
| fields - counter sessions
| rename index as Agency, host as Hostname
| where Users="root" AND isnull('IP Address')
| fields "Report ID" Time Agency Command/Events Hostname Users Date

SPL 1 Result (screenshot omitted)

SPL 2

(index=* source=/var/log/secure* (TERM(su:) OR TERM(sudo:)) AND "opened for user root") OR
(index="*" source=/var/log/secure* AND TERM(sshd) AND "Accepted password" AND TERM(from) AND TERM(port))
| rex field=_raw ".*(?<=from)\s*(?P<ip>[[:digit:]]+\.[[:digit:]]+\.[[:digit:]]+\.[[:digit:]]+)"
| eval Date = strftime(_time, "%Y-%d-%m")
| rename host as Hostname
| stats values(ip) as "IP Address" by Date Hostname
| where isnotnull('IP Address')

SPL 2 Result (screenshot omitted)

SPL 3 = SPL 1 and SPL 2 consolidated with the join command

(index=* source=/var/log/secure* AND TERM(sudo) AND " root"  AND (TERM(adduser) OR TERM(chown) OR TERM(userdel) OR TERM(chmod) OR TERM(usermod) OR TERM(useradd))  AND COMMAND!="*egrep*") OR
(index="*" source=/var/log/secure* AND TERM(sshd) AND "Accepted password" AND TERM(from) AND TERM(port) 
    [ 
    search index=* source=/var/log/secure* AND TERM(sudo) AND (TERM(adduser) OR TERM(chown) OR TERM(userdel) OR TERM(chmod) OR TERM(usermod) OR TERM(useradd)) AND COMMAND!="*egrep*" 
    | regex _raw != ".*bin\/grep|.*bin\/man|.*bin\/which" 
    | regex _raw!= ".*user NOT in sudoers.*" 
    | stats latest(_time) as latest earliest(_time) as mod_time 
    | eval earliest= relative_time(mod_time, "-8h@s") 
    | fields earliest latest
    ])
| regex _raw != ".*bin\/grep|.*bin\/man|.*bin\/which|.*bin\/less|.*bin\/more" 
| rex field=_raw "(?<=sudo:)\s*(?P<Users>[[:alnum:]]\S*[[:alnum:]])\s*(?=\:).*(?<=COMMAND\=)(?P<command>.*)"
| rex field=_raw "(?<=for)\s*(?P<Users>[[:alnum:]]\S*[[:alnum:]])\s*(?=from).*(?<=from)\s*(?P<ip>[[:digit:]]+\.[[:digit:]]+\.[[:digit:]]+\.[[:digit:]]+)"
| eval "Command/Events" = replace(command,"^(\/bin\/|\/sbin\/)","")
| eval Time = if(match(_raw,"(?<=sudo:)\s*[[:alnum:]]\S*[[:alnum:]]\s*(?=\:).*(?<=COMMAND\=)*") ,strftime(_time, "%Y-%d-%m %H:%M:%S"),null())
| eval Date = strftime(_time, "%Y-%d-%m")
| eval "Report ID" = "ABLR-007"
| eval "Agency HF" = if(isnull(agencyhf),"",agencyhf)
| stats list(Time) as Time list("Command/Events") as "Command/Events" latest(ip) as "IP Address" by Users Date host index "Report ID" "Agency HF"
| eval counter=mvrange(0,mvcount(Time))
| streamstats count as sessions
| stats list(*) as * by sessions counter
| foreach Time "Command/Events"  [ eval <<FIELD>> = mvindex('<<FIELD>>', counter)]
| fields - counter sessions
| rename index as Agency, host as Hostname
| where Users="root" AND isnull('IP Address')
| fields "Report ID" Time Agency Command/Events Hostname Users Date
| join left Date Hostname 'IP Address'
[search
(index=* source=/var/log/secure* (TERM(su:) OR TERM(sudo:)) AND "opened for user root") OR
(index="*" source=/var/log/secure* AND TERM(sshd) AND "Accepted password" AND TERM(from) AND TERM(port))
| rex field=_raw ".*(?<=from)\s*(?P<ip>[[:digit:]]+\.[[:digit:]]+\.[[:digit:]]+\.[[:digit:]]+)"
| eval Date = strftime(_time, "%Y-%d-%m")
| rename host as Hostname
| stats values(ip) as "IP Address" by Date Hostname
| where isnotnull('IP Address')
]

skoelpin

Yes, it certainly can. A few things first: I see you're searching index=* and then joining on an additional query that searches the same index=* data. Step back and look at it from a thousand-foot view.

You're querying a large bucket of data via index=* and then filtering the results down. Each search runs against the same set of data, so you're duplicating your effort. Rather than pulling the same data into a single query several times, search it once and skip the duplicate searches. You have two options: use eval to create flags that can be filtered on later, or query the single large bucket of data once, get everything you need, and then filter down.

Rather than this

(index=* source=/var/log/secure* AND TERM(sudo) AND " root"  AND (TERM(adduser) OR TERM(chown) OR TERM(userdel) OR TERM(chmod) OR TERM(usermod) OR TERM(useradd))  AND COMMAND!="*egrep*") OR
 (index="*" source=/var/log/secure* AND TERM(sshd) AND "Accepted password" AND TERM(from) AND TERM(port) [ 
     search index=* source=/var/log/secure* AND TERM(sudo) AND (TERM(adduser) OR TERM(chown) OR TERM(userdel) OR TERM(chmod) OR TERM(usermod) OR TERM(useradd)) AND COMMAND!="*egrep*" 

Use this

(index=* source=/var/log/secure* AND ((TERM(sudo) AND " root"  AND (TERM(adduser) OR TERM(chown) OR TERM(userdel) OR TERM(chmod) OR TERM(usermod) OR TERM(useradd))  AND COMMAND!="*egrep*") OR (TERM(sshd) AND "Accepted password" AND TERM(from) AND TERM(port))))
| regex _raw != ".*bin\/grep|.*bin\/man|.*bin\/which" 
| regex _raw!= ".*user NOT in sudoers.*" 
| stats latest(_time) as latest earliest(_time) as mod_time 
| eval earliest= relative_time(mod_time, "-8h@s") 
| fields earliest latest
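
To make the "flag with eval, then filter later" idea concrete, here is a minimal sketch of how stats could stand in for the join. It is an illustration rather than a drop-in replacement. It assumes that correlating on Date and Hostname (the grouping used in SPL 2) is enough, it uses simplified rex patterns instead of the original ones, and it leaves out the 8-hour time-window subsearch and the grep/man exclusions. The is_login flag and the logins field are hypothetical names introduced for this example.

```spl
index=* source=/var/log/secure* ((TERM(sudo) " root" (TERM(adduser) OR TERM(chown) OR TERM(userdel) OR TERM(chmod) OR TERM(usermod) OR TERM(useradd)) COMMAND!="*egrep*") OR (TERM(sshd) "Accepted password" TERM(from) TERM(port)))
| rex field=_raw "COMMAND=(?<command>.*)"
| rex field=_raw "from\s+(?<ip>\d+\.\d+\.\d+\.\d+)"
| eval is_login = if(isnotnull(ip), 1, 0)
| eval Date = strftime(_time, "%Y-%d-%m")
| rename host as Hostname
| stats list(command) as "Command/Events" values(ip) as "IP Address" sum(is_login) as logins by Date Hostname
| where isnotnull('Command/Events')
```

Both event types are read in a single pass over the index. The stats command then groups them by Date and Hostname, so the sudo commands and the login IP addresses for the same host and day land on one row, which is the pairing the join was producing. The final where keeps only rows that contain at least one sudo command.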