
What command type is accum? Is there a way to compute stats/counts on indexers?

Communicator

Splunk's command types page is missing a few commands, including accum. I would like to know whether accum is a centralized streaming command, a distributable streaming command, or neither. Essentially, I need to know if it can run on indexers, so that I don't have to bring the whole event set back to the search head before computing totals. For my use case, this is crucial to building a scalable solution.

One comment at the bottom of the accum page suggests that it functions similarly to streamstats . . . which might imply that it is a centralized streaming command. If so, that would be disappointing: what I'm really looking for is a way to offload the computation of some counts/totals to a group of distributed indexers. I am using the Machine Learning Toolkit and working with some large DNS datasets, and it appears that no command type other than distributable streaming can run on indexers.

I am worried that Splunk may lack an important cluster computing capability: being able to compute intermediary statistics on separate nodes (e.g. map-reduce). It sounds like stats and other transforming commands really only run on the search head . . . meaning the entire event set has to be pulled from the distributed indexers into a single node before computing counts, totals, averages, etc.

From a cluster computing perspective, this lack of parallelism is concerning . . . and if there is no way to compute subtotals on indexers or otherwise parallelize the summarization process before data reaches the search head, it would seem that a custom big data platform is still the only answer for high-volume machine learning tasks like this one. I hope I'm wrong about this, or that I missed something in the documentation. Please let me know if you have any other information.

1 Solution

SplunkTrust

accum field as newfield is the same as streamstats sum(field) as newfield, with all the performance and distributed-ness implications.
streamstats is "centralized streaming". What does that mean?

A streaming command operates on each event as it is returned by a search. Essentially one event in and one (or no) event out.
http://docs.splunk.com/Documentation/Splunk/6.5.1/Search/Typesofcommands

That says something about how the command processes events, essentially "put five events in, get five slightly altered events out in a streaming and also usefully previewable way". That doesn't distinguish between where the command does most of its work though... that docs page has an attempt to explain things, here's my approach:
The key question is "can the command work on event A without event B?" If that's the case, then the command can run on indexers because one indexer doesn't need to know the data from the other indexers to do its job - a simple example is eval, you can do maths on the fields of an event without knowing other events.
With streamstats, the output for an event depends on all the events before it. One indexer doesn't know the sum(field) for any of its events without knowing what the other indexers know; all events have to be put in one result set first... that can only happen after firehosing the search head with data from all indexers.

An important observation: this is orthogonal to "streaming" or "non-streaming". Take stats count, a non-streaming / transforming command ("many events in, few summary events out"). The definition on the docs page can be confusing:

A non-streaming command requires the events from all of the indexers before the command can operate on the entire set of events.
That's true, but doesn't tell the whole story. For a stats count, the indexers can do the bulk of the work. This is where map-reduce is most visible: Each indexer runs the internal command prestats to compute its own count (map), and the search head sums up the counts it gets from the indexers (reduce).
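
The map/reduce split described above can be sketched outside Splunk. The following is a minimal Python illustration of the principle (the data and structure are hypothetical, not Splunk internals):

```python
# Minimal sketch of the map-reduce split behind "stats count"
# (an illustration of the principle, not Splunk's actual implementation).

# Events as they live on three independent indexers (hypothetical data).
indexers = [
    ["e1", "e2", "e3"],        # indexer A
    ["e4", "e5"],              # indexer B
    ["e6", "e7", "e8", "e9"],  # indexer C
]

# Map: each indexer computes its own partial count (cheap, local,
# roughly what the internal prestats command produces).
partial_counts = [len(events) for events in indexers]

# Reduce: the search head only sums the partials; it never sees the raw events.
total = sum(partial_counts)

print(partial_counts)  # [3, 2, 4]
print(total)           # 9
```

The point is that only three small numbers cross the wire, not nine events.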

To summarize, while accum is not map-reduceable, your textual description "what I'm really looking for is a way to offload the computation of some counts/totals to a group of distributed indexers" can be map-reduceable depending on the specific operation. Your conclusion "meaning the entire event set has to be pulled from the distributed indexers into a single node before computing counts, totals, averages, etc." is mostly false, again depending on the specific operation.


SplunkTrust

accum is a “stateful streaming” command.

  • Processes search results one by one

  • Can maintain global state

  • Must not re-order search results

  • Runs only on the search head

Examples:

  • accum

  • streamstats

  • dedup



SplunkTrust

Exactly, eventstats | stats forces the stats to run entirely on the search head, rather than prestats running on the indexers followed by stats on the search head.

I'll go out on a limb and claim all (probably only most) cases of eventstats | stats can be rephrased to have the stats first. Let's take your example:

eventstats dc(client_ip) as total_clients count as total_events 

The count is simple, you do stats count as events_per_domain by domain and later eventstats sum(events_per_domain) as total_events.
The distinct count is a bit more tricky, but you can use "traditional" map-reduce approaches here: Instead of directly asking for a distinct count, you ask for a preliminary result by domain - the list of unique values - and later turn that into a distinct count like so (untested syntax/details, the principle should work): stats values(client_ip) as clients_per_domain by domain | eventstats dc(clients_per_domain) as total_clients
(side note - if dc doesn't work as intended on multi-value fields, this should do: stats ... | eventstats values(clients_per_domain) as total_clients | eval total_clients = mvcount(total_clients))
The various thing() by domain need to be combined into one stats call of course.

This approach is faster for two reasons. First, the old approach went over all data twice - eventstats for all events, stats for all events - while the new approach throws a much smaller result set at the eventstats... that's regardless of distributed computing or standalone boxes. Second, the map-reduce-ability of stats isn't being wasted so you benefit from distributed capacity and send less data over the wire.
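
The "values first, distinct count later" trick above can be sketched in plain Python (hypothetical data; this only illustrates the algorithm, not Splunk's execution):

```python
# Sketch of computing a distinct count via per-group value sets,
# mirroring: stats values(client_ip) by domain | eventstats dc(...)

events = [
    {"domain": "a.com", "client_ip": "10.0.0.1"},
    {"domain": "a.com", "client_ip": "10.0.0.2"},
    {"domain": "b.com", "client_ip": "10.0.0.1"},
    {"domain": "b.com", "client_ip": "10.0.0.3"},
]

# Map-reduce-able step: per-domain sets of unique clients
# (what "stats values(client_ip) as clients_per_domain by domain" yields).
clients_per_domain = {}
for e in events:
    clients_per_domain.setdefault(e["domain"], set()).add(e["client_ip"])

# Cheap final step over the small per-domain result set: union the sets
# and count them (what the later eventstats dc(...) amounts to).
total_clients = len(set().union(*clients_per_domain.values()))

print(total_clients)  # 3
```

Note the second step runs over two rows, not four events; that gap grows with data volume.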

Oh yeah, new question/thread - in principle you should branch off here and post a new question "I have this badly distributable search, how can I make it better?", but then the details of how and why are overlapping with this question. I'd draw the line at more lengthy/concrete searches, so far it's basically examples to illustrate the distribution of work and therefore lines up nicely with the original question.

Communicator

Thanks Martin! We'll see if the dc ends up being tricky at all, but knowing what I do now about how stats works on the back end, this approach makes much more sense than what I was doing before.


SplunkTrust

\o/


Communicator

Great stuff! So, is it correct to assume that if an eventstats command preceded a stats invocation, then the stats calculations would be forced to occur on the search head (even though it would otherwise have been map-reduce-able)?

If this is correct, then I have another problem to think about -- restructuring a certain query to avoid eventstats so that I can leverage as much distribute-ability as possible. Perhaps it would be most appropriate to start a new thread for that; if so, just let me know. But since it is connected to this discussion, here's the concept:

What I'm after is a bunch of statistics "by domain" (as shown above), and some of those stats need to be normalized by two other things: total events, and total clients.

The way I'm doing that now is a bit wonky, starting with an eventstats command right before the primary stats call:

| eventstats dc(clientip) as totalclients count as totalevents

Following this is the primary stats call, and I manage to keep totalclients and totalevents in scope by slipping this in with the stats invocation:

values(totalclients) as totalclients values(totalevents) as totalevents

Since eventstats gave each event the same exact totalclients and totalevents fields, totalclients and totalevents become simple, single-valued fields for each domain (recall that the main stats call is "by domain"). At this point, the normalization calculations can be performed with simple eval's.

So, the question is: is there another, more cluster-computing-friendly way to normalize the results from that by-domain stats call without having to use eventstats to pre-calculate the totalclients and totalevents? And also without having to split the entire query into multiple separate queries?


SplunkTrust

Here's an example where a switch changes the map-reduce-ability of a command: dedup
http://docs.splunk.com/Documentation/Splunk/6.5.1/SearchReference/Dedup

By default, | dedup field will give you the latest event for each field value. Indexers can individually dedup, return one event per value (plus metadata) to the search head, and the search head can then determine for each field value which indexer's pre-dedup'd event "wins".
If you set consecutive=t, all that goes out the window. That switch tells Splunk to drop a duplicate event only if no event with a different field value sits between it and the previous duplicate. That's a decision no indexer can make or even pre-compute - for any pair of duplicate values on one indexer, another indexer might have an event between the two.

In other words, setting this switch runs a different algorithm that has to look at the complete picture.

Keep in mind as well, any piped commands after the first reduce (dedup here, stats, etc.) or centralized streaming command naturally continue on the search head only. Nothing for the indexers to do because they don't know the reduced result.
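
Why consecutive dedup can't be pre-computed per indexer can be shown in a few lines of Python (hypothetical values; the point is that the result depends on the global event order):

```python
# Consecutive dedup depends on global order, so no indexer can decide alone.

def dedup_consecutive(values):
    """Keep a value only if it differs from the value immediately before it."""
    out = []
    for v in values:
        if not out or out[-1] != v:
            out.append(v)
    return out

indexer_a = ["x", "x"]   # locally, consecutive dedup would give ["x"]
indexer_b = ["y"]

# Globally, indexer B's event may sit between A's two duplicates:
merged = ["x", "y", "x"]                          # one possible global order
print(dedup_consecutive(merged))                  # ['x', 'y', 'x'] - nothing dropped
print(dedup_consecutive(indexer_a + indexer_b))   # ['x', 'y'] - a different answer
```

The same input events yield different results under different interleavings, so the algorithm must see the complete, ordered picture.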

SplunkTrust

The most complete list is here: http://docs.splunk.com/Documentation/Splunk/6.5.1/SearchReference/Commandsbytype
It's not exactly listing them by what runs where though. In short, red flags in there are "centralized streaming".

What you can do instead is run a search in Splunk and look at the job inspector's remoteSearch and reportSearch fields. In essence, remoteSearch is map and reportSearch is reduce. To get those you don't even need useful data in your indexes, all you're looking for is how Splunk would execute the search. I've done that for your stats example:

my search: dummy search | stats avg(ttl) as avg_ttl dc(record_ip) as a_record_count count(domain) as resolution_attempts values(clients) as client_values by domain 
remoteSearch: litsearch ( dummy search ) | addinfo type=count label=prereport_events | fields keepcolorder=t "clients" "domain" "prestats_reserved_*" "psrsvd_*" "record_ip" "ttl" | prestats count(domain) distinct_count(record_ip) mean(ttl) values(clients) by domain 
reportSearch: stats avg(ttl) as avg_ttl dc(record_ip) as a_record_count count(domain) as resolution_attempts values(clients) as client_values by domain 

As you can see, each indexer is computing pre-stats values. For average, it is computing a sum and a count - then the search head can sum up all sums and counts, and compute the overall average. For distinct count it's a bit more tricky, essentially each indexer has to return a dedup'd list of values to let the search head combine the lists, dedup again, and count - still most of the work done "down there" in map. Count is dead simple. For values, each indexer again produces a dedup'd list of values for the search head to combine into one list of dedup'd values. The split by field just tells everyone to do that for each field value, same algorithm.
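
The avg() case above can be sketched concretely: each indexer ships a (sum, count) pair and the search head combines them. A minimal Python illustration with made-up TTL values, not Splunk code:

```python
# Sketch of how avg(ttl) map-reduces across indexers.

ttl_per_indexer = [
    [300, 600],        # indexer A (hypothetical TTLs)
    [120, 120, 360],   # indexer B
]

# Map: each indexer computes its sufficient statistics, a (sum, count) pair.
partials = [(sum(ttls), len(ttls)) for ttls in ttl_per_indexer]

# Reduce: the search head combines the partial sums and counts.
total_sum = sum(s for s, _ in partials)
total_count = sum(c for _, c in partials)
avg_ttl = total_sum / total_count

print(partials)  # [(900, 2), (600, 3)]
print(avg_ttl)   # 300.0
```

Averaging the per-indexer averages (450.0 and 200.0) would give the wrong answer, which is exactly why the pair of partials, not the finished average, has to cross the wire.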

Coming back to distinct count for a second, if you have really large cardinality then computing a dc distributedly can be expensive - the list to return to the search head might become huge. For that reason, stats not only has dc() but also estdc(): http://docs.splunk.com/Documentation/Splunk/6.5.1/SearchReference/CommonStatsFunctions#Aggregate_fun... https://answers.splunk.com/answers/98220/whats-the-difference-between-dc-distinct-count-and-estdc-es...

Are all stats functions map-reduce-able? Yes, but some are faster than others - count vs distinct count from above, for example. That's again not a Splunk thing, but an algorithm thing... you'd have the same issue if you wrote map-reduce for Hadoop or similar platforms.

As for telling what happens when and where, you're back in the job inspector for "seeing what the machine is doing", or back at thinking about the algorithms involved and figuring out what can run where independent of other indexers or not.

Communicator

Thanks for the information, Martin! Several follow-up questions:

First off, is there a full list of commands that can properly map-reduce using indexers? For instance, which modes of stats are compatible? Does it still work if there is a by clause? And am I correct to assume that any eventstats call will always need every event shot back to the search head? Also, is there a way to tell when all events are being dragged back to the search head or not (other than changes in execution time)?

For reference, here is a simplified version of the primary stats call I've been using:

| stats avg(ttl) as avgttl dc(recordip) as arecordcount count(domain) as resolutionattempts values(clients) as clientvalues by domain

With the use of a by clause along with dc, values, avg, and count, is this something that can run on the indexers and then bring back a reduced set of summary statistics to the search head?

Apologies for the barrage of questions, but I really do need answers to them, and I've been having trouble finding good documentation.


SplunkTrust

Another one of the hints you can look for in Splunk's documentation for when something is map-reducible is the phrase "sufficient statistics". Like Martin said, avg() is easily map-reduced by subtotaling count and sum, so a count and a sum are the sufficient statistics for average. The sufficient statistics for a dc() are the set of distinct values. The sufficient statistics for a standard deviation include a count, a sum, and a sum of squares.
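
The standard-deviation case can be worked through in a few lines of Python. This sketch uses the population standard deviation for simplicity and made-up per-indexer values; it only demonstrates that (count, sum, sum of squares) partials combine cleanly:

```python
# Count, sum, and sum of squares are sufficient statistics for a
# (population) standard deviation, so partials can be merged freely.
import math

chunks = [[2.0, 4.0], [4.0, 4.0, 6.0]]  # hypothetical per-indexer values

# Map: each chunk ships (count, sum, sum of squares).
partials = [(len(c), sum(c), sum(x * x for x in c)) for c in chunks]

# Reduce: combine the partials, then derive mean and standard deviation.
n = sum(p[0] for p in partials)
s = sum(p[1] for p in partials)
ss = sum(p[2] for p in partials)
mean = s / n
stdev = math.sqrt(ss / n - mean * mean)

print(mean)              # 4.0
print(round(stdev, 4))   # 1.2649
```

No node ever needed the full list of values, only three numbers per chunk.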

You'll also see similar sufficient-statistics talk used in summary indexing (as the problem is very similar), and in the "prestats=true" output of the tstats command. There, you'll see fields with names like psrsvd_*, which are preserved sufficient-statistics values for use in further aggregation down the pipe...