Splunk Search

Calculate entropy (just entropy, not change in entropy like "associate")

rshoward
Path Finder

Though "| eval myfield=entropy(somefield)" would be awesome, it doesn't exist (yet?). Is there a known method for this now or am I stuck using an "associate" hack to get entropy values for fields?

If I have to write a custom command that is fine but I'm not sure how to get it into eval (as I would like to do but isn't necessary) and it would be nice if it was native since associate is already calculating entropy to do what it needs to do.

1 Solution

rshoward
Path Finder

**2017 Update for Splunk 6.3+ - from the upcoming Information Theory Suite for Splunk**

Revised entropy.py for SCPv2 api. make sure "splunklib" is in bin dir. (no error handling) :

#!/usr/bin/env python
# coding=utf-8
from __future__ import absolute_import, division, print_function, unicode_literals

#####
#
# entropy.py
# Part of: Information Theory Suite for Splunk
#
# commands.conf  for SCPv2 chunked (splunk6.3+) but still recommend to use the SCPv1 config to get all records from stats buffers
"""
[entropy]
filename = entropy.py
chunked = true
"""
#
# commands.conf for SCPv1 (recommended for now due to stats and tstats + chunked limitations)
"""
[entropy]
filename = entropy.py
enableheader = true
outputheader = true
requires_srinfo = true
stderr_dest = message
supports_getinfo = true
supports_rawargs = true
supports_multivalues = true
"""
#####
# - rshoward
# Credit to http://stackoverflow.com/questions/2979174/how-do-i-compute-the-approximate-entropy-of-a-bit-string
# and the Revelation codebase (GPL, https://github.com/mikelolasagasti/revelation) for the entropy function
####

from splunklib.searchcommands import dispatch, StreamingCommand, Configuration, Option, validators
import sys

import math

def entropy(string):
    "Calculates the Shannon entropy of a string"

    # get probability of chars in string
    prob = [ float(string.count(c)) / len(string) for c in dict.fromkeys(list(string)) ]

    # calculate the entropy
    entropy = - sum([ p * math.log(p) / math.log(2.0) for p in prob ])

    return entropy

def entropy_ideal(length):
    "Calculates the ideal Shannon entropy of a string with given length"

    prob = 1.0 / length

    ideal = -1.0 * length * prob * math.log(prob) / math.log(2.0)

    return ideal



@Configuration()
class EntropyCommand(StreamingCommand):
    """ Calculates entropy and ideal values for a given set of fields.

    ##Syntax

    .. code-block::
        entropy includeideal=<bool> includedistance=<bool> <field-list>

    ##Description

    Calculate Shannon Entropy for the given field list. Values will be returned into the data stream as 
    entropy_<field-name>. Optionally, you can include the ideal entropy for the given field(s) string length, 
    as well as the distance between the field's entropy value, and the ideal entropy for said fields string length.

    ##Example

    Search for records where the entropy of any of the fields in a list are close to the ideal entropy.

    .. code-block::
        index=proxy_sg | entropy includeideal=T includedistance=T host path file options | search entropy_ideal_distance* > -1

    """
    includeideal = Option(
        doc='''
        **Syntax:** **includeideal=***<boolean>*
        **Description:** Calculate and include the ideal entropy for the given field(s) length''',
        require=False, validate=validators.Boolean())

    includedistance = Option(
        doc='''
        **Syntax:** **includedistance=***<boolean>*
        **Description:** Calculate and include the field's entropy distance from ideal entropy for the given field(s) length''',
        require=False, validate=validators.Boolean())


    def stream(self, records):
        self.logger.debug('EntropyCommand init via: %s', self)  # logs command line
        for record in records:
            for fieldname in self.fieldnames:
                record["entropy_" + fieldname] = entropy(record[fieldname].decode("utf-8"))
                if self.includeideal:
                    record["entropy_ideal_" + fieldname] = entropy_ideal(len(record[fieldname].decode("utf-8")))
                if self.includedistance:
                    record["entropy_ideal_distance_" + fieldname] = float(record["entropy_" + fieldname]) - float(entropy_ideal(len(record[fieldname].decode("utf-8"))))
            yield record

