Splunk Search

Help using appendcols or join

fmfx1001
Explorer

I wonder if someone can help me with an issue I'm having using the append, appendcols, or join commands. Truth be told, I'm not sure which command I ought to be using to join two data sets and compare the value of the same field in both.

Here is what I am trying to accomplish:

I have two data sets: one for today and one for yesterday.

I want to know if the value in the "hash" field has changed from yesterday to today. The field names are identical for both data sets.

Here is an example of one of the searches I have tried. Does anyone know how I can get this working please?

search_for_todays_data
| table url, hash
| eval new_hash=hash
| appendcols
[| search search_for_yesterdays_data | table url, hash
| eval old_hash=hash]
| where new_hash != old_hash

Thanks in advance for any help

1 Solution

MuS
Legend

Hi fmfx1001,

try this:

search_for_todays_data OR search_for_yesterdays_data earliest=-1d@d latest=-0d@d 
| fields _time url hash
| bucket _time span=1d 
| streamstats current=false last(hash) as new_hash last(_time) as time_of_change by url 
| where hash!=new_hash 
| convert ctime(time_of_change) as time_of_change 
| rename hash as old_hash 
| table time_of_change, url, old_hash, new_hash

If this does not work, can you please provide some examples?

Hope this helps ...

cheers, MuS
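
For anyone who wants to try the streamstats approach above without real data, here is a minimal sketch that builds four synthetic events with makeresults. The url values (a.example, b.example), the hash values, and the helper field n are made up for illustration. The explicit sort only mimics the newest-first order that a normal event search returns, which is what makes the "previous" event in the stream the newer one.

```spl
| makeresults count=4
| streamstats count as n
| eval url=if(n<=2, "a.example", "b.example")
| eval _time=if(n%2==1, relative_time(now(), "-1d@d"), relative_time(now(), "@d"))
| eval hash=case(n==1, "h1", n==2, "h2", n==3, "h3", n==4, "h3")
| sort 0 -_time
| fields _time url hash
| bucket _time span=1d
| streamstats current=false last(hash) as new_hash last(_time) as time_of_change by url
| where hash!=new_hash
| convert ctime(time_of_change) as time_of_change
| rename hash as old_hash
| table time_of_change, url, old_hash, new_hash
```

Only a.example should come back, with old_hash=h1 and new_hash=h2. b.example is dropped because its hash is h3 on both days, and the newest event for each url is dropped because it has no earlier row in the stream to compare against.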


fmfx1001
Explorer

That worked a treat. Thank you so much for your help.


fmfx1001
Explorer

Doh! Before I read your reply, I just got this search working. I will read those links you posted tomorrow and try the search you suggested. Thank you for your response, I appreciate the help.

sourcetype=logs target_page_hash=* earliest=@d latest=now | eval new_hash=target_page_hash
| join target_page
[| search sourcetype=logs earliest=-2d@d latest=@d target_page_hash=* | eval old_hash=target_page_hash]
| where new_hash != old_hash
| table target_page, old_hash, new_hash
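
A possible alternative to the join, sketched here as an illustration rather than something that was tested in this thread: search both days once and let stats pick the oldest and newest hash per page. It reuses the sourcetype and field names from the search above; the time range is an assumption that covers yesterday and today. Note that it compares the first and last hash seen in that window, which is not strictly "yesterday's value against today's" if a page has several hashes within one day.

```spl
sourcetype=logs target_page_hash=* earliest=-1d@d latest=now
| stats earliest(target_page_hash) as old_hash latest(target_page_hash) as new_hash by target_page
| where old_hash != new_hash
| table target_page, old_hash, new_hash
```

A single-pass search like this avoids the subsearch, which is subject to result and runtime limits.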

DalJeanis
Legend

Here's one way. Basically, you search two days' worth of records, and then copy each record to one day later. The only records you care about are the ones that have two different hash values, so you don't even need a step to eliminate yesterday's records that didn't get pushed forward or today's that did get pushed forward to tomorrow. (Usually this type of solution requires that much cleanup, but you get the easy-peasy way.)

(search for today's-or-yesterday's data)
| table _time url, hash
| bin _time span=1d
| eval splitter="A B"     | makemv splitter     | mvexpand splitter
| eval _time=if(splitter="A",_time,_time+86400)
| stats values(*) as * by _time url
| where mvcount(hash)>1

...and, modified based on your accepted answer, to use the actual fields and indexes...

 sourcetype=logs target_page_hash=* earliest=-2d
 | table _time target_page, target_page_hash
 | bin _time span=1d
 | eval splitter="A B"     | makemv splitter     | mvexpand splitter
 | eval _time=if(splitter="A",_time,_time+86400)
 | stats values(*) as * by _time target_page
 | where mvcount(target_page_hash)>1

fmfx1001
Explorer

Thanks for your response but that didn't work for me. I see no results.

I'm having trouble understanding your code line-by-line also. I'm basically trying to compare the hash of the web page (url) from the previous day to the current day. There are numerous urls.


DalJeanis
Legend

Line by line explanation, so you can see what is going on...

 (search for today's-or-yesterday's data)

Your search needs to return a value for _time which is sometime today or yesterday, a value for url, and a value for hash. For example, it should be returning a record for yesterday for myurl.com, and a record for today for myurl.com

This command gets rid of everything but those three fields

| table _time url, hash

This command makes sure that _time contains only the day value... we don't care about the time of day

| bin _time span=1d

These commands create a multivalue field named "splitter", give it two values A and B, and then copy the entire url-hash record into one record for A, and one record for B

| eval splitter="A B" | makemv splitter | mvexpand splitter

This command adds one day (86400 seconds) to the _time of the B record. The effect is that yesterday's records will have an A record with yesterday's date and a B record with today's date. Today's records will have an A record with today's date and a B record with tomorrow's date.

| eval _time=if(splitter="A",_time,_time+86400)

This rolls together the values for each URL for each day for all fields except _time and url. The values() aggregate command will only retain distinct values, so if the hash has not changed, there will only be one value. Also, if the date is yesterday or tomorrow, there will only be one record, so only one value.

| stats values(*) as * by _time url

This kills all the records that have only one value.

| where mvcount(hash)>1

The result will be only records that had one hash value yesterday, and have another hash value today.
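
The walkthrough above can be tried end to end on synthetic data. This is a minimal sketch: the makeresults rows, the url values a.example and b.example, the hash values, and the helper field n are all invented for illustration. The first five lines only generate the test events; everything from the table command onward is the search explained above.

```spl
| makeresults count=4
| streamstats count as n
| eval url=if(n<=2, "a.example", "b.example")
| eval _time=if(n%2==1, relative_time(now(), "-1d@d"), relative_time(now(), "@d"))
| eval hash=case(n==1, "h1", n==2, "h2", n==3, "h3", n==4, "h3")
| table _time url hash
| bin _time span=1d
| eval splitter="A B" | makemv splitter | mvexpand splitter
| eval _time=if(splitter="A", _time, _time+86400)
| stats values(*) as * by _time url
| where mvcount(hash)>1
```

Run as written, only the a.example row dated today should survive, carrying both h1 and h2 in hash. Removing the final where shows the intermediate rows for yesterday, today, and tomorrow, each with a single hash value except that one.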


You've already accepted an answer, but I went ahead and posted a search in my original answer that you should be able to cut-and-paste to execute. You can paste it in line-by-line and execute the search -- with | head 5 after it or with target_page=someparticularpage.com so you don't get too much output -- and see how each command transforms the results.

  sourcetype=logs target_page_hash=* earliest=-2d
  | table _time target_page, target_page_hash
  | bin _time span=1d
  | eval splitter="A B"     | makemv splitter     | mvexpand splitter
  | eval _time=if(splitter="A",_time,_time+86400)
  | stats values(*) as * by _time target_page
  | where mvcount(target_page_hash)>1