Getting Data In

Integrating Splunk data in Talend

patrickdelliot
New Member

Greetings. We have data in Splunk, and related data in Oracle. We are looking to integrate the two using Talend. Has anyone had success in using Splunk as an input into Talend? Basically, I want to treat Splunk and Oracle as two inputs and merge the data on a key field. Thanks very much for any guidance.

Tags (1)
0 Karma

ialanis
New Member

I know this is an old-ish thread, but I had the same basic problem, and ended up using straight java (tJavaFlex) to call the splunk rest API to pull out data with a lot of success. I'm posting this in case anybody else (including my future-self) comes across this thread and needs a solution. It would be nice as a stand-alone talend component, but I have other things to move onto now. This is code for a talend tJavaFlex component, but is not really all that talend-specific.

I used org.apache.httpcomponents:httpclient:4.5 and commons-io:commons-io:2.4 to call the service (the built-in talend components to call services weren't flexible enough for me).

If you're trying to use this code in a production environment, you might want to beef it up a little. try/catch/finally to close out http connections, for example. Anyway, works for me.

Imports:

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import javax.net.ssl.SSLContext;
import javax.xml.bind.DatatypeConverter;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.TrustStrategy;

Start code:

// input parameters
boolean selfSignedCert = Boolean.valueOf(context.splunkSelfSignedCert);
String splunkBaseUrl  = context.splunkBaseUrl;  // for example: "https://localhost:8089/services"
String splunkUsername = context.splunkUsername;
String splunkPassword = context.splunkPassword;
String splunkSearchString   = "search index=_internal"; // an example search
String splunkEarliestTime   = "-30seconds";
String splunkLatestTime     = "now";


// Start doing the work...
if(splunkBaseUrl==null) throw new NullPointerException("context.splunkBaseUrl");
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");


// build the uri
URI splunkServiceUri = new URI(splunkBaseUrl);
URIBuilder uriBuilder = new URIBuilder(splunkBaseUrl)
    .setPath(splunkServiceUri.getPath()+"/search/jobs/export")
    .setParameter("search", splunkSearchString)
    .setParameter("output_mode", "csv");
if(splunkEarliestTime!=null) {
    uriBuilder.setParameter("earliest_time", splunkEarliestTime);
}
if(splunkLatestTime!=null) {
    uriBuilder.setParameter("latest_time", splunkLatestTime);
}
splunkServiceUri = uriBuilder.build();

CloseableHttpClient httpClient;
if(!selfSignedCert) {
    httpClient = HttpClients.createDefault();
} else {
    // override ssl trust logic
    SSLContext sslContext = SSLContextBuilder.create()
        .loadTrustMaterial(new TrustStrategy() {
            public boolean isTrusted(X509Certificate[] chain, String authType) throws CertificateException {
                return true;
            }})
        .build();
    httpClient = HttpClients.custom()
        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
        .setSSLContext(sslContext)
        .build();
}

// build basic auth value...
String basicAuthValue = DatatypeConverter.printBase64Binary((splunkUsername+":"+splunkPassword).getBytes("UTF-8"));

HttpGet httpGet = new HttpGet(splunkServiceUri);
httpGet.setHeader("Authorization", "Basic "+basicAuthValue);

//System.out.println("Executing... "+httpGet.getRequestLine());
CloseableHttpResponse httpResponse = httpClient.execute(httpGet);

// wait for response code / check it
int responseCode = httpResponse.getStatusLine().getStatusCode();
if(responseCode == 401) {
    throw new RuntimeException("Unautorized to acess "+splunkBaseUrl+" as "+splunkUsername);
} else if(responseCode != 200) {
    // error condition
    String errorMessage = "Invalid response ("+responseCode+") from "+splunkServiceUri;
    InputStream errorStream = httpResponse.getEntity().getContent();
    System.err.println(errorMessage);
    IOUtils.copy(errorStream, System.err);
    throw new RuntimeException(errorMessage);
}

// read the response stream
InputStream inputStream = httpResponse.getEntity().getContent();
BufferedReader lineReader = new BufferedReader(new InputStreamReader(inputStream));
String line = lineReader.readLine(); // skip over header line
final int expectedFieldCount = 8;
ArrayList<String> results = new ArrayList<String>(expectedFieldCount);
int lineNo = 0;
while((line = lineReader.readLine())!=null) 
{
    lineNo++;

Main code:

results.clear();
CsvLineParser.parse(line, results);
if(results.size() != expectedFieldCount) {
System.err.println("WARNING: unexpected field count on line "+lineNo+". Expected "+expectedFieldCount+" but got "+results.size());
}

row1._serial = Integer.parseInt(results.get(0));
row1._time = dateFormat.parse(results.get(1));
row1.source = results.get(2);
row1.sourcetype = results.get(3);
row1.host = results.get(4);
row1.index = results.get(5);
row1.splunk_server = results.get(6);
row1._raw = results.get(7);

End code:

}

And a routine:

package routines;

import java.util.List;
import java.util.ArrayList;

public class CsvLineParser {
    public static List<String> parse(String line) {
        ArrayList<String> result = new ArrayList<String>();
        parse(line, result);
        return result;
    }
    public static void parse(String line, List<String> resultList) {
        parse(line, resultList, ',', '"');
    }
    public static void parse(String line, List<String> resultList, char fieldDelimiter, char quoteChar) {
        int chrIndex1=0;
        while(true) {
            String token;
            boolean quoted = line.charAt(chrIndex1)==quoteChar;
            int chrIndex2;
            if(!quoted) {
                chrIndex2 = line.indexOf(fieldDelimiter, chrIndex1);
                if(chrIndex2<0) {
                    chrIndex2 = line.length();
                }
                token = line.substring(chrIndex1, chrIndex2);
                chrIndex1 = chrIndex2+1;
            } else {
                StringBuffer buffer = null;
                chrIndex1++; // skip leading quote
                while(true) {
                    chrIndex2 = line.indexOf(quoteChar, chrIndex1);
                    if(chrIndex2<line.length()-1 && line.charAt(chrIndex2+1)==quoteChar) {
                        if(buffer==null) buffer = new StringBuffer();
                        buffer.append(line.substring(chrIndex1,chrIndex2));
                        buffer.append(quoteChar);
                        chrIndex1=chrIndex2+2; // skip over intermediate quotes
                    } else {
                        break;
                    }
                }
                if(chrIndex2<0) {
                    System.err.println("WARNING: unterminated quoted value");
                    chrIndex2 = line.length();
                }
                if(buffer==null) {
                    token = line.substring(chrIndex1, chrIndex2);
                } else {
                    buffer.append(line, chrIndex1, chrIndex2);
                    token = buffer.toString();
                }
                chrIndex1 = chrIndex2+2;
            }
            resultList.add(token.toString());
            if(chrIndex1>=line.length()) return;
        }
    }
}
0 Karma

Damien_Dallimor
Ultra Champion

Perhaps you could create a Talend component for Splunk and add it to the Talend Community Ecosystem.

This component could utilize our Java SDK for connecting to Splunk and retrieving search results.

You could then use this component in Talend Studio to integrate with your data from Oracle.

0 Karma
Get Updates on the Splunk Community!

Enterprise Security Content Update (ESCU) | New Releases

In December, the Splunk Threat Research Team had 1 release of new security content via the Enterprise Security ...

Why am I not seeing the finding in Splunk Enterprise Security Analyst Queue?

(This is the first of a series of 2 blogs). Splunk Enterprise Security is a fantastic tool that offers robust ...

Index This | What are the 12 Days of Splunk-mas?

December 2024 Edition Hayyy Splunk Education Enthusiasts and the Eternally Curious!  We’re back with another ...