dispatch(EntropyCommand, sys.argv, sys.stdin, sys.stdout, __name__) 


Original post from 2011 :

Use the following code for a custom command. via "Shannon's entropy equation is the standard method of calculation. Here is a simple implementation in Python, shamelessly copied from the Revelation codebase, and thus GPL licensed:"

def entropy(string):
    "Calculates the Shannon entropy of a string"

    # get probability of chars in string
    prob = [ float(string.count(c)) / len(string) for c in dict.fromkeys(list(string)) ]

    # calculate the entropy
    entropy = - sum([ p * math.log(p) / math.log(2.0) for p in prob ])

    return entropy

And

def entropy_ideal(length):
    "Calculates the ideal Shannon entropy of a string with given length"

    prob = 1.0 / length

    return -1.0 * length * prob * math.log(prob) / math.log(2.0)

EDIT: Completed streaming custom command. Already found some hidden call-backs in proxy logs.

Usage: "...| entropy [field]"

Will add a field called entropy_[field] with the shannon entropy value of the field. If no field is specified, raw is used and creates a field (with two underscores) entropy_raw.

This is easily modified to take multiple fields or can be used as is nested like "...| entropy | entropy uri_host | entropy uri_path"

1) create /opt/splunk/etc/apps/search/bin/entropy.py

import splunk.Intersplunk as si
import math, sys, os, re
import cPickle, bisect
from string import atoi
import socket, struct, csv

ATTRIBUTES = ['entropy']


(is_get_info, sys.argv) = si.isGetInfo(sys.argv)

keywords, options = si.getKeywordsAndOptions()
if len(keywords) &gt; 0:
        en_field = keywords[0]
else:
        en_field = "_raw"

if is_get_info:
        si.outputInfo(True, False, True, False, None, True)


def entropy(string):
    "Calculates the Shannon entropy of a string"

    # get probability of chars in string
    prob = [ float(string.count(c)) / len(string) for c in dict.fromkeys(list(string)) ]

    # calculate the entropy
    entropy = - sum([ p * math.log(p) / math.log(2.0) for p in prob ])

    return entropy

def entropy_ideal(length):
    "Calculates the ideal Shannon entropy of a string with given length (not implemented yet)"

    prob = 1.0 / length

    return -1.0 * length * prob * math.log(prob) / math.log(2.0)


# Strip command header

while len(sys.stdin.readline().strip()):
        pass

reader = csv.DictReader(sys.stdin)
headers = reader.fieldnames
if not en_field in headers:
        headers.append(en_field)
for h in ATTRIBUTES:
        headers.append("%s_%s" % (h, en_field))
writer = csv.DictWriter(sys.stdout, headers)
writer.writer.writerow(headers)

for r in reader:

        for f in ATTRIBUTES:
                r["%s_%s" % (f, en_field)] = entropy(r[en_field])
        writer.writerow(r)

2) Add to /opt/splunk/etc/apps/search/local/commands.conf

[entropy]
filename = entropy.py
overrides_timeorder = false
retainsevents = true
streaming = true
supports_getinfo = true

3) Restart Splunk

4) PROFIT!

View solution in original post

rshoward
Path Finder

**2017 Update for Splunk 6.3+ - from the upcoming Information Theory Suite for Splunk**

Revised entropy.py for SCPv2 api. make sure "splunklib" is in bin dir. (no error handling) :

#!/usr/bin/env python
# coding=utf-8
from __future__ import absolute_import, division, print_function, unicode_literals

#####
#
# entropy.py
# Part of: Information Theory Suite for Splunk
#
# commands.conf  for SCPv2 chunked (splunk6.3+) but still recommend to use the SCPv1 config to get all records from stats buffers
"""
[entropy]
filename = entropy.py
chunked = true
"""
#
# commands.conf for SCPv1 (recommended for now due to stats and tstats + chunked limitations)
"""
[entropy]
filename = entropy.py
enableheader = true
outputheader = true
requires_srinfo = true
stderr_dest = message
supports_getinfo = true
supports_rawargs = true
supports_multivalues = true
"""
#####
# - rshoward
# Credit to http://stackoverflow.com/questions/2979174/how-do-i-compute-the-approximate-entropy-of-a-bit-string
# and the Revelation codebase (GPL, https://github.com/mikelolasagasti/revelation) for the entropy function
####

