Splunk Search

How to join two sourcetypes by hostname field and create a table?

3DGjos
Communicator

Hello,

I need to make a report with 2 different sourcetypes.

For the first sourcetype, let's call it st1, I have the list of people removing certain tags from hostnames in McAfee.

For the second sourcetype, st2, I have the latest tags state for each hostname, and here I've got more hostnames than in the first sourcetype.

What I need to achieve is to make a table with this information together, joined by the hostname field:

The time a tag was removed (st1), who removed it (st1), the host with the removed tag (st1), and remaining tags (st2).

I wrote the search below, but after the join every row shows the remaining tags of the same host. Either it is not joining at all, or it is joining every hostname from st1 with a single hostname from st2.

Here is my search:

 

index=epo sourcetype="mcafee:audit" ("Clear Tag" CmdName="Clear Tag") ("tag1" OR "tag2" OR "tag3" OR "tag4")
| search NOT [| inputlookup mcafee_epo_allowed_users.csv | fields UserName]
| rex field=Message "Cleared\stag\s\'(?<Tag>.+)\'\sfrom\s(?<hosts>.+)\.$"
| eval hosts2 = split(hosts,",")
| mvexpand hosts2
| rename hosts2 as "Destination host/s" CmdName as "Action" UserName as "Source User" Tag as "Cleared tag"
| fields "Action" "Source User" "Cleared tag" "Destination host/s"
| join "Destination host/s" [| search index=epo sourcetype="mcafee:inventory" | dedup NodeName | table NodeName Tags | rename NodeName as "Destination host/s", Tags as "Remaining tags"]
| table _time "Action" "Source User" "Cleared tag" "Destination host/s" "Remaining tags"

 


It returns this:

[Screenshot: 3DGjos_0-1597795720373.png]

The hostname field is called hosts2 in the first sourcetype and NodeName in the second.

If I can avoid using join, even better.

Thanks.


3DGjos
Communicator

I managed to solve it without using join. I used a single base search that brings in both sourcetypes, normalized the hostname field name, and did the "join" with stats, taking the latest Tags per host. I filtered out the unwanted events with where, and kept the st1 timestamp in a parallel field named _time2 so that each row in the table shows the time of the st1 event, which is the one that matters. Here is my code:

index=epo ((sourcetype="mcafee:audit" "Clear Tag" CmdName="Clear Tag" ("tag1" OR "tag2" OR "tag3" OR "tag4")) OR sourcetype="mcafee:inventory")
| rex field=Message "Cleared\stag\s\'(?<Tag>.+)\'\sfrom\s(?<hosts>.+)\.$" 
| eval hosts=if(sourcetype="mcafee:inventory", NodeName, hosts), _time2=if(sourcetype="mcafee:inventory", null(), _time), _time2=(_time2 - 10800)
| rename hosts as "Destination host/s"
| stats values(_time2) as _time latest(Tags) as "Last reported tags" values(CmdName) as Action values(UserName) as "Source User" values(Tag) as "Cleared tag" by "Destination host/s"
| where isnotnull(Action)
| sort 0 - _time 
| rename _time as Date 
| fieldformat "Date"=strftime(Date, "%d-%b-%Y %H:%M:%S")
| table Date "Action" "Source User" "Destination host/s" "Cleared tag" "Last reported tags"
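To show the stats-based "join" in isolation, here is a minimal run-anywhere sketch built on synthetic events from makeresults, so it does not touch any index. Everything in it is made up for illustration: the hosts hostA/hostB/hostC, the user alice, and the helper fields host_key and removed_at are hypothetical names, not part of the real data. It also assumes the audit message can list several comma-separated hosts, which is why it splits and expands them before grouping.

```spl
| makeresults count=1
| eval sourcetype="mcafee:audit", hosts="hostA,hostB", CmdName="Clear Tag", UserName="alice", Tag="tag1"
| append
    [| makeresults count=1
     | eval sourcetype="mcafee:inventory", NodeName="hostA", Tags="tag2, tag3"]
| append
    [| makeresults count=1
     | eval sourcetype="mcafee:inventory", NodeName="hostC", Tags="tag4"]
| eval host_key=if(sourcetype="mcafee:inventory", NodeName, split(hosts, ","))
| mvexpand host_key
| eval host_key=lower(trim(host_key))
| eval removed_at=if(sourcetype="mcafee:audit", _time, null())
| stats max(removed_at) as removed_at latest(Tags) as "Last reported tags" values(CmdName) as Action values(UserName) as "Source User" values(Tag) as "Cleared tag" by host_key
| where isnotnull(Action)
| eval Date=strftime(removed_at, "%d-%b-%Y %H:%M:%S")
| table Date Action "Source User" host_key "Cleared tag" "Last reported tags"
```

The where clause is what turns the grouping into the equivalent of a left join from st1: a host that only appears in the inventory (hostC here) has no Action and is dropped, while a host with a removal but no inventory record (hostB here) is kept with an empty tags column. The lower(trim(...)) step is a precaution in case the two sourcetypes differ in case or stray spaces around the hostname.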

 


thambisetty
Super Champion

The join command works well when you apply it to two small datasets, or to two searches that produce results from the stats/timechart/chart commands.

I see you are applying it without such commands, so both searches may return a large number of events. A large number of results in the first search is perfectly fine. The problem is the second search, also called the subsearch: join only uses the first 50k results of the subsearch, so even if the search runs without errors, you will get partial results.

For example, suppose search 1 returns the fields

a,b,c,d

and search 2 returns the fields

d,e,f

To get a,b,c,d,e,f using the common field d, the SPL would be:

search 1
| join type=left max=0 d
[search 2]

The above search keeps all results from search 1, and you get e and f only for matching records.
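As a rough way to see the left-join behaviour without real data, the following run-anywhere sketch generates both sides with makeresults. The field values (host1, a1, e1 and so on) and the counter n are hypothetical and exist only for this illustration.

```spl
| makeresults count=3
| streamstats count as n
| eval d="host".n, a="a".n
| fields - _time n
| join type=left max=0 d
    [| makeresults count=2
     | streamstats count as n
     | eval d="host".n, e="e".n
     | fields d e]
| table a d e
```

The outer search produces three values of d and the subsearch only two, so with type=left all three outer rows are kept and the row without a match in the subsearch has no value for e. With the default inner join, that unmatched row would be dropped instead.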


3DGjos
Communicator

Hello, I tried your syntax, but with type=left and max=0 I get 100,000 results, and the columns are not matching.

Here is my code:

index=epo sourcetype="mcafee:audit" ("Clear Tag" CmdName="Clear Tag") ("tag1" OR "tag2" OR "tag3" OR "tag4")
| search NOT [| inputlookup mcafee_epo_allowed_users.csv | fields UserName]
| rex field=Message "Cleared\stag\s\'(?<Tag>.+)\'\sfrom\s(?<hosts>.+)\.$"
| eval hosts2 = split(hosts,",")
| mvexpand hosts2
| rename hosts2 as "Destination host/s" CmdName as "Action" UserName as "Source User" Tag as "Cleared tag"
| fields "Action" "Source User" "Cleared tag" "Destination host/s"
| join type=left max=0 "Destination host/s" [| search index=epo sourcetype="mcafee:inventory" | dedup NodeName | table NodeName Tags | rename NodeName as "Destination host/s", Tags as "Remaining tags"]
| table _time "Action" "Source User" "Cleared tag" "Destination host/s" "Remaining tags"


Thanks.
