Splunk Search

Extract values by dynamic field names

alesyo
Engager

Hi Community,

I have the following challenge: I have different events, and for each event I want to generate a summary built from different fields. Which fields to use for each event is defined in a lookup table.

Here is an example:
E1: id=1 , dest_ip=1.1.1.1, src_ip=2.2.2.2,.....
E2: id=2, user=bob,  domain=microsoft
E3: id=3 county=usa, city=seattle
E4: id=4 company=cisco, product=splunk

Lookup table (potentially more field names):

ID   Field1    Field2
1    dest_ip   src_ip
2    user      domain
3    country
4    company   product
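
As a CSV file, the lookup might look something like this (assuming the columns are named id, Field1 and Field2):

id,Field1,Field2
1,dest_ip,src_ip
2,user,domain
3,country,
4,company,product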


Expected output:
id1: Summary dest_ip=1.1.1.1 src_ip=2.2.2.2
id2: Summary user=bob domain=microsoft
id3: Summary country=usa
id4: Summary company=cisco product=splunk

A solution could use the case function, but it doesn't scale well because I would need to add a new line for each case, and the number of cases could potentially grow to 1000.
I tried to solve it with foreach, but I am unable to retrieve the values from the event.

Here's the query I tried:

index=events
| lookup cases.csv id OUTPUT field1, field2
| foreach field*
    [ eval summary = summary + "<<field>>" + ":" <<ITEM>> ]
| table id, summary

Thanks for your help!
Alesyo

1 Solution

yuanliu
SplunkTrust

The key to doing this is to organize fields from events into an easy-to-access format after the lookup.  Traditionally, Splunkers use an mvjoin-split approach, but for a highly variable use case like this that is almost impossible.  You want a structured data representation.  Something like, oh, I know, JSON.

If you use Splunk 8.1 or later, I recommend this:

| tojson output_field=hash
| lookup cases.csv id
| foreach Field1 Field2
    [eval output = mvappend(output, '<<FIELD>>' . "=" . json_extract(hash, '<<FIELD>>'))]
| eval output = "id" . id . " Summary " . mvjoin(output, " ")
| table output hash Field1 Field2

This should work with any number of cases.  To illustrate the point, this comes from your sample data and sample lookup:

output                                      hash                                              Field1    Field2
id1 Summary src_ip=2.2.2.2 dest_ip=1.1.1.1  {"dest_ip":"1.1.1.1","id":1,"src_ip":"2.2.2.2"}   src_ip    dest_ip
id2 Summary user=bob domain=microsoft       {"domain":"microsoft","id":2,"user":"bob"}        user      domain
id3 Summary country=usa                     {"city":"seattle","country":"usa","id":3}         country
id4 Summary company=cisco product=splunk    {"company":"cisco","id":4,"product":"splunk"}     company   product
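
The part that makes this work is the quoting inside the foreach: '<<FIELD>>' (single quotes) is a field reference, so it resolves to the field name stored in the lookup. For the id=1 row, the Field1 iteration expands roughly like this (illustrative only, not an extra line to add):

| eval output = mvappend(output, 'Field1' . "=" . json_extract(hash, 'Field1'))
``` 'Field1' evaluates to "dest_ip", so this appends "dest_ip=" . json_extract(hash, "dest_ip"), i.e. "dest_ip=1.1.1.1" ```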

(Interestingly, if you are pre-8.1, you can replace json_extract with spath - the function, not the command - and the search still works in this case.)
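
For completeness, here is a sketch of that variant - the same search with the spath() eval function swapped in for json_extract (untested here, so treat it as an assumption):

| tojson output_field=hash
| lookup cases.csv id
| foreach Field1 Field2
    [eval output = mvappend(output, '<<FIELD>>' . "=" . spath(hash, '<<FIELD>>'))]
| eval output = "id" . id . " Summary " . mvjoin(output, " ")
| table output hash Field1 Field2
``` spath() here is the eval function, not the spath command ```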

Here is an emulation for you to play with and compare with real data.

| makeresults
| eval data = split("E1: id=1 , dest_ip=1.1.1.1, src_ip=2.2.2.2,.....
E2: id=2, user=bob,  domain=microsoft
E3: id=3 country=usa, city=seattle
E4: id=4 company=cisco, product=splunk", "
")
| mvexpand data
| rename data as _raw
| extract
| fields - _*
``` data emulation above ```

alesyo
Engager

Thanks for your answer,

The events are part of an index and aren't available as JSON; it is a shared notable index.
My idea is to define in a lookup which field names I will extract.

For example:

| eval sum=case(id=1, "dest_ip:".dest_ip.",src_ip:".src_ip,
    id=2, "user:".user.",domain:".domain,
    id=3, "country:".country,
    id=4, "company:".company.",product:".product)
| table id, sum

But this scales very poorly, because the number of conditions could grow up to 1000. I don't think that is manageable in one use case.

Thanks for your help
Best regards
Tino


livehybrid
Super Champion

Hi @alesyo 

I think the JSON in my example shouldn't affect the outcome, as it was purely a way for me to provide a working example. You could use "fields" to list the fields you are interested in before running the foreach command?

index=notable .etc...
| fields id interestingField1 interestingField2 ..etc..
| foreach *
    [| eval summary=mvappend(summary, if(<<FIELD>>!="" and "<<FIELD>>"!="summary" and "<<FIELD>>"!="id", "<<FIELD>>=".<<FIELD>>, null()))]
| eval summary_output="Id:".id." - ".mvjoin(summary," ")
| fields summary_output

Please let me know how you get on and consider adding karma to this or any other answer if it has helped.
Regards

Will


livehybrid
Super Champion

Hi @alesyo 

How about this?


I think you would just need to append this to your existing query:

| foreach *
    [| eval summary=mvappend(summary, if(<<FIELD>>!="" and "<<FIELD>>"!="id", "<<FIELD>>=".<<FIELD>>, null()))]
| eval summary_output="Id:".id." - ".mvjoin(summary," ")
| fields summary_output

However, I've included a full working example below:

| makeresults
| eval data="[{\"id\":1,\"dest_ip\":\"1.1.1.1\",\"src_ip\":\"2.2.2.2\"},{\"id\":2,\"user\":\"bob\",\"domain\":\"microsoft\"},{\"id\":3,\"county\":\"usa\",\"city\":\"seattle\"},{\"id\":4,\"company\":\"cisco\",\"product\":\"splunk\"}]"
| eval rawdata=json_array_to_mv(data)
| mvexpand rawdata
| eval _raw=json_extract(rawdata,"")
| fields - data rawdata
| spath
| stats values(*) AS * by id
| foreach *
    [| eval summary=mvappend(summary, if(<<FIELD>>!="" and "<<FIELD>>"!="id", "<<FIELD>>=".<<FIELD>>, null()))]
| eval summary_output="Id:".id." - ".mvjoin(summary," ")
| fields summary_output

Please let me know how you get on and consider adding karma to this or any other answer if it has helped.
Regards

Will


ITWhisperer
SplunkTrust

| foreach *
    [| eval summary1=if("<<FIELD>>"==Field1,<<FIELD>>,summary1)
    | eval summary2=if("<<FIELD>>"==Field2,<<FIELD>>,summary2)]
| eval summary=Field1."=".summary1.if(isnotnull(Field2)," ".Field2."=".summary2,null())