from splunklib.searchcommands import dispatch, StreamingCommand, Configuration, Option, validators
import sys

import math

def entropy(string):
    "Calculates the Shannon entropy of a string"

    # get probability of chars in string
    prob = [ float(string.count(c)) / len(string) for c in dict.fromkeys(list(string)) ]

    # calculate the entropy
    entropy = - sum([ p * math.log(p) / math.log(2.0) for p in prob ])

    return entropy

def entropy_ideal(length):
    "Calculates the ideal Shannon entropy of a string with given length"

    prob = 1.0 / length

    ideal = -1.0 * length * prob * math.log(prob) / math.log(2.0)

    return ideal



@Configuration()
class EntropyCommand(StreamingCommand):
    """ Calculates entropy and ideal values for a given set of fields.

    ##Syntax

    .. code-block::
        entropy includeideal=<bool> includedistance=<bool> <field-list>

    ##Description

    Calculate Shannon Entropy for the given field list. Values will be returned into the data stream as 
    entropy_<field-name>. Optionally, you can include the ideal entropy for the given field(s) string length, 
    as well as the distance between the field's entropy value, and the ideal entropy for said fields string length.

    ##Example

    Search for records where the entropy of any of the fields in a list are close to the ideal entropy.

    .. code-block::
        index=proxy_sg | entropy includeideal=T includedistance=T host path file options | search entropy_ideal_distance* > -1

    """
    includeideal = Option(
        doc='''
        **Syntax:** **includeideal=***<boolean>*
        **Description:** Calculate and include the ideal entropy for the given field(s) length''',
        require=False, validate=validators.Boolean())

    includedistance = Option(
        doc='''
        **Syntax:** **includedistance=***<boolean>*
        **Description:** Calculate and include the field's entropy distance from ideal entropy for the given field(s) length''',
        require=False, validate=validators.Boolean())


    def stream(self, records):
        self.logger.debug('EntropyCommand init via: %s', self)  # logs command line
        for record in records:
            for fieldname in self.fieldnames:
                record["entropy_" + fieldname] = entropy(record[fieldname].decode("utf-8"))
                if self.includeideal:
                    record["entropy_ideal_" + fieldname] = entropy_ideal(len(record[fieldname].decode("utf-8")))
                if self.includedistance:
                    record["entropy_ideal_distance_" + fieldname] = float(record["entropy_" + fieldname]) - float(entropy_ideal(len(record[fieldname].decode("utf-8"))))
            yield record

dispatch(EntropyCommand, sys.argv, sys.stdin, sys.stdout, __name__) 


Original post from 2011 :

Use the following code for a custom command. via "Shannon's entropy equation is the standard method of calculation. Here is a simple implementation in Python, shamelessly copied from the Revelation codebase, and thus GPL licensed:"

def entropy(string):
    "Calculates the Shannon entropy of a string"

    # get probability of chars in string
    prob = [ float(string.count(c)) / len(string) for c in dict.fromkeys(list(string)) ]

    # calculate the entropy
    entropy = - sum([ p * math.log(p) / math.log(2.0) for p in prob ])

    return entropy

And

def entropy_ideal(length):
    "Calculates the ideal Shannon entropy of a string with given length"

    prob = 1.0 / length

    return -1.0 * length * prob * math.log(prob) / math.log(2.0)

EDIT: Completed streaming custom command. Already found some hidden call-backs in proxy logs.

Usage: "...| entropy [field]"

Will add a field called entropy_[field] with the shannon entropy value of the field. If no field is specified, raw is used and creates a field (with two underscores) entropy_raw.

This is easily modified to take multiple fields or can be used as is nested like "...| entropy | entropy uri_host | entropy uri_path"

1) create /opt/splunk/etc/apps/search/bin/entropy.py

import splunk.Intersplunk as si
import math, sys, os, re
import cPickle, bisect
from string import atoi
import socket, struct, csv

