Greetings. We have data in Splunk, and related data in Oracle. We are looking to integrate the two using Talend. Has anyone had success in using Splunk as an input into Talend? Basically, I want to treat Splunk and Oracle as two inputs and merge the data on a key field. Thanks very much for any guidance.
I know this is an old-ish thread, but I had the same basic problem and ended up using plain Java (tJavaFlex) to call the Splunk REST API and pull out the data, with a lot of success. I'm posting this in case anybody else (including my future self) comes across this thread and needs a solution. It would be nice as a stand-alone Talend component, but I have other things to move on to now. This is code for a Talend tJavaFlex component, but it is not really all that Talend-specific.
I used org.apache.httpcomponents:httpclient:4.5 and commons-io:commons-io:2.4 to call the service (the built-in Talend components for calling services weren't flexible enough for me).
If you're going to use this code in a production environment, you might want to beef it up a little: for example, add try/catch/finally to close out the HTTP connections. Anyway, it works for me.
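One way to get the cleanup mentioned above is try-with-resources (Java 7+), which closes resources in reverse order of declaration even when the body throws. The following is only a sketch: `TrackedResource` is a hypothetical stand-in for `CloseableHttpClient` and `CloseableHttpResponse` (both implement `Closeable`), so the example runs without the HttpClient jar.

```java
import java.util.ArrayList;
import java.util.List;

class CleanupSketch {
    // Hypothetical stand-in for CloseableHttpClient / CloseableHttpResponse.
    static class TrackedResource implements AutoCloseable {
        private final String name;
        private final List<String> log;

        TrackedResource(String name, List<String> log) {
            this.name = name;
            this.log = log;
            log.add("open " + name);
        }

        @Override
        public void close() {
            log.add("close " + name);
        }
    }

    // Opens "client" then "response", optionally fails while reading,
    // and returns the sequence of events that happened.
    static List<String> run(boolean failWhileReading) {
        List<String> log = new ArrayList<String>();
        try (TrackedResource client = new TrackedResource("client", log);
             TrackedResource response = new TrackedResource("response", log)) {
            if (failWhileReading) {
                throw new IllegalStateException("simulated read failure");
            }
            log.add("read ok");
        } catch (IllegalStateException e) {
            // Both resources are already closed by the time we get here.
            log.add("caught " + e.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

In both runs the response is closed before the client, and in the failing run both are closed before the catch block executes.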
Imports:
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import javax.net.ssl.SSLContext;
import javax.xml.bind.DatatypeConverter;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.TrustStrategy;
Start code:
// input parameters
boolean selfSignedCert = Boolean.valueOf(context.splunkSelfSignedCert);
String splunkBaseUrl = context.splunkBaseUrl; // for example: "https://localhost:8089/services"
String splunkUsername = context.splunkUsername;
String splunkPassword = context.splunkPassword;
String splunkSearchString = "search index=_internal"; // an example search
String splunkEarliestTime = "-30seconds";
String splunkLatestTime = "now";
// Start doing the work...
if(splunkBaseUrl==null) throw new NullPointerException("context.splunkBaseUrl");
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
// build the uri
URI splunkServiceUri = new URI(splunkBaseUrl);
URIBuilder uriBuilder = new URIBuilder(splunkBaseUrl)
    .setPath(splunkServiceUri.getPath()+"/search/jobs/export")
    .setParameter("search", splunkSearchString)
    .setParameter("output_mode", "csv");
if(splunkEarliestTime!=null) {
    uriBuilder.setParameter("earliest_time", splunkEarliestTime);
}
if(splunkLatestTime!=null) {
    uriBuilder.setParameter("latest_time", splunkLatestTime);
}
splunkServiceUri = uriBuilder.build();
CloseableHttpClient httpClient;
if(!selfSignedCert) {
    httpClient = HttpClients.createDefault();
} else {
    // override ssl trust logic: trust every certificate and skip hostname
    // verification (only appropriate for self-signed test/internal servers)
    SSLContext sslContext = SSLContextBuilder.create()
        .loadTrustMaterial(new TrustStrategy() {
            public boolean isTrusted(X509Certificate[] chain, String authType) throws CertificateException {
                return true;
            }})
        .build();
    httpClient = HttpClients.custom()
        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
        .setSSLContext(sslContext)
        .build();
}
// build basic auth value...
String basicAuthValue = DatatypeConverter.printBase64Binary((splunkUsername+":"+splunkPassword).getBytes("UTF-8"));
HttpGet httpGet = new HttpGet(splunkServiceUri);
httpGet.setHeader("Authorization", "Basic "+basicAuthValue);
//System.out.println("Executing... "+httpGet.getRequestLine());
CloseableHttpResponse httpResponse = httpClient.execute(httpGet);
// wait for response code / check it
int responseCode = httpResponse.getStatusLine().getStatusCode();
if(responseCode == 401) {
    throw new RuntimeException("Unauthorized to access "+splunkBaseUrl+" as "+splunkUsername);
} else if(responseCode != 200) {
    // error condition: dump the response body to stderr before failing
    String errorMessage = "Invalid response ("+responseCode+") from "+splunkServiceUri;
    InputStream errorStream = httpResponse.getEntity().getContent();
    System.err.println(errorMessage);
    IOUtils.copy(errorStream, System.err);
    throw new RuntimeException(errorMessage);
}
// read the response stream (decode as UTF-8 rather than the platform default charset)
InputStream inputStream = httpResponse.getEntity().getContent();
BufferedReader lineReader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
String line = lineReader.readLine(); // skip over header line
final int expectedFieldCount = 8;
ArrayList<String> results = new ArrayList<String>(expectedFieldCount);
int lineNo = 0; // counts data lines (the header line is not counted)
while((line = lineReader.readLine())!=null)
{
    lineNo++;
Main code:
    results.clear();
    CsvLineParser.parse(line, results);
    if(results.size() != expectedFieldCount) {
        System.err.println("WARNING: unexpected field count on line "+lineNo+". Expected "+expectedFieldCount+" but got "+results.size());
    }
    row1._serial = Integer.parseInt(results.get(0));
    row1._time = dateFormat.parse(results.get(1));
    row1.source = results.get(2);
    row1.sourcetype = results.get(3);
    row1.host = results.get(4);
    row1.index = results.get(5);
    row1.splunk_server = results.get(6);
    row1._raw = results.get(7);
End code:
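}
// release the HTTP resources opened in the Start code
// (not reached if an exception is thrown earlier)
lineReader.close();
httpResponse.close();
httpClient.close();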
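A note on the Basic auth value: the Start code builds it with `javax.xml.bind.DatatypeConverter`, which is no longer part of the JDK from Java 11 onwards. On a newer runtime, something like the sketch below, using `java.util.Base64` (Java 8+), could be used instead. The class and method names are made up for illustration, and the credentials are placeholders.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class BasicAuthSketch {
    // Builds the value of an HTTP "Authorization" header for Basic auth.
    static String basicAuthHeader(String username, String password) {
        byte[] credentials = (username + ":" + password).getBytes(StandardCharsets.UTF_8);
        return "Basic " + Base64.getEncoder().encodeToString(credentials);
    }

    public static void main(String[] args) {
        // Placeholder credentials, for illustration only.
        System.out.println(basicAuthHeader("admin", "changeme"));
    }
}
```

The result would be passed to `httpGet.setHeader("Authorization", ...)` in place of the hand-built string.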
And a routine:
package routines;

import java.util.ArrayList;
import java.util.List;

public class CsvLineParser {

    public static List<String> parse(String line) {
        ArrayList<String> result = new ArrayList<String>();
        parse(line, result);
        return result;
    }

    public static void parse(String line, List<String> resultList) {
        parse(line, resultList, ',', '"');
    }

    // Splits one CSV line into fields; inside a quoted field a doubled quote char stands for one quote.
    public static void parse(String line, List<String> resultList, char fieldDelimiter, char quoteChar) {
        int chrIndex1 = 0;
        while(true) {
            // an empty line, or a trailing delimiter, yields one final empty field
            if(chrIndex1 == line.length()) {
                resultList.add("");
                return;
            }
            String token;
            boolean quoted = line.charAt(chrIndex1)==quoteChar;
            int chrIndex2;
            if(!quoted) {
                chrIndex2 = line.indexOf(fieldDelimiter, chrIndex1);
                if(chrIndex2<0) {
                    chrIndex2 = line.length();
                }
                token = line.substring(chrIndex1, chrIndex2);
                chrIndex1 = chrIndex2+1;
            } else {
                StringBuilder buffer = null;
                chrIndex1++; // skip leading quote
                while(true) {
                    chrIndex2 = line.indexOf(quoteChar, chrIndex1);
                    // chrIndex2>=0 guards against an unterminated quoted value
                    if(chrIndex2>=0 && chrIndex2<line.length()-1 && line.charAt(chrIndex2+1)==quoteChar) {
                        if(buffer==null) buffer = new StringBuilder();
                        buffer.append(line, chrIndex1, chrIndex2);
                        buffer.append(quoteChar);
                        chrIndex1 = chrIndex2+2; // skip over intermediate quotes
                    } else {
                        break;
                    }
                }
                if(chrIndex2<0) {
                    System.err.println("WARNING: unterminated quoted value");
                    chrIndex2 = line.length();
                }
                if(buffer==null) {
                    token = line.substring(chrIndex1, chrIndex2);
                } else {
                    buffer.append(line, chrIndex1, chrIndex2);
                    token = buffer.toString();
                }
                chrIndex1 = chrIndex2+2; // skip closing quote and the delimiter after it
            }
            resultList.add(token);
            if(chrIndex1>line.length()) return; // no delimiter followed the last token
        }
    }
}
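One caveat with the read loop: `readLine()` splits on every newline, but a quoted CSV field (for example a multi-line `_raw` event) can itself contain newlines. A possible workaround, sketched below on the assumption that quotes are escaped by doubling (as the parser above expects), is to keep appending physical lines until the quote characters balance. `CsvRecordReader` and `readRecord` are hypothetical names, not part of the original code.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

class CsvRecordReader {
    // Returns the next logical CSV record, or null at end of input.
    // Physical lines are re-joined with '\n' (original line endings are not preserved).
    static String readRecord(BufferedReader reader, char quoteChar) throws IOException {
        String line = reader.readLine();
        if (line == null) {
            return null;
        }
        StringBuilder record = new StringBuilder(line);
        // An odd number of quote chars means the line ended inside a quoted field.
        boolean insideQuotes = hasOddQuoteCount(line, quoteChar);
        while (insideQuotes) {
            String next = reader.readLine();
            if (next == null) {
                break; // unterminated quoted field: return what we have
            }
            record.append('\n').append(next);
            // Another odd count closes the open field; an even count leaves it open.
            insideQuotes = !hasOddQuoteCount(next, quoteChar);
        }
        return record.toString();
    }

    static boolean hasOddQuoteCount(String text, char quoteChar) {
        boolean odd = false;
        for (int i = 0; i < text.length(); i++) {
            if (text.charAt(i) == quoteChar) {
                odd = !odd;
            }
        }
        return odd;
    }

    public static void main(String[] args) throws IOException {
        BufferedReader reader = new BufferedReader(
            new StringReader("a,\"line1\nline2\",c\nx,y,z\n"));
        String record;
        while ((record = readRecord(reader, '"')) != null) {
            System.out.println("[" + record + "]");
        }
    }
}
```

Each returned record can then be passed to `CsvLineParser.parse`, which does not care whether the string contains newlines.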
Perhaps you could create a Talend component for Splunk and add it to the Talend Community Ecosystem.
This component could utilize our Java SDK for connecting to Splunk and retrieving search results.
You could then use this component in Talend Studio to integrate with your data from Oracle.
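As for the merge step asked about at the top of the thread: in Talend that would normally be a tMap with one flow as the main input and the other as a lookup. Conceptually it is a hash join on the key field. The sketch below shows the idea in plain Java; the `host` key, the other field names, and the sample rows are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class KeyJoinSketch {
    // Inner-joins Splunk rows with Oracle rows on the given key field.
    static List<Map<String, String>> joinOnKey(List<Map<String, String>> splunkRows,
                                               List<Map<String, String>> oracleRows,
                                               String keyField) {
        // Index the lookup side by key (assumes at most one Oracle row per key).
        Map<String, Map<String, String>> oracleByKey = new HashMap<String, Map<String, String>>();
        for (Map<String, String> row : oracleRows) {
            oracleByKey.put(row.get(keyField), row);
        }
        List<Map<String, String>> joined = new ArrayList<Map<String, String>>();
        for (Map<String, String> event : splunkRows) {
            Map<String, String> match = oracleByKey.get(event.get(keyField));
            if (match == null) {
                continue; // inner join: drop events without an Oracle match
            }
            Map<String, String> merged = new HashMap<String, String>(match);
            merged.putAll(event); // on a field-name clash the Splunk value wins
            joined.add(merged);
        }
        return joined;
    }

    // Small helper to build a row from alternating key/value arguments.
    static Map<String, String> row(String... keyValues) {
        Map<String, String> row = new HashMap<String, String>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            row.put(keyValues[i], keyValues[i + 1]);
        }
        return row;
    }

    public static void main(String[] args) {
        List<Map<String, String>> splunk = new ArrayList<Map<String, String>>();
        splunk.add(row("host", "web01", "_raw", "GET /index.html"));
        splunk.add(row("host", "db09", "_raw", "connection reset"));
        List<Map<String, String>> oracle = new ArrayList<Map<String, String>>();
        oracle.add(row("host", "web01", "owner", "ops"));
        System.out.println(joinOnKey(splunk, oracle, "host"));
    }
}
```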