Splunk Search

How to merge two message threads into one?

metylkinandrey
Communicator

I have two message chains (threads) of five messages each. I need a query that displays these two chains as one.
The combined chain must contain ten different messages: five from the main system and five from the backup system. All messages from one system within a chain share the same srcMsgId value, and each system has its own unique srcMsgId within that chain. The chain from the backup system arrives in Splunk immediately after the messages from the main system. Messages from the backup system also carry a Mainsys_srcMsgId value, which is identical to the main system's srcMsgId. How can I display the chain of all ten messages? Perhaps first the messages from the main system, then those from the backup system, along with the time each one arrived at the server.

Specifically, we want to see all ten messages one after another, in the order in which they arrived at the server: five from the main system, for example ("srcMsgId": "rwfsdfsfqwe121432gsgsfgd71"), and five from the backup system ("srcMsgId": "rwfsdfsfqwe121432gsgsfgd72"). The problem is that messages from other systems also arrive at the server and everything is interleaved, which is why we want the search to group the messages from one system with those of its backup. Messages from the backup system are linked to the main system only by the "Mainsys_srcMsgId" field; this key is how we know that a message comes from the backup system (secondary to the main one).

Examples of messages from the primary and secondary system:

Main system:

{
"event": "Sourcetype test please",
"sourcetype": "testsystem-2",
"host": "some-host-123",
"fields":
{
"messageId": "ED280816-E404-444A-A2D9-FFD2D171F32",
"srcMsgId": "rwfsdfsfqwe121432gsgsfgd71",
"Mainsys_srcMsgId": "",
"baseSystemId": "abc1",
"routeInstanceId": "abc2",
"routepointID": "abc3",
"eventTime": "1985-04-12T23:20:50Z",
"messageType": "abc4",

..........................................................................................

Message from backup system:

{
"event": "Sourcetype test please",
"sourcetype": "testsystem-2",
"host": "some-host-123",
"fields":
{
"messageId": "ED280816-E404-444A-A2D9-FFD2D171F23",
"srcMsgId": "rwfsdfsfqwe121432gsgsfgd72",
"Mainsys_srcMsgId": "rwfsdfsfqwe121432gsgsfgd71",
"baseSystemId": "abc1",
"routeInstanceId": "abc2",
"routepointID": "abc3",
"eventTime": "1985-04-12T23:20:50Z",
"messageType": "abc4",
"GISGMPRequestID": "PS000BA780816-E404-444A-A2D9-FFD2D1712345",
"GISGMPResponseID": "PS000BA780816-E404-444B-A2D9-FFD2D1712345",
"resultcode": "abc7",
"resultdesc": "abc8"
}
}

When we want to combine only the five messages of one chain, which are related by "srcMsgId", we run the following query:

index="bl_logging" sourcetype="testsystem-2"
| transaction maxpause=5m srcMsgId Mainsys_srcMsgId messageId
| table _time srcMsgId Mainsys_srcMsgId messageId duration eventcount
| sort srcMsgId _time
| streamstats current=f window=1 values(_time) as prevTime by subject
| eval timeDiff=_time-prevTime
| delta _time as timediff

 

yuanliu
SplunkTrust

The key to this is to establish a field that represents the link between the main system and the backup system.  

| eval SrcMsgId = if(len('fields.Mainsys_srcMsgId')==0 OR isnull('fields.Mainsys_srcMsgId'), 'fields.srcMsgId', 'fields.Mainsys_srcMsgId')

Then, if you want to use transaction, perform transaction on this field, e.g.,

| eval SrcMsgId = if(len('fields.Mainsys_srcMsgId')==0 OR isnull('fields.Mainsys_srcMsgId'), 'fields.srcMsgId', 'fields.Mainsys_srcMsgId')
| transaction maxpause=5m SrcMsgId fields.eventTime

You can also use stats on this field, which is less expensive than transaction.
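
A minimal sketch of the stats variant, not a tested query. It assumes the fields are extracted under the fields.* prefix as in the eval above, and it reuses the index and sourcetype from the question. The output names firstSeen, lastSeen and duration are illustrative and do not come from the original data.

```spl
index="bl_logging" sourcetype="testsystem-2"
| eval SrcMsgId = if(len('fields.Mainsys_srcMsgId')==0 OR isnull('fields.Mainsys_srcMsgId'), 'fields.srcMsgId', 'fields.Mainsys_srcMsgId')
| stats min(_time) as firstSeen max(_time) as lastSeen count as eventcount list(fields.messageId) as messageId list(fields.srcMsgId) as srcMsgId by SrcMsgId
| eval duration = lastSeen - firstSeen
```

Each result row is one combined chain: eventcount should be ten when all main and backup messages are present, and duration is the number of seconds between the first and last message of the chain.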

metylkinandrey
Communicator

In short, I need what is shown in the screenshot, but without having to draw it in Paint :)

yuanliu
SplunkTrust

Something like this? (The first eval wouldn't be necessary if mainsys and backupsys had different source values or some other easily identifiable field.)

| eval source = if(len('fields.Mainsys_srcMsgId')==0 OR isnull('fields.Mainsys_srcMsgId'), "mainsys", "backupsys")
| eval SrcMsgId = if(source == "mainsys", 'fields.srcMsgId', 'fields.Mainsys_srcMsgId')
| reverse
| transaction maxevents=5 source
| eval SrcMsgIds = mvjoin(SrcMsgId, "+")
| stats list(_time) as _time list(eval('fields.srcMsgId')) as "fields.srcMsgId" list(eval('fields.Mainsys_srcMsgId')) as "fields.Mainsys_srcMsgId" by SrcMsgIds

 

metylkinandrey
Communicator

We have created two types of requests that work:

index="main"
| append [ search sourcetype=testsystem-reverse | eval chain=coalesce(Mainsys_srcMsgId,srcMsgId) ]
| append [ search sourcetype=testsystem-main | eval chain=coalesce(srcMsgId,Mainsys_srcMsgId) ]
| transaction maxpause=5m srcMsgId Mainsys_srcMsgId messageId chain
| table _time srcMsgId Mainsys_srcMsgId messageId duration eventcount chain
| sort chain _time
| streamstats current=f window=1 values(_time) as prevTime by subject
| eval timeDiff=_time-prevTime
| delta _time as timediff

And

index="main"
| eval CONNECTIG_ID=if(len('Mainsys_srcMsgId')==0 OR isnull('Mainsys_srcMsgId'),'srcMsgId','Mainsys_srcMsgId')
| stats list(routepointID) as routepoint list(srcMsgId) as srcMsgId list(Mainsys_srcMsgId) as Mainsys_srcMsgId list(eventTime) as eventTime by CONNECTIG_ID

metylkinandrey
Communicator

Thank you very much! You helped a lot!
Your query did not work for me as written, but I got there another way: I took most of the lines from your query and substituted them into my earlier one. It seems to work as we need. I'll test it and post my version of the query.

metylkinandrey
Communicator

It actually works, but it's not quite what I need; apparently I explained it poorly.
What happens is that the first or second messages from two different chains are displayed together:
rwfsdfsfqwe121432gsgsfgd100
rwfsdfsfqwe121432gsgsfgd20

I need something a little different: I need to sort the entire list of incoming messages so that the five messages from the main system are displayed first, then those from the backup system, and so that I can see the interval between messages (between the first and second, the second and third, and so on up to the tenth message).
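
One possible way to get this ordering, as an untested sketch. It assumes the bare field names srcMsgId, Mainsys_srcMsgId and messageId are available, as in the working queries in this thread, and that index="main" holds the events. The field names isBackup, chain, prevTime and interval are made up for illustration.

```spl
index="main"
| eval isBackup = if(len('Mainsys_srcMsgId')==0 OR isnull('Mainsys_srcMsgId'), 0, 1)
| eval chain = if(isBackup==0, 'srcMsgId', 'Mainsys_srcMsgId')
| sort 0 chain isBackup _time
| streamstats current=f window=1 last(_time) as prevTime by chain
| eval interval = _time - prevTime
| table _time chain isBackup srcMsgId Mainsys_srcMsgId messageId interval
```

Every event stays on its own row. Within each chain the main-system messages (isBackup=0) come first, followed by the backup messages, each in arrival order, and interval is the number of seconds since the previous row of the same chain (empty for the first message). sort 0 lifts the default result limit of sort. If main and backup messages overlap in time, forcing the main system first can produce a negative interval at the switch; dropping isBackup from the sort gives pure arrival order instead.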
