Security

Read messages from Active MQ Topic

ManishaAgrawal
Explorer

Hi,

I am trying to configure JMS Modular App to read messages from ActiveMQ Topic. MQ runs locally on the machine and there is a java code that uses jndi and pushes messages into topic called MyTopic. MQ is working fine, each time I run the code, message count in Topic increases (verified on ActiveMQ Web Console).

Problem is that , I am still unable to index the messages in Splunk.
Please let me know where I am wrong.

Configuration on the UI is as below :

Initialisation Mode : jndi
JMS Connection Factory JNDI Name : connectionFactory
JNDI Initial Context Factory Name : org.apache.activemq.jndi.ActiveMQInitialContextFactory
JNDI Provider URL : tcp://localhost:61616
(Also tried with file:/home/user/MQJNDI/) jndi.properties file is placed at /home/user/MQJNDI/

Index message property values , Index message property values and Index message property values are checked.

Sourcetype : mqTopic (manual)

After saving the changes, I push messages again in the Topic , but nothing gets indexed.

jndi.properties is as below:

START SNIPPET: jndi

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory

use the following property to configure the default connector

java.naming.provider.url = tcp://localhost:61616

use the following property to specify the JNDI name the connection factory

should appear as.

connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry

register some queues in JNDI using the form

queue.[jndiName] = [physicalName]

queue.MyQueue = MyQueue

register some topics in JNDI using the form

topic.[jndiName] = [physicalName]

topic.MyTopic = MyTopic

END SNIPPET: jndi

Java code that writes messages to Topic
Arguments : MyTopic 100

package com.humana.splunk.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A simple polymorphic JMS producer which can work with Queues or Topics which
* uses JNDI to lookup the JMS connection factory and destination
*
*
*/
public final class SimpleProducer {

private static final Logger LOG = LoggerFactory.getLogger(SimpleProducer.class);

private SimpleProducer() {
}

/**
 * @param args the destination name to send to and optionally, the number of
 *                messages to send
 */
public static void main(String[] args) {
    Context jndiContext = null;
    ConnectionFactory connectionFactory = null;
    Connection connection = null;
    Session session = null;
    Destination destination = null;
    MessageProducer producer = null;
    String destinationName = null;
    final int numMsgs;

    if ((args.length < 1) || (args.length > 2)) {
        LOG.info("Usage: java SimpleProducer <destination-name> [<number-of-messages>]");
        System.out.println("args : " + args[0]);
        System.exit(1);
    }
    destinationName = args[0];
    LOG.info("Destination name is " + destinationName);
    if (args.length == 2) {
        numMsgs = (new Integer(args[1])).intValue();
    } else {
        numMsgs = 1;
    }

    /*
     * Create a JNDI API InitialContext object
     */
    try {
        jndiContext = new InitialContext();
    } catch (NamingException e) {
       LOG.info("Could not create JNDI API context: " + e.toString());
        System.exit(1);
    }

    /*
     * Look up connection factory and destination.
     */
    try {
        connectionFactory = (ConnectionFactory)jndiContext.lookup("topicConnectionFactory");
        destination = (Destination)jndiContext.lookup(destinationName);
    } catch (NamingException e) {
       LOG.info("JNDI API lookup failed: " + e);
        System.exit(1);
    }

    /*
     * Create connection. Create session from connection; false means
     * session is not transacted. Create sender and text message. Send
     * messages, varying text slightly. Send end-of-messages message.
     * Finally, close connection.
     */
    try {
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage();
        for (int i = 0; i < numMsgs; i++) {
            message.setText("This is message " + (i + 1));
           LOG.info("Sending message: " + message.getText());
            producer.send(message);
        }

        /*
         * Send a non-text control message indicating end of messages.
         */
        producer.send(session.createMessage());
    } catch (JMSException e) {
        LOG.info("Exception occurred: " + e);
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
            }
        }
    }
}

}

Could you please help me identify/resolve the issue.

Thanks in advance.

Tags (3)
0 Karma
1 Solution

Damien_Dallimor
Ultra Champion

Here is an example stanza that works in my ActiveMQ environment :

[jms://topic/dynamicTopics/splunktopic]
durable = 1
host = some-host
index = main
index_message_header = 1
index_message_properties = 1
init_mode = jndi
jms_connection_factory_name = ConnectionFactory
jndi_initialcontext_factory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
jndi_provider_url = tcp://ubuntu-personal:61616
sourcetype = jms
disabled = 1
browse_mode = stats
browse_queue_only = 0
strip_newlines = 0

Furthermore , do you have any error messages ? Search "index=_internal ExecProcessor error jms.py"

View solution in original post

0 Karma

Damien_Dallimor
Ultra Champion

Here is an example stanza that works in my ActiveMQ environment :

[jms://topic/dynamicTopics/splunktopic]
durable = 1
host = some-host
index = main
index_message_header = 1
index_message_properties = 1
init_mode = jndi
jms_connection_factory_name = ConnectionFactory
jndi_initialcontext_factory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
jndi_provider_url = tcp://ubuntu-personal:61616
sourcetype = jms
disabled = 1
browse_mode = stats
browse_queue_only = 0
strip_newlines = 0

Furthermore , do you have any error messages ? Search "index=_internal ExecProcessor error jms.py"

0 Karma
Get Updates on the Splunk Community!

Index This | I’m short for "configuration file.” What am I?

May 2024 Edition Hayyy Splunk Education Enthusiasts and the Eternally Curious!  We’re back with a Special ...

New Articles from Academic Learning Partners, Help Expand Lantern’s Use Case Library, ...

Splunk Lantern is a Splunk customer success center that provides advice from Splunk experts on valuable data ...

Your Guide to SPL2 at .conf24!

So, you’re headed to .conf24? You’re in for a good time. Las Vegas weather is just *chef’s kiss* beautiful in ...