I was able to make this work using the StreamSets HTTP Client processor and the Splunk receivers/simple REST API endpoint. Modeling my HTTP Client configuration on the Splunk example, I used the following settings:
Resource URL: http://localhost:8089/services/receivers/simple?source=${record:value('/source')}&sourcetype=${record:value('/sourcetype')} (You can hardcode source and sourcetype if you prefer.)
HTTP Method: POST
Request Data: ${record:value('/timestamp')} User ${record:value('/username')} logged in successfully. (Change this to whatever you want to send to Splunk)
Default Request Content Type: application/x-www-form-urlencoded
Authentication Type: Basic
You'll need to enter a Splunk username and password on the Credentials tab. I set the data format to XML, since the Splunk REST API responds in XML, and sent the API response to a Local FS destination for debugging.
Note: for simplicity, I turned off SSL on the Splunk REST API.
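For reference, here's a rough Python 3 sketch of the request those settings describe, using only the standard library. It builds the request without sending it. The field names match the expressions above; the helper name, the sample credentials, and the URL-encoding of the query parameters are my own assumptions (I haven't checked whether the HTTP Client stage encodes them for you).

```python
import base64
from urllib.parse import urlencode
from urllib.request import Request

# Splunk management port over plain HTTP (SSL turned off, as noted above)
SPLUNK_MGMT = 'http://localhost:8089'


def build_simple_receiver_request(record, username, password):
    """Hypothetical helper: build (but don't send) the POST the HTTP Client makes.

    `record` is a plain dict standing in for the StreamSets record.
    """
    # source/sourcetype go in the query string; urlencode escapes spaces etc.
    query = urlencode({'source': record['source'], 'sourcetype': record['sourcetype']})
    url = SPLUNK_MGMT + '/services/receivers/simple?' + query
    # The request body is the raw event text, same as the Request Data setting
    body = '{} User {} logged in successfully.'.format(record['timestamp'], record['username'])
    # Basic authentication: base64("username:password")
    creds = base64.b64encode('{}:{}'.format(username, password).encode('utf-8')).decode('ascii')
    return Request(
        url,
        data=body.encode('utf-8'),
        method='POST',
        headers={
            'Content-Type': 'application/x-www-form-urlencoded',
            'Authorization': 'Basic ' + creds,
        },
    )
```

Passing the returned object to `urllib.request.urlopen()` would actually send it; keeping construction separate makes it easy to inspect what goes over the wire.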
EDIT: The above approach sends one HTTP request per record, so it's inefficient for large volumes of data. A better approach is to use a Jython Evaluator script to send a single request per batch to the Splunk HTTP Event Collector (HEC):
import sys
# Set to wherever the requests package lives on your machine
sys.path.append('/Library/Python/2.7/site-packages')
import requests
import json

# Endpoint for Splunk HTTP Event Collector
url = 'http://localhost:8088/services/collector'

# Read Splunk token from file and cache in state, so the file is only read once.
# The runtime:loadResource EL expression must be substituted before the script
# runs, leaving the token as a literal string.
if state.get('headers') is None:
    state['headers'] = {'Authorization': 'Splunk ${runtime:loadResource('splunkToken', false)}'}

buffer = ''

# Loop through batch, building request payload
for record in records:
    try:
        # Strip host & time fields from record and pass to Splunk as event attributes
        event = dict((key, record.value[key]) for key in record.value if key not in ['time', 'host'])
        buffer += json.dumps({
            'host': record.value['host'],
            'time': record.value['time'],
            'event': event,
        }) + '\n'
        # Write record to processor output
        output.write(record)
    except Exception as e:
        # Send record to error
        error.write(record, str(e))

# Now submit a single request for the entire batch.
# Skip it if the batch was empty or every record went to error:
# HEC responds to an empty body with an error code.
if buffer:
    r = requests.post(url,
                      headers=state['headers'],
                      data=buffer).json()
    # Check for errors from Splunk (code 0 means success)
    if r['code'] != 0:
        log.error('Splunk error: {}: {}', r['code'], r['text'])
        raise Exception('Splunk API error {0}: {1}'.format(r['code'], r['text']))
    # All is good
    log.info('Splunk API response: {}', r['text'])
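If you want to test the batching logic outside Data Collector, the core of the script can be pulled out into plain Python functions. This is a sketch with hypothetical helper names (`build_hec_payload`, `check_hec_response`); it works on plain dicts rather than StreamSets records, and only treats a missing `host` or `time` field as a failure rather than catching every exception.

```python
import json


def build_hec_payload(values):
    """Serialize a batch into a single HEC request body.

    Each dict (standing in for record.value) becomes one JSON event object
    on its own line. Returns (payload, failures), where failures is a list
    of (record, message) pairs for records missing host or time.
    """
    lines = []
    failures = []
    for value in values:
        try:
            # Everything except host and time goes into the event body
            event = dict((k, v) for k, v in value.items() if k not in ('time', 'host'))
            lines.append(json.dumps(
                {'host': value['host'], 'time': value['time'], 'event': event},
                sort_keys=True))  # sorted keys keep the output deterministic
        except KeyError as e:
            failures.append((value, 'missing field {}'.format(e)))
    return ''.join(line + '\n' for line in lines), failures


def check_hec_response(resp):
    """Return HEC's status text, or raise if the reply reports a non-zero code."""
    if resp.get('code') != 0:
        raise RuntimeError('Splunk API error {0}: {1}'.format(resp.get('code'), resp.get('text')))
    return resp.get('text')


payload, failures = build_hec_payload([
    {'host': 'web1', 'time': 1500000000, 'user': 'alice'},
    {'user': 'bob'},  # no host/time, so it ends up in failures
])
print(payload, end='')
# prints: {"event": {"user": "alice"}, "host": "web1", "time": 1500000000}
```

This works as a single request because HEC accepts several event objects concatenated in one body; the newline between them is just for readability.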