Splunk Search

How can I compare two multivalue fields from two different sets of events?

jamesplouffe
New Member

I have events (call them "approvedset" events) generated on a regular interval which each containing a field called listofIDs which is a string made up of a comma separated list of IDs of approved settings for each host in our environment.

I also have events for hosts which each have an ID field which lists one of the many active settings on the host. Each host can generate hundreds of these events on a regular basis.

Periodically I need to compare all of the ID fields in the host events for each host to the listofIDs in the latest "approvedset" event and return any results not in the latest "approvedset" "listofIDs".

As an example, assume the ID field values for two of the events for "test_host" are "32108" and "72058" and the latest "listofIDs" from the "approvedset" event is "32108,42589,78526". I'd like to run a search that compares the ID fields from the latest set of "host1" events to the latest "approvedset" "listofIDs" and returns ID 72058 because it is not in the "listofIDS" field in the latest "approvedset" event.

I've attempted to use NOT in combination with makemv and mvexpand, but I constantly run into memory errors (even after raising memory limits) because the "listofIDS" field could contain up to 80,000 IDs and I could be comparing more than 100,000 host event ID fields to the IDs in "listofIDS".

I've considered making the "approvedset" "listofIDS" a lookup, but because each host has a unique "approvedset" of "listofIDs", I would have to generate over 20,000 lookup files. This problem was previously solved using a MSSQL database and the T-SQL was (simplified a bit here):

--Function to break a CSV string into separate values
CREATE FUNCTION dbo.BreakStringIntoRows (@CommadelimitedString   varchar(1000))
RETURNS   @Result TABLE (Column1   VARCHAR(100))
AS
BEGIN
        DECLARE @IntLocation INT
        WHILE (CHARINDEX(',',    @CommadelimitedString, 0) > 0)
        BEGIN
              SET @IntLocation =   CHARINDEX(',',    @CommadelimitedString, 0)      
              INSERT INTO   @Result (Column1)
              --LTRIM and RTRIM to ensure blank spaces are   removed
              SELECT RTRIM(LTRIM(SUBSTRING(@CommadelimitedString,   0, @IntLocation)))   
              SET @CommadelimitedString = STUFF(@CommadelimitedString,   1, @IntLocation,   '') 
        END
        INSERT INTO   @Result (Column1)
        SELECT RTRIM(LTRIM(@CommadelimitedString))--LTRIM and RTRIM to ensure blank spaces are removed
        RETURN 
END
GO 

--Get list of ApprovedIDS
DECLARE @approvedlistofIDs NVARCHAR(50)
SET @approvedlistofIDS = (SELECT listofIDS FROM [testing].[dbo].[approvedset] WHERE hostname = 'host1')

--Get UnapprovedIDS from list of activeIDs compared to ApprovedIDs
SELECT [ID] AS UnapprovedIDs
FROM [testing].[dbo].[activeIDs] WHERE hostname = 'host1' and ID not in
(SELECT  * FROM dbo.BreakStringIntoRows( @approvedlistofIDS))

Additionally, as you can see, the T-SQL only generates results for one host. The code that calls the T-SQL handles the iteration through each host. I need to somehow perform that iteration in Splunk as well. To that end, I have a lookup table of hostnames which need to be checked which gets updated before each time this check needs to be run.

I realize that Splunk is not a SQL database and that the logic to do this, if it indeed can be done in Splunk, won't necessarily follow the SQL logic, but I am at a loss for where to go next so as not to continuously blow memory limits. I am, unfortunately, stuck in SQL logic and can't see my way to a clever solution for this in Splunk.

I've attempted bringing in each "approved" ID in its own event, but because of metadata overhead needed with each event, I end up with well over 2TB of data to be ingested at my Splunk forwarder every few days.

The impetus to move the data to Splunk was for dashboard flexibility that we didn't have with the DB based application. I am about ready to throw in the towel and just use DB Connect, but I figured I would check for any good solutions here first.

0 Karma

sundareshr
Legend

How about something like this...

index=regularevents NOT [search index=approvedIds | eval ID=split(listofIDs, ",") | mvexpand ID | table ID ] | table ID
0 Karma

MuS
Legend

Hmmm , 2TB of ingested data and a subsearch ..... you're going to hit the hard limit for sure. I have no time to play around this use case right now; but most likely you can do a stats and/or streamstats with some if() eval's ....

See the March virtual .conf session of @sideview to learn more on this topic http://wiki.splunk.com/Virtual_.conf

cheers, MuS

0 Karma
Get Updates on the Splunk Community!

Now Available: Cisco Talos Threat Intelligence Integrations for Splunk Security Cloud ...

At .conf24, we shared that we were in the process of integrating Cisco Talos threat intelligence into Splunk ...

Preparing your Splunk Environment for OpenSSL3

The Splunk platform will transition to OpenSSL version 3 in a future release. Actions are required to prepare ...

Easily Improve Agent Saturation with the Splunk Add-on for OpenTelemetry Collector

Agent Saturation What and Whys In application performance monitoring, saturation is defined as the total load ...