Hello everyone, I have the query below, which uses join. I have seen lots of examples of how to replace join with stats, but I am not able to get it working. I need to join on _time and another field called snat. The output should show at least client_ip and Account_Name.
Thanks
index=_ad (EventCode=4625 OR (EventCode=4771 Failure_Code=0x18)) Account_Name=JohnDoe Source_Network_Address IN (10.10.10.10 20.20.20.20)
| bucket span=1m _time
| eval Source_Network_Address1 = case(EventCode==4771, trim(Client_Address, "::ffff:"))
| eval SourceIP = Source_Network_Address
| eval Account_Name4625= case(EventCode=4625,mvindex(Account_Name,1))
| eval Account_Name4771= case(EventCode=4771,Account_Name)
| eval Account_Name = coalesce(Account_Name4771, Account_Name4625)
| eval Source_Network_Address_Port = SourceIP+":"+Source_Port
| rex field=ComputerName "(?<DCName>^([^.]+))"
| rename Source_Network_Address_Port as snat
| stats count by _time snat Account_Name EventCode DCName
| join type=inner _time snat
[search index=_network snat IN (10.10.10.10*,20.20.20.20*)
| bucket span=1m _time
| rex field=client "^(?<client_ip>.*?)\:(?<client_port>.*)"
| stats count by _time snat client_ip]
You can do this with eventstats. The exact method can depend on data characteristics and desired output. The following assumes that the index _ad search returns fewer results than the index _network search, that every snat has at least one matching client_ip, and that you want to tabulate all combinations with client_ip.
(index=_ad (EventCode=4625 OR (EventCode=4771 Failure_Code=0x18)) Account_Name=JohnDoe Source_Network_Address IN (10.10.10.10 20.20.20.20))
OR (index=_network snat IN (10.10.10.10*,20.20.20.20*)) ``` get relevant data ```
| bucket span=1m _time ``` common time buckets ```
| eval Source_Network_Address1 = case(EventCode==4771, trim(Client_Address, "::ffff:"))
| eval SourceIP = Source_Network_Address
| eval Account_Name4625= case(EventCode=4625,mvindex(Account_Name,1))
| eval Account_Name4771= case(EventCode=4771,Account_Name)
| eval Account_Name = coalesce(Account_Name4771, Account_Name4625)
| eval Source_Network_Address_Port = SourceIP+":"+Source_Port
| rex field=ComputerName "(?<DCName>^([^.]+))"
| rename Source_Network_Address_Port as snat
``` the above applies to index _ad ```
| rex field=client "^(?<client_ip>.*?)\:(?<client_port>.*)" ``` this applies to index _network ```
| eventstats values(client_ip) as client_ip by _time snat ``` assuming index _ad search returns fewer events ```
| stats count by _time snat Account_Name EventCode DCName client_ip
If client_ip could be missing for some snat and you can accept a multivalue client_ip, change the last stats to
| stats count values(client_ip) as client_ip by _time snat Account_Name EventCode DCName
If the event counts are the opposite (index _network returns fewer results), use eventstats on the other dataset.
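For example (just a sketch): keep everything in the search above down to the rex field=client line, and replace the last two lines with
| eventstats values(Account_Name) as Account_Name values(EventCode) as EventCode values(DCName) as DCName by _time snat ``` if index _network search returns fewer events ```
| stats count by _time snat client_ip Account_Name EventCode DCName
Here count tallies _network events rather than _ad events.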
Hope this helps.
client_ip is not getting returned. I have tried using values() and count by.
I tested whether the rex for client_ip returns values using the test below, and it does.
index=_network snat IN (10.10.10.10*,20.20.20.20*)
| rex field=client "^(?<client_ip>.*?)\:(?<client_port>.*)" ``` this applies to index _network ```
| table client_ip
The above returns just a list of IPs from our clients.
Do you mean client_ip is no longer returned just because index _ad is included in the OR clause? Does the following return client?
(index=_ad (EventCode=4625 OR (EventCode=4771 Failure_Code=0x18)) Account_Name=JohnDoe Source_Network_Address IN (10.10.10.10 20.20.20.20))
OR (index=_network snat IN (10.10.10.10*,20.20.20.20*)) ``` get relevant data ```
| rex field=client "^(?<client_ip>.*?)\:(?<client_port>.*)" ``` this applies to index _network ```
| dedup client client_ip
| table client client_ip
That returns client and client_ip just fine.
It seems like Splunk cares which index is mentioned first?
It was my mistake. I copied | rename Source_Network_Address_Port as snat from your original search for index _ad, but snat needs a coalesce as well.
(index=_ad (EventCode=4625 OR (EventCode=4771 Failure_Code=0x18)) Account_Name=JohnDoe Source_Network_Address IN (10.10.10.10 20.20.20.20))
OR (index=_network snat IN (10.10.10.10*,20.20.20.20*)) ``` get relevant data ```
| bucket span=1m _time ``` common time buckets ```
| eval Source_Network_Address1 = case(EventCode==4771, trim(Client_Address, "::ffff:"))
| eval SourceIP = Source_Network_Address
| eval Account_Name4625= case(EventCode=4625,mvindex(Account_Name,1))
| eval Account_Name4771= case(EventCode=4771,Account_Name)
| eval Account_Name = coalesce(Account_Name4771, Account_Name4625)
| eval Source_Network_Address_Port = SourceIP+":"+Source_Port
| eval snat = coalesce(snat, Source_Network_Address_Port)
| rex field=ComputerName "(?<DCName>^([^.]+))"
``` the above applies to index _ad ```
| rex field=client "^(?<client_ip>.*?)\:(?<client_port>.*)" ``` this applies to index _network ```
| eventstats values(client_ip) as client_ip by _time snat ``` assuming index _ad search returns fewer events ```
| stats count by _time snat Account_Name EventCode DCName client_ip
So that worked! However, it created a new problem: it runs for a very long time.
I was talking to another engineer, and he suggested doing an outputlookup just for the small _ad index,
then using that data to match, just like in this example by @ITWhisperer.
But I am not able to combine them and get the output. I'm thinking it's because I need to match on 2 columns instead of a single one, as below?
search index
| table host
| dedup host
| append
    [ | inputlookup lookupfile
    | table host
    | dedup host ]
| stats count by host
| eval match=if(count=1, "missing", "ok")
Yes, eventstats can be expensive and possibly slower than join. You can try switching the eventstats to work in the other direction, i.e., toward the fields for index _network, instead.
I don't understand why all of a sudden you are handling host instead of snat. I will assume that you still want to match snat, but with host in lookupfile representing the host portion, like 10.10.10.10. Is this correct? You also lost the _time matching (bucketed to 1m). If all you want to match is the host portion of snat, i.e., if the lookup contains no _time info, you can use this very efficient search:
index=_network
[| inputlookup lookupfile ``` assuming this only contains 10.10.10.10, 20.20.20.20 ```
| dedup host ``` you SHOULD produce a lookup file that contains no dups ```
| rename host AS snat
| eval snat = snat . ":*" ``` ideally the lookup file would already use the field name snat and contain the asterisk, so these two steps would not be needed ```]
| bucket span=1m _time
| rex field=client "^(?<client_ip>.*?)\:(?<client_port>.*)"
| stats count by _time snat client_ip
However, if you still want to match _time and put a bucketed _time column in lookupfile, the search will necessarily be more complex. You will have to explain the data (both in the index and in lookupfile), show exactly what the desired output looks like, and describe the exact logic connecting the datasets to that output.
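(For completeness: the append pattern you quoted does extend mechanically to two key columns. A rough sketch, assuming lookupfile was produced from the bucketed _ad search and stores both _time and snat; note that it only flags which _time/snat pairs exist on both sides and does not carry client_ip or Account_Name forward.)
index=_network snat IN (10.10.10.10*,20.20.20.20*)
| bucket span=1m _time
| dedup _time snat
| table _time snat
| append
    [| inputlookup lookupfile
    | dedup _time snat
    | table _time snat ]
| stats count by _time snat
| eval match=if(count == 1, "missing", "ok")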
Matching on both fields is paramount here, including the 1-minute time buckets.
Also, the network query returns roughly 10 million events (give or take) in a 24-hour window, so imagine running this over 10 days.
To make sure that you have defined your use case and technical path clearly, let me highlight several factors that are not clear to me. (Trust me, explaining to someone with less intimate knowledge will help you find the right path; I have been in your position.)
Now, assuming that you still want to pursue the lookup path, here is my reference implementation of the lookup table, before I propose what I see as an efficient matching search.
index=_ad (EventCode=4625 OR (EventCode=4771 Failure_Code=0x18))
| bucket span=1m _time
| eval Account_Name4625= case(EventCode=4625,mvindex(Account_Name,1))
| eval Account_Name4771= case(EventCode=4771,Account_Name)
| eval Account_Name = coalesce(Account_Name4771, Account_Name4625)
| eval snat = Source_Network_Address +":"+Source_Port
| eval DCName=mvindex(split(ComputerName, "."), 0)
| stats count by _time snat Account_Name EventCode DCName
| outputlookup index_ad_lookup
This is effectively your original outer search with restrictions on snat and Account_Name removed. Note my reference table name is index_ad_lookup.
If you can keep the lookup fresh enough to suit your needs, this is how to use it to match index _network and add Account_Name, etc.:
index=_network ```snat IN ($snat_tok$)```
| bucket span=1m _time
| eval client_ip = mvindex(split(client, ":"), 0)
| stats count by _time snat client_ip
| lookup index_ad_lookup snat _time ``` add Account_Name, EventCode, DCName where a match exists ```
The comment is speculation about how you might eliminate events with a dashboard input token. Because your original join does not require the events to have a match with index _ad, I seriously doubt this will perform better. (In fact, I had already written a search that requires events to have a match in order to be counted before I realized what your original join was doing. That would have improved performance if the matching sets are small.)
I was also working on an alternative search based on possible event reduction by requiring a match between _network and _ad before I realized the mathematical difference. If your requirement is to count all _network events and just add Account_Name, etc. where a match exists, any alternative will probably perform similarly to the join command. Like this one:
index=_network snat IN (10.10.10.10*,20.20.20.20*)
| bucket span=1m _time
| eval client_ip = mvindex(split(client, ":"), 0)
| stats count by _time snat client_ip
| append
[search index=_ad (EventCode=4625 OR (EventCode=4771 Failure_Code=0x18)) Account_Name=JohnDoe Source_Network_Address IN (10.10.10.10 20.20.20.20)
| bucket span=1m _time
| eval Source_Network_Address1 = case(EventCode==4771, trim(Client_Address, "::ffff:")) ``` this field is not used ```
| eval Account_Name4625= case(EventCode=4625,mvindex(Account_Name,1))
| eval Account_Name4771= case(EventCode=4771,Account_Name)
| eval Account_Name = coalesce(Account_Name4771, Account_Name4625)
| eval snat = Source_Network_Address+":"+Source_Port
| eval DCName=mvindex(split(ComputerName, "."), 0)
| stats count as count_ad by _time snat Account_Name EventCode DCName]
| stats values(Account_Name) as Account_Name values(EventCode) as EventCode values(DCName) as DCName by _time snat client_ip count
In short, you need to clarify whether you want to count all events from index _network or only the events that find a match in index _ad; maybe every event is a match, in which case there is no difference. Because the main performance inhibitor is the number of events in _network, there is little to be gained if the requirement does not restrict events.
Yuanliu, thank you very much for taking the time to write and help.
Below is my response:
1. Let's forget about outputlookup.
2. The original main (outer) search of the network index is fast by itself, including the bucket, eval, and final stats, so the delay is not there.
3. The dashboard inputs are Account_Name and Source_Network_Address (but I can use tokens to pass the network address).
4. Since we are no longer doing the lookup table, it's a moot point to answer the subsequent bullet items. I'm sorry.
Someone pointed out that it's possible to reduce the network output by passing a parameter to the outer main search using "format". While that code drastically reduces the set of data returned from the network query, I now need to rethink how to move forward from the produced output. I think it's a step in the right direction. Let me show what was proposed.
First, let me explain the data structure a bit:
snat and client are in the form IP:PORT.
So we need to match IP:PORT between the _network and _ad indexes, as well as the timestamp those events share.
The output is just client_ip.
I don't need any other count evaluations; I just want to know how many of each client_ip we have.
The newly proposed code is below. If you look at the Splunk search log, it's interesting to see how format appends to the _network search in the form (snat=ip1 OR snat=ip2 OR ...).
index=_network sourcetype=f5 irule=*
[| search index=_ad (EventCode=4625 OR (EventCode=4771 Failure_Code=0x18)) Account_Name=USER Source_Network_Address IN (IP1,IP2..)
| eval Source_Network_Address1 = case(EventCode==4771, trim(Client_Address, "::ffff:"))
| eval SourceIP = Source_Network_Address
| eval snat = SourceIP+":"+Source_Port
| fields snat | format ] ``` passes the formatted string to the main search so _network returns only those snats ```
| table client snat _time
```| join type=inner snat _time
[ search index=_ad (EventCode=4625 OR (EventCode=4771 Failure_Code=0x18)) Account_Name=USER Source_Network_Address IN (IP1,IP2..)
| eval Source_Network_Address1 = case(EventCode==4771, trim(Client_Address, "::ffff:"))
| eval SourceIP = Source_Network_Address
| eval client = SourceIP+":"+Source_Port
| table snat _time]
| rex field=client "^(?<client_ip>.*?)\:(?<client_port>.*)"
| stats values(client_ip)```
Using index _ad in a subsearch to limit the _network output will definitely improve performance; in fact, that's exactly the suggestion that I scrapped because it was mathematically different from your original search. Hence Question #2: "Your original search has a common field name count in both outer search and subsearch... My (previous) search gives the count of matching events ONLY. Which count is needed?" If you are only counting matching events, your new search should work. Does it perform well enough? Or are there still mathematical problems? As a general rule, if a search meets the need of the current use case, defer any optimization.
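For instance, to keep the _time/snat match without join, you could flag matched buckets with eventstats and tally client_ip directly. This is only a sketch assembled from your posted pieces (USER, IP1, IP2 are your placeholders), and the appended _ad search is subject to the usual subsearch limits:
index=_network sourcetype=f5 irule=*
    [| search index=_ad (EventCode=4625 OR (EventCode=4771 Failure_Code=0x18)) Account_Name=USER Source_Network_Address IN (IP1,IP2)
    | eval snat = Source_Network_Address + ":" + Source_Port
    | fields snat
    | format ] ``` restrict _network to relevant snat values ```
| bucket span=1m _time
| rex field=client "^(?<client_ip>.*?)\:(?<client_port>.*)"
| append
    [search index=_ad (EventCode=4625 OR (EventCode=4771 Failure_Code=0x18)) Account_Name=USER Source_Network_Address IN (IP1,IP2)
    | bucket span=1m _time
    | eval snat = Source_Network_Address + ":" + Source_Port
    | eval matched = 1
    | stats count by _time snat matched ]
| eventstats max(matched) as matched by _time snat ``` mark _network events whose bucket and snat appear in _ad ```
| where matched == 1 AND isnotnull(client_ip)
| stats count by client_ip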
By the way, you do not need | format (with no options); Splunk's optimizer will drop it silently anyway. The most common use of format is to help the user verify that a subsearch will produce the desired search string. (Another use is to fine-tune the subsearch output, but that cannot be achieved without options.)
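If you want to verify what the subsearch will hand to the outer search, you can run it by itself, for example:
index=_ad (EventCode=4625 OR (EventCode=4771 Failure_Code=0x18)) Account_Name=USER Source_Network_Address IN (IP1,IP2)
| eval snat = Source_Network_Address + ":" + Source_Port
| fields snat
| format
That returns a single result whose search field holds something like ( ( snat="10.10.10.10:50123" ) OR ( snat="20.20.20.20:50124" ) ) (illustrative values), which is the same string that gets appended to the outer _network search.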
Hello,
The only count I need in my report is the client_ip count.
I don't understand how you can count matching events, but if you can modify my last query (and replace the join with something more efficient), that would be a great help.
Thanks