ATTRIBUTES = ['entropy']


(is_get_info, sys.argv) = si.isGetInfo(sys.argv)

keywords, options = si.getKeywordsAndOptions()
if len(keywords) &gt; 0:
        en_field = keywords[0]
else:
        en_field = "_raw"

if is_get_info:
        si.outputInfo(True, False, True, False, None, True)


def entropy(string):
    "Calculates the Shannon entropy of a string"

    # get probability of chars in string
    prob = [ float(string.count(c)) / len(string) for c in dict.fromkeys(list(string)) ]

    # calculate the entropy
    entropy = - sum([ p * math.log(p) / math.log(2.0) for p in prob ])

    return entropy

def entropy_ideal(length):
    "Calculates the ideal Shannon entropy of a string with given length (not implemented yet)"

    prob = 1.0 / length

    return -1.0 * length * prob * math.log(prob) / math.log(2.0)


# Strip command header

while len(sys.stdin.readline().strip()):
        pass

reader = csv.DictReader(sys.stdin)
headers = reader.fieldnames
if not en_field in headers:
        headers.append(en_field)
for h in ATTRIBUTES:
        headers.append("%s_%s" % (h, en_field))
writer = csv.DictWriter(sys.stdout, headers)
writer.writer.writerow(headers)

for r in reader:

        for f in ATTRIBUTES:
                r["%s_%s" % (f, en_field)] = entropy(r[en_field])
        writer.writerow(r)

2) Add to /opt/splunk/etc/apps/search/local/commands.conf

[entropy]
filename = entropy.py
overrides_timeorder = false
retainsevents = true
streaming = true
supports_getinfo = true

3) Restart Splunk

4) PROFIT!

kogane
Path Finder

Sadly, was getting an error code 1 from this script... I think I fixed it by removing the "Strip command header" section and setting enableheader=false in commands.conf.

0 Karma

rshoward
Path Finder

for clarification, since entropy is a broad topic, I am refering to the following. (via http://en.wikipedia.org/wiki/Entropy_%28information_theory%29)
"A long string of repeating characters has an entropy rate of 0, since every character is predictable. The entropy rate of English text is between 1.0 and 1.5 bits per letter,[1] or as low as 0.6 to 1.3 bits per letter, according to estimates by Shannon based on human experiments."

0 Karma

Stephen_Sorkin
Splunk Employee
Splunk Employee

Entropy wouldn't be a eval function since it requires all values of the field, not the value for a single event. Here would be how you'd calculate it using aggregating commands:

... | stats count as somefield_count by somefield
    | eventstats sum(somefield_count) as somefield_total
    | eval somefield_plogp = -1*log(somefield_count/somefield_total)*somefield_count/somefield_total
    | stats sum(somefield_plogp) as entropy

rshoward
Path Finder

Cool! Thanks. I just noticed associate kind of did what I was looking for and I thought I was maybe missing something. Thanks again you've given me an awesome start. 🙂

0 Karma

Stephen_Sorkin
Splunk Employee
Splunk Employee

It's trickier to apply the standard definition of entropy to a single string, and even harder to use Splunk to compute it. Writing your own search command is probably the best approach.

rshoward
Path Finder

I noticed there is a lot of discussion out there on exactly what people mean when they talk about entropy. I agree with you in that "entropy" would use all the values for a dataset but I'm talking about the amount of information needed to reproduce a single string as a dataset. where "a" is just as "complex" as "aaaaaa" and less complex than "abc" and in alpha numeric, a standard word or sentence has less entropy than say a base64 string. I do this using another product now and it is great for finding hidden channels in web traffic. via the wikipedia article on entropy (information theory)

0 Karma
Get Updates on the Splunk Community!

Transform your security operations with Splunk Enterprise Security

Hi Splunk Community, Splunk Platform has set a great foundation for your security operations. With the ...

Splunk Admins and App Developers | Earn a $35 gift card!

Splunk, in collaboration with ESG (Enterprise Strategy Group) by TechTarget, is excited to announce a ...

Enterprise Security Content Update (ESCU) | New Releases

In October, the Splunk Threat Research Team had one release of new security content via the Enterprise ...