Dashboards & Visualizations

Can a search query result that provides more than 1 field be outputted to a file with a command like outputlookup?

SecDesh
Path Finder

Good Afternoon,

TL;DR:
Can a search result with more than one field be written to a file with a command like outputlookup, and can its fields then be compared against later? If so, how?
What is the best way to build a dashboard that identifies new domains from DNS queries, whether by using the .csv file or another approach?

I am attempting to make a dashboard that will display newly-observed/newly-registered domains. From what I believe to be the most efficient method (and please feel free to correct me or provide an alternate solution), I need to make a search query that will establish a baseline and output it to a .csv file. Then create a second query that will actually create the dashboard that compares new results to that .csv file. Here's what I have so far:

Step 1 - Create the .csv file

index=nsm tag=dns query=* message_type=QUERY src_ip="10.20.30.*"
| dedup query
| stats earliest(_time) as FirstAppearance count by src_ip
| fieldformat FirstAppearance=strftime(FirstAppearance, "%x %X")

This current query produces this output:

src_ip        FirstAppearance   count
10.20.30.40   01/01/2001        782

What I want it to produce for the .csv file is the src_ip and the associated queries that go along with it. Example:

src_ip        query
10.20.30.40   www<.>google<.>com
              www<.>youtube<.>com
10.20.30.41   www<.>news<.>com


Step 2 - Create the dashboard that will compare new results searched to the .csv file
Once the dashboard is created, I know I'll have to include the command inputlookup which will look into the .csv created. My question is how do I make that comparison, and how do I create the query in a way that will display accurately in the Dashboard?

Any information would be greatly appreciated.



BahadirS
Path Finder

Hello @SecDesh ,

If you have Enterprise Security there is a solution for this. It is called Search Driven Lookup. It automatically updates values for defined columns. 

SecDesh
Path Finder

I am new to Splunk so I apologize. I do have Enterprise Security, and looking through documentation, I believe I created an appropriate Search-Driven Lookup. However, I do not know how to utilize it properly for the Dashboard that I am creating. How could it be applied?

PickleRick
SplunkTrust

Again - the question is what do you mean by comparison.

Because right now you're getting some stats (earliest, latest, list of queried domains) calculated per source IP.

What do you mean by "new queries"? Do you want to list only domains that haven't previously been queried by the same src_ip? Or do you want to list only queries that have never been made before by any IP?

Anyway, manipulating multivalued fields is not very convenient, so I'd advise running mvexpand Query to get a separate row for each Query value.

Then, if you have intermediate results stored in a lookup, you could do a lookup by Query and src_ip, extract any value into a new field, and then keep only those rows that don't have this lookup-generated field set.

But the question is do you want to store that result back into that lookup.
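
As a hedged sketch of that idea: the following assumes the lookup is a file called file.csv with columns Host, Query and earliest (names borrowed from elsewhere in this thread, so treat them as assumptions), and it uses the base search from the question. The output field name seen_before is made up for illustration.

```spl
index=nsm tag=dns query=* message_type=QUERY src_ip="10.20.30.*"
| stats earliest(_time) as earliest count by src_ip, query
| rename src_ip as Host, query as Query
| lookup file.csv Host Query OUTPUT earliest AS seen_before
| where isnull(seen_before)
```

Rows whose Host/Query pair exists in the lookup get seen_before filled in, so the where clause keeps only the pairs the lookup has never seen. Grouping by both src_ip and query already yields one row per pair, so no mvexpand is needed in this form.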

SecDesh
Path Finder

By new queries, I mean I want to list only domains that haven't previously been queried by the same src_ip. I also want to see the number of times each one was queried. I include the _time stats so that it (hopefully) gives me an easy reference point when I check whether or not a domain was contacted before.

Thank you for the mvexpand suggestion. It helped.

For the final question you asked along with the information you provided, yes.

Once I've identified the new queries that have been reached by the individual src_ip, I would want to do an inputlookup append to the lookup file.  If everything is successful, I will modify the search query to do a tstats search instead for optimization purposes. The problem I'm currently having is trying to do a lookup and extracting the needed fields. For example, earlier I mentioned how I believed the "best" way to do it (the only way I've done it before) is to go by _time. So I currently have a query like this:

Base Search
| stats earliest(_time) as "earliest" count by src_ip,query
| rename src_ip as "Host", query as "Query"
| mvexpand Query 
| inputlookup append=t file.csv
| stats min(earliest) by Host,Query
| outputlookup file.csv
| eval isOutlier=if(earliest >= relative_time(now(), "-1d@d"), 1, 0)
| convert ctime(earliest)
| where isOutlier=1

Output:
No Results

The goal of this query is to show me the outliers, or "new" queries, for each IP. However, there are 2-3 flaws here.
1. The line | stats min(earliest) by Host,Query is what's causing the No Results.
2. The "Outliers" are still showing me previously queried domains, though definitely not as many.
3. _time, as I've said, isn't needed for the final results. I really only need the src_ip, the new query, and the count of the src_ip hitting the new query. However, based on a previous dashboard I created and on my research, this is all I have to go on when referencing fields from a lookup. I'm attempting to modify the search now.
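
A possible explanation for the empty result, not confirmed in the thread: stats min(earliest) without an AS clause names its output field min(earliest), so the earliest field that the later eval tests no longer exists and isOutlier never becomes 1. The sketch below keeps the query above but renames the field back. It substitutes the base search from the original question and assumes file.csv already exists with Host, Query and earliest columns.

```spl
index=nsm tag=dns query=* message_type=QUERY src_ip="10.20.30.*"
| stats earliest(_time) as earliest count by src_ip, query
| rename src_ip as Host, query as Query
| inputlookup append=true file.csv
| stats min(earliest) as earliest by Host, Query
| outputlookup file.csv
| eval isOutlier=if(earliest >= relative_time(now(), "-1d@d"), 1, 0)
| where isOutlier=1
| convert ctime(earliest)
```

Note that the second stats drops the count field, so this variant only tracks first-seen times.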

PickleRick
SplunkTrust

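Inputlookup doesn't work that way. Inputlookup is a command to list the contents of a lookup table, and it must be the first command in the pipeline unless you use append=true, in which case it only appends the lookup's rows to your current results. Either way, it doesn't match lookup rows against your events.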

You need a normal lookup command to enrich your data.

But the problem, which is part technical and part "organizational", is that if you want to list only results since the last check and store them in the lookup for later use, you're modifying the lookup you use as the reference point. Depending on whether you use append=true or append=false in outputlookup, you might just add new results to the lookup file, but in any case each subsequent run of your search would produce different results! (There are also some issues with lookup caching, mind you. That's a bit out of scope for this question, but it's good to know that there might be some problems there.)

So maybe a good solution would be to create a periodic "snapshot" of your queries as a reference point (let's say every half an hour) and make your search calculate only results compared to that reference point?
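
A rough sketch of what such a snapshot setup could look like, assuming a scheduled search maintains the lookup and the dashboard only reads it. The lookup name dns_baseline.csv, the 30-day baseline window, the "today" dashboard window and the field name baseline_latest are all assumptions made for illustration. The scheduled search rebuilds the reference point over a fixed window:

```spl
index=nsm tag=dns query=* message_type=QUERY src_ip="10.20.30.*" earliest=-30d@d latest=@d
| stats earliest(_time) as earliest latest(_time) as latest count by src_ip, query
| outputlookup dns_baseline.csv
```

The dashboard search then lists only the src_ip/query pairs from today that the snapshot does not contain:

```spl
index=nsm tag=dns query=* message_type=QUERY src_ip="10.20.30.*" earliest=@d
| stats earliest(_time) as first_seen count by src_ip, query
| lookup dns_baseline.csv src_ip query OUTPUT latest AS baseline_latest
| where isnull(baseline_latest)
```

Because the dashboard never writes to the lookup, reloading it over the same time range should return the same rows until the scheduled search next refreshes the snapshot.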

SecDesh
Path Finder

How would that look?

And wouldn't that simply recycle the queries? If User A queries google at 10:00 AM, we check at 10:30 AM for a comparison for "new" queries where the query for google isn't displayed. But when we check once again at 11:00 AM, the query for google is displayed and presented as "new" because it was compared to the last 30 mins.

PickleRick
SplunkTrust

To be fully honest, I didn't want to provide the lookup command because I rarely use it and don't want to mislead you with possibly bad syntax 🙂 But let's see...

Something like this:

Base Search
| stats earliest(_time) as "earliest" count by src_ip,query
| rename src_ip as "Host", query as "Query"
| mvexpand Query
| lookup Host Query OUTPUT latest AS lookuplatest

This would give you your base search processed and enriched with an additional column, lookuplatest, containing the value of the latest column from the lookup. Now you can, for example, filter the results to include only those whose latest time is greater than lookuplatest. Or you can choose to list only those that don't have lookuplatest set at all, which would mean that they were not included in the previous run's results.
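
For illustration, the two filters combined might look like the sketch below. It assumes the lookup is file.csv with a latest column holding epoch times (both are assumptions), it uses the base search from the question, and it adds latest(_time) to the stats so that there is a value to compare against.

```spl
index=nsm tag=dns query=* message_type=QUERY src_ip="10.20.30.*"
| stats earliest(_time) as earliest latest(_time) as latest count by src_ip, query
| rename src_ip as Host, query as Query
| lookup file.csv Host Query OUTPUT latest AS lookuplatest
| where isnull(lookuplatest) OR latest > tonumber(lookuplatest)
```

Dropping the second half of the where clause leaves only the pairs that were absent from the lookup altogether.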

As for the overwriting data part: as I said, it all depends on what exactly you want to achieve.

If you add the outputlookup to your ad-hoc search, with every run of your search you're modifying the "stored snapshot". Therefore you won't get consistent results across any subsequent runs of the search. You might be OK with it - I don't know your use case. I'm just pointing out that that's how it works.

Every problem defined as "since last run" behaves this way. It's often better to define a problem with strict (or at least relatively strictly defined) time boundaries, like "during the last 15 minutes" or "since last Monday".
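
To make the "modifying the stored snapshot" point concrete, here is a sketch of an ad-hoc search that appends whatever it finds to the lookup it has just compared against. The lookup name file.csv and its Host, Query, earliest, latest and count columns are assumptions for illustration.

```spl
index=nsm tag=dns query=* message_type=QUERY src_ip="10.20.30.*"
| stats earliest(_time) as earliest latest(_time) as latest count by src_ip, query
| rename src_ip as Host, query as Query
| lookup file.csv Host Query OUTPUT latest AS lookuplatest
| where isnull(lookuplatest)
| fields Host Query earliest latest count
| outputlookup append=true file.csv
```

Run this twice over the same time range and the second run should come back empty, because the first run already added every new pair to file.csv.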

 

SecDesh
Path Finder

The syntax currently provided does give errors, stating the lookup could not be constructed based on given variables. I'll see what I can do, but I appreciate the wealth of knowledge and will accept this as the solution.

PickleRick
SplunkTrust

Of course. Forgot to give the lookup name in the lookup command.

It should have said

| lookup file.csv Host Query OUTPUT latest AS lookuplatest

Or whatever your lookup is called.

PickleRick
SplunkTrust

Firstly, no need to dedup if you're stats counting in the next step.

| stats earliest(_time) as FirstAppearance dc(query) as count by src_ip

But if you want the list of queries per src_ip you need

| stats values(query) by src_ip

Such a multivalued form might be a bit inconvenient to use later on, but we'll get to that.

Question is what do you mean by "compare"? What do you want to achieve by this comparison?
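
If the multivalued form turns out to be awkward, one alternative is to group by both fields so that every src_ip/query pair gets its own row, and then write all of the fields out with outputlookup. A sketch, assuming a lookup file name of file.csv (any name would do):

```spl
index=nsm tag=dns query=* message_type=QUERY src_ip="10.20.30.*"
| stats earliest(_time) as earliest latest(_time) as latest count by src_ip, query
| outputlookup file.csv
```

outputlookup writes every field present in the results, so file.csv ends up with src_ip, query, earliest, latest and count columns that a later lookup command can match on.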

SecDesh
Path Finder

The final result I'm looking for is to compare DNS queries from the search query to the .csv, and display the results of only the new queries.

Right now, the issue I'm having is linking the times to the individual query. The current result I have (modified thanks to you) is this:

index=nsm tag=dns query=* message_type=QUERY src_ip="10.20.30.*" NOT `MACRO`
| stats earliest(_time) as earliest latest(_time) as latest values(query) as Query by src_ip

This outputs the following results:

src_ip        earliest     latest       Query
10.20.30.40   1645963490   1646325123   www<.>google<.>com
                                        www<.>youtube<.>com
10.20.30.50   1645963495   1646325130   www<.>google<.>com


I also don't know whether running a comparison against the .csv with the results displayed like this will actually work. Does each individual query need its own field?
