Hi all,
I am new to Splunk and am struggling to get this to work.
I use StreamSets to add data to my streams. For now I save it in a dummy MongoDB, but I would like to save it directly into Splunk.
I know about the HTTP Event Collector, but how should I make this connection? What headers and URL should I use?
Thanks!
@JosIJntema - Were you able to test out metadaddy's solution? Did it work? If yes, please don't forget to resolve this post by clicking on "Accept". If you still need more help, please provide a comment with some feedback. Thanks!
I was able to make this work using the StreamSets HTTP Client processor, and the Splunk receivers/simple REST API. Modeling my HTTP client after the Splunk example, I used the following settings:
URL: http://localhost:8089/services/receivers/simple?source=${record:value('/source')}&sourcetype=${recor...
(You can hardcode source, sourcetype if you like)
Request Data: ${record:value('/timestamp')} User ${record:value('/username')} logged in successfully.
(Change this to whatever you want to send to Splunk)
Content-Type: application/x-www-form-urlencoded
You'll need to configure an appropriate username/password in the Credentials tab. I set the data format to XML and passed the API response to a 'Local FS' destination for debugging.
Note - for simplicity, I turned off SSL on the Splunk REST API.
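For anyone who wants to try the same call outside StreamSets first, here is a rough sketch in plain Python 3 (standard library only) of what that request looks like. The helper name, the source and sourcetype values, the sample message and the admin/changeme credentials are all placeholders I made up for illustration; the sketch only builds the request and does not send it.

```python
import base64
import urllib.parse
import urllib.request


def build_simple_receiver_request(base_url, source, sourcetype, message,
                                  username, password):
    """Build (but do not send) a POST to Splunk's receivers/simple endpoint."""
    # source and sourcetype travel as query parameters, as in the URL above
    query = urllib.parse.urlencode({'source': source, 'sourcetype': sourcetype})
    # HTTP Basic authentication with a Splunk username and password
    credentials = base64.b64encode(
        '{}:{}'.format(username, password).encode('utf-8')).decode('ascii')
    return urllib.request.Request(
        url='{}/services/receivers/simple?{}'.format(base_url, query),
        data=message.encode('utf-8'),  # the raw event text is the request body
        headers={
            'Authorization': 'Basic ' + credentials,
            'Content-Type': 'application/x-www-form-urlencoded',
        },
        method='POST',
    )


# Placeholder values for illustration only
request = build_simple_receiver_request(
    'http://localhost:8089', 'streamsets', 'login_event',
    '2017-01-01T00:00:00 User alice logged in successfully.',
    'admin', 'changeme')

# To actually send it: urllib.request.urlopen(request).read()
```

Building the request separately from sending it makes it easy to print the URL and headers and compare them with what you entered in the HTTP Client settings.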
EDIT: The above mechanism works record-by-record, so it's not very efficient for large amounts of data. A better approach is to use a script to send a single request per batch to the Splunk HTTP Event Collector:
import sys
# Set to wherever the requests package lives on your machine
sys.path.append('/Library/Python/2.7/site-packages')
import requests
import json

# Endpoint for Splunk HTTP Event Collector
url = 'http://localhost:8088/services/collector'

# Read Splunk token from file and cache in state
if state.get('headers') is None:
    state['headers'] = {'Authorization': 'Splunk ${runtime:loadResource('splunkToken', false)}'}

buffer = ''

# Loop through batch, building request payload
for record in records:
    try:
        # Strip host & time fields from record and pass to Splunk as event attributes
        event = dict((key, record.value[key]) for key in record.value if key not in ['time', 'host'])
        buffer += json.dumps({
            'host': record.value['host'],
            'time': record.value['time'],
            'event': event,
        }) + '\n'
        # Write record to processor output
        output.write(record)
    except Exception as e:
        # Send record to error
        error.write(record, str(e))

# Now submit a single request for the entire batch
r = requests.post(url,
                  headers=state['headers'],
                  data=buffer).json()

# Check for errors from Splunk
if r['code'] != 0:
    log.error('Splunk error: {}: {}', r['code'], r['text'])
    raise Exception('Splunk API error {0}: {1}'.format(r['code'], r['text']))

# All is good
log.info('Splunk API response: {}', r['text'])
I wrote this up more fully at https://streamsets.com/blog/ingest-data-splunk-streamsets-data-collector/
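If you want to check the payload format before wiring it into a pipeline, the payload-building step from the batch script can be pulled out into a small standalone function. This is only a sketch in plain Python 3: the function name, the sample records and the YOUR-HEC-TOKEN placeholder are made up for illustration, and plain dicts stand in for StreamSets records.

```python
import json


def build_hec_payload(records):
    """Return one JSON document per record, newline-separated, in HEC event format."""
    lines = []
    for record in records:
        # Everything except host and time becomes the event body
        event = {key: value for key, value in record.items()
                 if key not in ('time', 'host')}
        lines.append(json.dumps({
            'host': record['host'],
            'time': record['time'],
            'event': event,
        }))
    if not lines:
        return ''
    return '\n'.join(lines) + '\n'


# Hypothetical sample records, standing in for a StreamSets batch
sample_records = [
    {'host': 'web01', 'time': 1483228800, 'username': 'alice', 'action': 'login'},
    {'host': 'web02', 'time': 1483228860, 'username': 'bob', 'action': 'logout'},
]

payload = build_hec_payload(sample_records)
# The HEC token goes in the Authorization header, prefixed with "Splunk "
headers = {'Authorization': 'Splunk ' + 'YOUR-HEC-TOKEN'}
print(payload)
```

The printed payload is what would be sent as the body of a single POST to /services/collector, one event per line.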
Hi @metadaddy
I have a similar requirement. I don't have any knowledge of JSON or Java.
The requirement is to pass everything from a JSON stream (e.g. everything from https://10.24.113.206/container-ws/hystrix.stream) to Splunk. I have created a token for HEC in Splunk and have been searching blogs to get this working.
What should my syntax be if I follow your example below to send everything to Splunk?
Request Data: ${record:value('/timestamp')} User ${record:value('/username')} logged in successfully. (Change this to whatever you want to send to Splunk)
Hi @nareshinsvu - you could try the Splunk destination - that was created since I answered this question. Also, we have a few options for our community to interact with the StreamSets team directly - see https://streamsets.com/community/
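If you would rather stay with a scripted approach, a rough sketch of wrapping each JSON message from the stream as an HEC event might look like the following. It assumes the stream arrives as Server-Sent Events, i.e. lines prefixed with "data:" with ping lines in between, which I believe is how Hystrix streams are normally formatted; please check that against your own endpoint. The function name, the 'hystrix' sourcetype and the sample lines are made up for illustration.

```python
import json


def sse_lines_to_hec_payload(lines, sourcetype='hystrix'):
    """Wrap the JSON body of each 'data:' line as one HEC event."""
    events = []
    for line in lines:
        line = line.strip()
        if not line.startswith('data:'):
            continue  # skip blank lines, pings and anything else
        body = line[len('data:'):].strip()
        try:
            message = json.loads(body)
        except ValueError:
            continue  # ignore lines whose body is not valid JSON
        # Send the whole message as the event, untouched
        events.append(json.dumps({'sourcetype': sourcetype, 'event': message}))
    return '\n'.join(events)


# Hypothetical sample of what the stream might contain
sample_lines = [
    'ping: ',
    '',
    'data: {"type": "HystrixCommand", "name": "GetOrder", "errorCount": 0}',
    'data: not-json',
]

stream_payload = sse_lines_to_hec_payload(sample_lines)
print(stream_payload)
```

The resulting text can be posted to /services/collector with the "Authorization: Splunk <token>" header, the same way as in the batch script earlier in this thread.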