Splunk Search

Can I speed up this slow summary creation?

alvesri
Engager

Hello guys,

Can you help us with this case? Thank you in advance.

We receive 300k events in 24 hours. At peak we have to process about 15k in real time, and this job takes 140 seconds to run. Is it possible to make it take less time?

The application is already developed, so the output should stay the same.

savedsearches.conf:

[Preatreament - Opération Summary]
action.email.show_password = 1
action.logevent = 1
action.logevent.param.event = _time=$result._time$|ABC123456Emetrice=$result.ABC123456Emetrice$|ABC123456Receptrice=$result.ABC123456Receptrice$|ABCaeiou=$result.ABCaeiou$|ABCdonneurbbbb=$result.ABCdonneurbbbb$|AAAAaeiou=$result.AAAAaeiou$|AAAADonneurbbbb=$result.AAAADonneurbbbb$|application=$result.application$|canal=$result.canal$|codeE=$result.codeE$|count=$result.count$|csv=$result.csv$|dateEmissionrrrr=$result.dateEmissionrrrr$|dateReglement=$result.dateReglement$|date_hour=$result.date_hour$|date_mday=$result.date_mday$|date_minute=$result.date_minute$|date_month=$result.date_month$|date_second=$result.date_second$|date_wday=$result.date_wday$|date_year=$result.date_year$|date_zone=$result.date_zone$|deviseOrigine=$result.deviseOrigine$|deviseReglement=$result.deviseReglement$|encryptedAAAAaeiou=$result.encryptedAAAAaeiou$|encryptedAAAADonneurbbbb=$result.encryptedAAAADonneurbbbb$|etat=$result.etat$|eventtype=$result.eventtype$|heureEmissionrrrr=$result.heureEmissionrrrr$|host=$result.host$|identifiantrrrr=$result.identifiantrrrr$|index=$result.index$|info_max_time=$result.info_max_time$|info_min_time=$result.info_min_time$|info_search_time=$result.info_search_time$|lastUpdate=$result.lastUpdate$|libelleRejet=$result.libelleRejet$|linecount=$result.linecount$|montantOrigine=$result.montantOrigine$|montantTransfere=$result.montantTransfere$|motifRejet=$result.motifRejet$|nomaeiou=$result.nomaeiou$|nomDonneurbbbb=$result.nomDonneurbbbb$|orig_index=$result.orig_index$|orig_sourcetype=$result.orig_sourcetype$|phase=$result.phase$|punct=$result.punct$|refEstampillage=$result.refEstampillage$|refFichier=$result.refFichier$|refbbbbClient=$result.refbbbbClient$|refTransaction=$result.refTransaction$|search_name=$result.search_name$|search_now=$result.search_now$|sens=$result.sens$|source=$result.source$|sourcetype=$result.sourcetype$|splunk_server=$result.splunk_server$|splunk_server_group=$result.splunk_server_group$|startDate=$result.startDate$|summaryDate=$result.summaryDate$|timeendpos=$result.timeendpos$|timestamp=$result.timestamp$|timestartpos=$result.timestartpos$|typeOperation=$result.typeOperation$|summaryDate_ms=$result.summaryDate_ms$|UUUUUETR=$result.UUUUUETR$|messageDefinitionIdentifier=$result.messageDefinitionIdentifier$|ssssssInstructionId=$result.ssssssInstructionId$|endToEndIdentification=$result.endToEndIdentification$|
action.logevent.param.index = bam_xpto_summary
action.logevent.param.sourcetype = Opération_summary
action.lookup = 0
action.lookup.append = 1
action.lookup.filename = test.csv
alert.digest_mode = 0
alert.severity = 1
alert.suppress = 0
alert.track = 0
counttype = number of events
cron_schedule = */1 * * * *
dispatch.earliest_time = -6h
dispatch.latest_time = now
enableSched = 1
quantity = 0
relation = greater than
search = (index="bam_xpto" AND sourcetype="Opération") OR (index="bam_xpto_summary" sourcetype="Opération_summary" earliest=-15d latest=now)\
| search  [ search index="bam_xpto" AND sourcetype="Opération" \
            | streamstats count as id \
            | eval splitter=round(id/500) \
            | stats values(refEstampillage) as refEstampillage by splitter\
            | fields refEstampillage]\
| sort 0 - _time indexTime str(sens)\
| fillnull application phase etat canal motifRejet libelleRejet identifiantrrrr dateReglement ABCdonneurbbbb nomDonneurbbbb ABCaeiou nomaeiou codeEtablissement refFichier messageDefinitionIdentifier UUUUUETR  ssssssInstructionId endToEndIdentification value=" " \
| eval codeEtablissement=if(codeEtablissement=="", "N/R",codeEtablissement),\
     identifiantrrrr=if(identifiantrrrr=="", "N/R",identifiantrrrr),\
     dateReglement=if(dateReglement=="", "N/R",dateReglement),\
     ABCdonneurbbbb=if(ABCdonneurbbbb=="", "N/R",ABCdonneurbbbb), \
     nomDonneurbbbb=if(nomDonneurbbbb=="", "N/R",nomDonneurbbbb),\
     ABCaeiou=if(ABCaeiou=="", "N/R",ABCaeiou),\
     nomaeiou=if(nomaeiou=="", "N/R",nomaeiou),\
     libelleRejet=if(libelleRejet=="", "N/R",libelleRejet),\
     refFichier=if(refFichier=="", "N/R",refFichier),\
     application=if(application=="", "N/R",application),\
     canal=if(canal=="", "N/R",canal),\
     motifRejet=if(motifRejet=="", "N/R",motifRejet),\
     count=if(sourcetype="Opération", 1, count),\
     startDate=if(isnull(startDate), _time, startDate),\
     typeOperation = if(NOT (messageDefinitionIdentifier==" " AND endToEndIdentification== " " AND ssssssInstructionId == " " AND UUUUUETR== " ") , messageDefinitionIdentifier, typeOperation), \
     refTransaction = if(NOT (messageDefinitionIdentifier==" " AND endToEndIdentification== " " AND ssssssInstructionId == " " AND UUUUUETR== " ") , ssssssInstructionId, refTransaction),\
     relatedRef = if(NOT (messageDefinitionIdentifier==" " AND endToEndIdentification== " " AND ssssssInstructionId == " " AND UUUUUETR== " ") , endToEndIdentification, relatedRef)\
| foreach * \
    [eval <<FIELD>>=replace(<<FIELD>>, "\"","'"), <<FIELD>>=replace(<<FIELD>>, "\\\\"," "), <<FIELD>>=replace(<<FIELD>>, ",",".")]\
| eval nomDonneurbbbb=replace(nomDonneurbbbb,"[^\p{L}\s]",""), nomaeiou=replace(nomaeiou,"[^\p{L}\s]","") \
| eval nomDonneurbbbb=replace(nomDonneurbbbb,"\s{2,99}"," "), nomaeiou=replace(nomaeiou,"\s{2,99}"," ") \
| stats latest(_time) as _time, latest(Actions_xpto) as Actions_xpto, list(sens) as sens, list(phase) as phase, list(etat) as etat, list(identifiantrrrr) as identifiantrrrr, list(dateReglement) as dateReglement, list(ABCdonneurbbbb) as ABCdonneurbbbb, list(nomDonneurbbbb) as nomDonneurbbbb, list(ABCaeiou) as ABCaeiou, list(nomaeiou) as nomaeiou, list(codeEtablissement) as codeEtablissement, list(index) as index, list(count) as count, list(typeOperation) as typeOperation, list(libelleRejet) as libelleRejet , list(application) as application,latest(dateEmissionrrrr) as dateEmissionrrrr, list(canal) as canal, earliest(deviseOrigine) as deviseOrigine, earliest(deviseReglement) as deviseReglement, earliest(refbbbbClient) as refbbbbClient, list(refFichier) as refFichier, earliest(montantOrigine) as montantOrigine, earliest(montantTransfere) as montantTransfere, last(AAAADonneurbbbb) as AAAADonneurbbbb, last(AAAAaeiou) as AAAAaeiou, list(motifRejet) as motifRejet, list(refTransaction) as refTransaction, earliest(encryptedAAAAaeiou) as encryptedAAAAaeiou, earliest(encryptedAAAADonneurbbbb) as encryptedAAAADonneurbbbb, first(heureEmissionrrrr) as heureEmissionrrrr, first(sourcetype) as sourcetype, last(ABC123456Receptrice) as ABC123456Receptrice, last(ABC123456Emetrice) as ABC123456Emetrice,latest(summaryDate) as summaryDate, list(startDate) as startDate, list(endToEndIdentification) as endToEndIdentification, list(messageDefinitionIdentifier) as messageDefinitionIdentifier, list(UUUUUETR) as UUUUUETR, list(ssssssInstructionId) as ssssssInstructionId, count(eval(sourcetype="Opération")) as nbOperation, min(startDate) as minStartDate  by refEstampillage\
|  eval lastSummaryIndex=mvfind(index, "bam_xpto_summary"), lastSummaryIndex=if(isnull(lastSummaryIndex), -1, lastSummaryIndex)\
| foreach * \
    [eval <<FIELD>>=mvindex(<<FIELD>>,0, lastSummaryIndex)]\
| eval etat=mvjoin(etat,","), phase=mvjoin(phase,","), identifiantrrrr=mvjoin(identifiantrrrr,","), dateReglement=mvjoin(dateReglement,","), ABCdonneurbbbb=mvjoin(ABCdonneurbbbb,","), nomDonneurbbbb=mvjoin(nomDonneurbbbb,","), ABCaeiou=mvjoin(ABCaeiou,","), nomaeiou=mvjoin(nomaeiou,","), codeEtablissement=mvjoin(codeEtablissement,","),application=mvjoin(application,","),canal=mvjoin(canal,","),motifRejet=mvjoin(motifRejet,","),libelleRejet =mvjoin(libelleRejet ,","),dateReglement=mvjoin(dateReglement,","),refFichier=mvjoin(refFichier,","), sens=mvjoin(sens,","), startDate=mvjoin(startDate,","), count=mvjoin(count,","), oldSummary=summaryDate, endToEndIdentification = mvjoin (endToEndIdentification, ","), messageDefinitionIdentifier = mvjoin (messageDefinitionIdentifier, ","), UUUUUETR = mvjoin(UUUUUETR, ","), ssssssInstructionId = mvjoin(ssssssInstructionId, ","), typeOperation = mvjoin(typeOperation, ","), refTransaction = mvjoin(refTransaction, ",")\
| where _time >= summaryDate OR isnull(summaryDate)\
| majoperation\
| eval count=if(nbOperation > count, nbOperation, count)\
| eval startDate=if(minStartDate<startDate,minStartDate, startDate) \
| where !(mvcount(index)==1 AND index="bam_xpto_summary") \
|  fillnull codeEtablissement value="N/R"\
|  fillnull refFichier value="Aucun"\
| eval summaryDate=_time, lastUpdate=now(), codeE=codeEtablissement, summaryDate_ms=mvindex(split(_time,"."),1)\
|  fields - codeEtablissement index

limits.conf

max_per_result_alerts = 500

(Screenshot attached: buckets.png)

Inspector:

(Screenshot attached: inspector.png)

Thank you again; we're eagerly awaiting your answer.

Best regards,

Ricardo Alves


alvesri
Engager

Hello PickleRick,

Thank you for the fast reply.

Here are the results of the subsearch alone and of the main search with the subsearch.

Do you think we can improve the summary creation? If yes, can you please help us?

Subsearch alone:

(Screenshot attached: subsearch.png)

Main search with subsearch:

(Screenshot attached: search.png)

Best regards,

Ricardo Fonseca


PickleRick
SplunkTrust

As you can see, the subsearch returns almost 180k results. Since the default limit for a subsearch is 50k results, they are most probably being silently truncated, and therefore your main search doesn't give you correct results.

Anyway, searching over several tens of thousands of conditions (because that's what using a subsearch boils down to) is not very efficient either: Splunk has to dispatch the subsearch to the indexers, pull the partial results back to the search head, consolidate and process them further, combine the results into conditions in the main search, and then dispatch the search constructed that way to the indexers again...

Unfortunately, without knowing your data and the goal of your search it's difficult to "optimize" it. It would most probably be best to engage your local partner to work through your searches (because I assume you have more of them organized this way), since it will probably be easier to write them from scratch.
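Just to illustrate one possible direction (this is only a minimal sketch, not tested against your data): the subsearch appears to exist only to restrict the main search to refEstampillage groups that contain at least one new raw "Opération" event. The same restriction can be expressed without a subsearch by flagging those groups with eventstats and filtering on the flag; the field name newRawEvents below is made up for the example:

(index="bam_xpto" sourcetype="Opération") OR (index="bam_xpto_summary" sourcetype="Opération_summary" earliest=-15d latest=now)
| eventstats count(eval(sourcetype=="Opération")) AS newRawEvents BY refEstampillage
| where newRawEvents > 0
| ... rest of the existing pipeline (sort, fillnull, stats by refEstampillage, ...)

Whether this is actually faster depends on your data volume, since eventstats has to hold all the grouped events in memory; compare both versions in the Job Inspector before switching.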


PickleRick
SplunkTrust

This search does seem a bit overcomplicated at first glance.

But the most important question here is how many results you get when you run the subsearch alone, and how much time it takes.

Subsearches are notorious for failing silently due to too many returned results or exceeded runtime, so you might actually be getting wrong results from a search formulated this way.

And this search needs rethinking. You seem to be sorting all your data prematurely. Then you do some stats aggregation, after which you apply a where condition; it might be OK, but it could probably use some optimization so that you cut down on your search results earlier in the pipeline.
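As a concrete (and only sketched, untested) example of cutting results down earlier: if the subsearch is kept, it can be moved into the base search instead of the later "| search", so the refEstampillage filter is applied while events are retrieved from the indexes rather than afterwards. Something along these lines, assuming the number of distinct refEstampillage values stays within the subsearch limits (the splitter trick from the original search is left out here for clarity):

(index="bam_xpto" sourcetype="Opération")
OR (index="bam_xpto_summary" sourcetype="Opération_summary" earliest=-15d latest=now
    [ search index="bam_xpto" sourcetype="Opération"
      | dedup refEstampillage
      | fields refEstampillage ])
| sort 0 - _time indexTime str(sens)
| ... rest of the existing pipeline ...

This only changes where the filtering happens; it does not fix the truncation problem if the subsearch really returns more results than the limit allows.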
