Though "| eval myfield=entropy(somefield)" would be awesome, it doesn't exist (yet?). Is there a known method for this now, or am I stuck using an "associate" hack to get entropy values for fields?
If I have to write a custom command, that is fine, but I'm not sure how to expose it to eval (which I would like, though it isn't strictly necessary). It would be nice if this were native, since associate already calculates entropy to do what it needs to do.
**2017 Update for Splunk 6.3+ - from the upcoming Information Theory Suite for Splunk**
Revised entropy.py for the SCPv2 API (no error handling). Make sure "splunklib" is in the bin directory:
#!/usr/bin/env python
# coding=utf-8
from __future__ import absolute_import, division, print_function, unicode_literals
#####
#
# entropy.py
# Part of: Information Theory Suite for Splunk
#
# commands.conf for SCPv2 chunked (splunk6.3+) but still recommend to use the SCPv1 config to get all records from stats buffers
"""
[entropy]
filename = entropy.py
chunked = true
"""
#
# commands.conf for SCPv1 (recommended for now due to stats and tstats + chunked limitations)
"""
[entropy]
filename = entropy.py
enableheader = true
outputheader = true
requires_srinfo = true
stderr_dest = message
supports_getinfo = true
supports_rawargs = true
supports_multivalues = true
"""
#####
# - rshoward
# Credit to http://stackoverflow.com/questions/2979174/how-do-i-compute-the-approximate-entropy-of-a-bit-string
# and the Revelation codebase (GPL, https://github.com/mikelolasagasti/revelation) for the entropy function
####
from splunklib.searchcommands import dispatch, StreamingCommand, Configuration, Option, validators
import sys
import math
def entropy(string):
    "Calculates the Shannon entropy of a string"
    # get probability of chars in string
    prob = [ float(string.count(c)) / len(string) for c in dict.fromkeys(list(string)) ]
    # calculate the entropy
    entropy = - sum([ p * math.log(p) / math.log(2.0) for p in prob ])
    return entropy
def entropy_ideal(length):
    "Calculates the ideal Shannon entropy of a string with given length"
    prob = 1.0 / length
    ideal = -1.0 * length * prob * math.log(prob) / math.log(2.0)
    return ideal
@Configuration()
class EntropyCommand(StreamingCommand):
    """ Calculates entropy and ideal values for a given set of fields.
    ##Syntax
    .. code-block::
        entropy includeideal=<bool> includedistance=<bool> <field-list>
    ##Description
    Calculate Shannon Entropy for the given field list. Values will be returned into the data stream as
    entropy_<field-name>. Optionally, you can include the ideal entropy for the given field(s) string length,
    as well as the distance between the field's entropy value, and the ideal entropy for said fields string length.
    ##Example
    Search for records where the entropy of any of the fields in a list are close to the ideal entropy.
    .. code-block::
        index=proxy_sg | entropy includeideal=T includedistance=T host path file options | search entropy_ideal_distance* > -1
    """
    includeideal = Option(
        doc='''
        **Syntax:** **includeideal=***<boolean>*
        **Description:** Calculate and include the ideal entropy for the given field(s) length''',
        require=False, validate=validators.Boolean())
    includedistance = Option(
        doc='''
        **Syntax:** **includedistance=***<boolean>*
        **Description:** Calculate and include the field's entropy distance from ideal entropy for the given field(s) length''',
        require=False, validate=validators.Boolean())
    def stream(self, records):
        self.logger.debug('EntropyCommand init via: %s', self) # logs command line
        for record in records:
            for fieldname in self.fieldnames:
                record["entropy_" + fieldname] = entropy(record[fieldname].decode("utf-8"))
                if self.includeideal:
                    record["entropy_ideal_" + fieldname] = entropy_ideal(len(record[fieldname].decode("utf-8")))
                if self.includedistance:
                    record["entropy_ideal_distance_" + fieldname] = float(record["entropy_" + fieldname]) - float(entropy_ideal(len(record[fieldname].decode("utf-8"))))
            yield record
dispatch(EntropyCommand, sys.argv, sys.stdin, sys.stdout, __name__)
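To make the includeideal and includedistance options concrete, here is a minimal standalone sketch (Python 3, no Splunk required) of the three values the command computes per field. The function names shannon_entropy, ideal_entropy and ideal_distance are illustrative and are not part of the command above. The distance is the entropy minus the ideal entropy, so it is never positive, and values near 0 mean almost every character in the string is distinct.

```python
import math
from collections import Counter


def shannon_entropy(text):
    """Shannon entropy of the characters in text, in bits per character."""
    counts = Counter(text)
    total = len(text)
    return -sum((n / total) * math.log2(n / total) for n in counts.values())


def ideal_entropy(length):
    """Entropy of a string of `length` characters that are all distinct."""
    return math.log2(length)


def ideal_distance(text):
    """Entropy minus ideal entropy; 0 means every character is unique."""
    return shannon_entropy(text) - ideal_entropy(len(text))


# Illustrative inputs: fully repetitive, partly repetitive, all distinct.
for sample in ["aaaaaa", "abcabc", "abcdef"]:
    print(sample, round(shannon_entropy(sample), 3), round(ideal_distance(sample), 3))
```

Under this reading, the example search's filter "entropy_ideal_distance* > -1" keeps records whose field is within one bit of the maximum entropy possible for its length.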
Original post from 2011 :
Use the following code for a custom command. It comes via this quote: "Shannon's entropy equation is the standard method of calculation. Here is a simple implementation in Python, shamelessly copied from the Revelation codebase, and thus GPL licensed:"
def entropy(string):
    "Calculates the Shannon entropy of a string"
    # get probability of chars in string
    prob = [ float(string.count(c)) / len(string) for c in dict.fromkeys(list(string)) ]
    # calculate the entropy
    entropy = - sum([ p * math.log(p) / math.log(2.0) for p in prob ])
    return entropy
And
def entropy_ideal(length):
    "Calculates the ideal Shannon entropy of a string with given length"
    prob = 1.0 / length
    return -1.0 * length * prob * math.log(prob) / math.log(2.0)
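As a quick check on what entropy_ideal returns: it treats a string of length n as n distinct characters, each with probability 1/n, so the sum collapses to a single logarithm. Written out (the symbol H_ideal is just a label for this note):

```latex
H_{\text{ideal}}(n)
  = -\sum_{i=1}^{n} \frac{1}{n}\,\log_2 \frac{1}{n}
  = -\,n \cdot \frac{1}{n}\,\log_2 \frac{1}{n}
  = \log_2 n
```

For example, a string of 8 characters has an ideal entropy of 3 bits per character, and a single character has an ideal entropy of 0.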
EDIT: Completed the streaming custom command. It has already found some hidden callbacks in proxy logs.
Usage: "...| entropy [field]"
This adds a field called entropy_[field] containing the Shannon entropy of the field's value. If no field is specified, _raw is used and the new field is entropy__raw (with two underscores).
This is easily modified to take multiple fields, or it can be used as is by chaining, like "...| entropy | entropy uri_host | entropy uri_path"
1) create /opt/splunk/etc/apps/search/bin/entropy.py
import splunk.Intersplunk as si
import math, sys, os, re
import cPickle, bisect
from string import atoi
import socket, struct, csv
ATTRIBUTES = ['entropy']
(is_get_info, sys.argv) = si.isGetInfo(sys.argv)
keywords, options = si.getKeywordsAndOptions()
if len(keywords) > 0:
    en_field = keywords[0]
else:
    en_field = "_raw"
if is_get_info:
    si.outputInfo(True, False, True, False, None, True)
def entropy(string):
    "Calculates the Shannon entropy of a string"
    # get probability of chars in string
    prob = [ float(string.count(c)) / len(string) for c in dict.fromkeys(list(string)) ]
    # calculate the entropy
    entropy = - sum([ p * math.log(p) / math.log(2.0) for p in prob ])
    return entropy
def entropy_ideal(length):
    "Calculates the ideal Shannon entropy of a string with given length (not implemented yet)"
    prob = 1.0 / length
    return -1.0 * length * prob * math.log(prob) / math.log(2.0)
# Strip command header
while len(sys.stdin.readline().strip()):
    pass
reader = csv.DictReader(sys.stdin)
headers = reader.fieldnames
if not en_field in headers:
    headers.append(en_field)
for h in ATTRIBUTES:
    headers.append("%s_%s" % (h, en_field))
writer = csv.DictWriter(sys.stdout, headers)
writer.writer.writerow(headers)
for r in reader:
    for f in ATTRIBUTES:
        r["%s_%s" % (f, en_field)] = entropy(r[en_field])
    writer.writerow(r)
2) Add to /opt/splunk/etc/apps/search/local/commands.conf
[entropy]
filename = entropy.py
overrides_timeorder = false
retainsevents = true
streaming = true
supports_getinfo = true
3) Restart Splunk
4) PROFIT!
Sadly, I was getting error code 1 from this script. I think I fixed it by removing the "Strip command header" section and setting enableheader = false in commands.conf.
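For anyone trying the same fix, here is a rough sketch of what the read/write loop looks like once the header-stripping step is gone. It assumes, as the fix above suggests, that with enableheader = false the script's input starts directly at the CSV header row. It is written for Python 3 and takes file-like objects so it can be tried outside Splunk; add_entropy_column and shannon_entropy are made-up names, not part of the original script.

```python
import csv
import io
import math
from collections import Counter


def shannon_entropy(text):
    """Shannon entropy of the characters in text, in bits per character."""
    counts = Counter(text)
    total = len(text)
    return -sum((n / total) * math.log2(n / total) for n in counts.values())


def add_entropy_column(csv_in, csv_out, field="_raw"):
    """Copy CSV rows from csv_in to csv_out, adding entropy_<field>."""
    reader = csv.DictReader(csv_in)
    out_field = "entropy_%s" % field
    fieldnames = list(reader.fieldnames or [])
    if out_field not in fieldnames:
        fieldnames.append(out_field)
    writer = csv.DictWriter(csv_out, fieldnames=fieldnames, extrasaction="ignore")
    writer.writeheader()
    for row in reader:
        value = row.get(field) or ""
        # Leave the new column blank when the field is missing or empty.
        row[out_field] = shannon_entropy(value) if value else ""
        writer.writerow(row)


# Try it on an in-memory CSV instead of sys.stdin / sys.stdout.
sample = io.StringIO("host,uri_path\nexample.com,/abab\n")
result = io.StringIO()
add_entropy_column(sample, result, field="uri_path")
print(result.getvalue())
```

Inside a real command the call would be add_entropy_column(sys.stdin, sys.stdout, en_field), with no readline loop in front of it.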
For clarification, since entropy is a broad topic, I am referring to the following (via http://en.wikipedia.org/wiki/Entropy_%28information_theory%29):
"A long string of repeating characters has an entropy rate of 0, since every character is predictable. The entropy rate of English text is between 1.0 and 1.5 bits per letter,[1] or as low as 0.6 to 1.3 bits per letter, according to estimates by Shannon based on human experiments."
Entropy wouldn't be an eval function, since it requires all values of the field, not the value for a single event. Here is how you'd calculate it using aggregating commands:
... | stats count as somefield_count by somefield
| eventstats sum(somefield_count) as somefield_total
| eval somefield_plogp = -1*log(somefield_count/somefield_total)*somefield_count/somefield_total
| stats sum(somefield_plogp) as entropy
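The same aggregation can be sketched in Python to show what the search computes: count each distinct value, divide by the total, and sum -p*log(p). The function name field_entropy and the sample values are illustrative only. One thing to check in the search itself: as far as I know, eval's log(X) uses base 10 when no base is given, so log(X, 2) would be needed to get the result in bits. The sketch makes the base an explicit parameter for that reason.

```python
import math
from collections import Counter


def field_entropy(values, base=2):
    """Entropy of the distribution of values across all events."""
    counts = Counter(values)        # like: stats count by somefield
    total = sum(counts.values())    # like: eventstats sum(somefield_count)
    # like: eval plogp = -1 * log(p) * p, followed by stats sum(plogp)
    return -sum((n / total) * math.log(n / total, base) for n in counts.values())


# Illustrative field values from six events.
events = ["GET", "GET", "GET", "POST", "POST", "PUT"]
print(field_entropy(events))           # in bits
print(field_entropy(events, base=10))  # base 10, as a bare log(X) would give
```

Note that this measures how spread out the field's values are across events, which is a different quantity from the per-string character entropy the custom command computes.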
Cool! Thanks. I just noticed associate kind of did what I was looking for, and I thought I was maybe missing something. Thanks again, you've given me an awesome start. 🙂
It's trickier to apply the standard definition of entropy to a single string, and even harder to use Splunk to compute it. Writing your own search command is probably the best approach.
I noticed there is a lot of discussion out there on exactly what people mean when they talk about entropy. I agree with you that "entropy" would normally use all the values in a dataset, but I'm talking about the amount of information needed to reproduce a single string, treating that string as the dataset. In that sense "a" is just as "complex" as "aaaaaa" and less complex than "abc", and among alphanumeric strings a standard word or sentence has less entropy than, say, a base64 string. I do this using another product now, and it is great for finding hidden channels in web traffic. See the Wikipedia article on entropy (information theory).
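A small Python 3 sketch of that per-string view, using the character-frequency definition from the code earlier in the thread. The sample sentence and its base64 encoding are made-up inputs for illustration, and char_entropy is a hypothetical name.

```python
import base64
import math
from collections import Counter


def char_entropy(text):
    """Shannon entropy of a single string, in bits per character."""
    counts = Counter(text)
    total = len(text)
    return -sum((n / total) * math.log2(n / total) for n in counts.values())


sentence = "the quick brown fox jumps over the lazy dog"
encoded = base64.b64encode(sentence.encode("utf-8")).decode("ascii")

# "a" and "aaaaaa" both come out as zero entropy; "abc" is higher.
for label, text in [("a", "a"), ("aaaaaa", "aaaaaa"), ("abc", "abc"),
                    ("sentence", sentence), ("base64", encoded)]:
    print(label, round(char_entropy(text), 3))
```

Because the score depends only on character frequencies within the one string, string length matters: very short strings cannot score high no matter how random they are, which is where a comparison against the ideal entropy for the same length helps.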